00001
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifndef PublisherNew_cpp
00031 #define PublisherNew_cpp
00032
00033 #include <cppunit/ui/text/TestRunner.h>
00034 #include <cppunit/TextOutputter.h>
00035 #include <cppunit/extensions/TestFactoryRegistry.h>
00036 #include <cppunit/extensions/HelperMacros.h>
00037 #include <cppunit/TestAssert.h>
00038
00039 #include <coil/Properties.h>
00040 #include <rtm/InPortConsumer.h>
00041 #include <rtm/PublisherNew.h>
00042 #include <coil/Time.h>
00043 #include <rtm/CORBA_SeqUtil.h>
00044 #include <rtm/NVUtil.h>
00045 #include <rtm/InPortCorbaCdrConsumer.h>
00046 #include <rtm/CdrRingBuffer.h>
00047
00048 #include <rtm/idl/BasicDataTypeSkel.h>
00049 #include <rtm/ConnectorListener.h>
00050
00055 namespace PublisherNew
00056 {
00057 int m_OnCheck = 0;
00058
00062 class DataListener
00063 : public RTC::ConnectorDataListenerT<RTC::TimedLong>
00064 {
00065 public:
00066 DataListener(const char* name) : m_name(name) {}
00067 virtual ~DataListener()
00068 {
00069 }
00070
00071 virtual void operator()(const RTC::ConnectorInfo& info,
00072 const RTC::TimedLong& data)
00073 {
00074 std::cout << "------------------------------" << std::endl;
00075 std::cout << "Listener: " << m_name << std::endl;
00076 std::cout << " Data: " << data.data << std::endl;
00077 std::cout << "------------------------------" << std::endl;
00078 };
00079 std::string m_name;
00080 };
00081
00086 class PublisherNewMock
00087 : public RTC::PublisherNew
00088 {
00089 public:
00090 PublisherNewMock(void)
00091 {
00092 ;
00093 }
00094 virtual ~PublisherNewMock(void)
00095 {
00096 ;
00097 }
00098 };
00103 class InPortCorbaCdrConsumerMock
00104 : public RTC::InPortCorbaCdrConsumer
00105 {
00106 public:
00111 InPortCorbaCdrConsumerMock(void)
00112 {
00113 m_buffer = new RTC::CdrRingBuffer();
00114 m_test_mode = 0;
00115 }
00120 virtual ~InPortCorbaCdrConsumerMock()
00121 {
00122 delete m_buffer;
00123 }
00128 virtual ReturnCode put(const cdrMemoryStream& data)
00129 {
00130 if(m_test_mode == 0)
00131 {
00132 if (m_buffer->full())
00133 {
00134 return RTC::PublisherNew::SEND_FULL;
00135 }
00136
00137 RTC::BufferStatus::Enum ret = m_buffer->write(data);
00138
00139
00140 if(m_OnCheck == 0) {
00141 switch(ret)
00142 {
00143 case RTC::BufferStatus::BUFFER_OK:
00144 return RTC::PublisherNew::PORT_OK;
00145 break;
00146 case RTC::BufferStatus::BUFFER_ERROR:
00147 return RTC::PublisherNew::PORT_ERROR;
00148 break;
00149 case RTC::BufferStatus::BUFFER_FULL:
00150 return RTC::PublisherNew::SEND_FULL;
00151 break;
00152 case RTC::BufferStatus::BUFFER_EMPTY:
00153 return RTC::PublisherNew::BUFFER_EMPTY;
00154 break;
00155 case RTC::BufferStatus::TIMEOUT:
00156 return RTC::PublisherNew::SEND_TIMEOUT;
00157 break;
00158 default:
00159 return RTC::PublisherNew::UNKNOWN_ERROR;
00160 }
00161 return RTC::PublisherNew::UNKNOWN_ERROR;
00162 }
00163 else if(m_OnCheck == 1) {
00164 return RTC::PublisherNew::PORT_OK;
00165 }
00166 else if(m_OnCheck == 2) {
00167 return RTC::PublisherNew::PORT_ERROR;
00168 }
00169 else if(m_OnCheck == 3) {
00170 return RTC::PublisherNew::SEND_FULL;
00171 }
00172 else if(m_OnCheck == 4) {
00173 return RTC::PublisherNew::SEND_TIMEOUT;
00174 }
00175 else if(m_OnCheck == 5) {
00176 return RTC::PublisherNew::UNKNOWN_ERROR;
00177 }
00178 else if(m_OnCheck == 6) {
00179 return RTC::PublisherNew::CONNECTION_LOST;
00180 }
00181 }
00182 else if(m_test_mode == 1)
00183 {
00184 std::string str("test");
00185 throw str;
00186 }
00187 else
00188 {
00189 }
00190 }
00195 cdrMemoryStream get_m_put_data(void)
00196 {
00197 cdrMemoryStream cdr;
00198 m_buffer->read(cdr);
00199
00200 return cdr;
00201 }
00206 int get_m_put_data_len(void)
00207 {
00208 int ic;
00209 ic = (int)m_buffer->readable();
00210
00211 return ic;
00212 }
00213
00218 void set_m_mode(int mode)
00219 {
00220 m_test_mode = mode;
00221 }
00222 private:
00223 RTC::CdrBufferBase* m_buffer;
00224 ::OpenRTM::CdrData m_put_data;
00225 int m_test_mode;
00226 };
00227
00228 class PublisherNewTests
00229 : public CppUnit::TestFixture
00230 {
00231 CPPUNIT_TEST_SUITE(PublisherNewTests);
00232
00233
00234 CPPUNIT_TEST(test_setConsumer);
00235 CPPUNIT_TEST(test_setBuffer);
00236 CPPUNIT_TEST(test_activate_deactivate_isActive);
00237 CPPUNIT_TEST(test_pushAll);
00238 CPPUNIT_TEST(test_pushAll_2);
00239 CPPUNIT_TEST(test_pushFifo);
00240 CPPUNIT_TEST(test_pushFifo_2);
00241 CPPUNIT_TEST(test_pushSkip);
00242 CPPUNIT_TEST(test_pushSkip_2);
00243 CPPUNIT_TEST(test_pushNew);
00244 CPPUNIT_TEST(test_write);
00245
00246 CPPUNIT_TEST_SUITE_END();
00247
00248 private:
00249
00250 public:
00251 RTC::ConnectorListeners m_listeners;
00252
00256 PublisherNewTests()
00257 {
00258 }
00259
00263 ~PublisherNewTests()
00264 {
00265 }
00266
00270 virtual void setUp()
00271 {
00272
00273 }
00274
00278 virtual void tearDown()
00279 {
00280 }
00281
00286 void test_init(void)
00287 {
00288 PublisherNewMock publisher;
00289 RTC::PublisherBase::ReturnCode retcode;
00290 coil::Properties prop;
00291
00292
00293 retcode = publisher.init(prop);
00294 coil::usleep(10000);
00295 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK, retcode);
00296
00297 prop.setProperty("publisher.push_policy","new");
00298 prop.setProperty("thread_type","bar");
00299 prop.setProperty("measurement.exec_time","default");
00300 prop.setProperty("measurement.period_count","1");
00301
00302
00303 retcode = publisher.init(prop);
00304 coil::usleep(10000);
00305 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::INVALID_ARGS, retcode);
00306
00307
00308 prop.setProperty("publisher.push_policy","all");
00309 prop.setProperty("publisher.skip_count","0");
00310 prop.setProperty("thread_type","default");
00311 prop.setProperty("measurement.exec_time","enable");
00312 prop.setProperty("measurement.exec_count","0");
00313 prop.setProperty("measurement.period_time","enable");
00314 prop.setProperty("measurement.period_count","0");
00315 retcode = publisher.init(prop);
00316 coil::usleep(10000);
00317 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK, retcode);
00318
00319 prop.setProperty("publisher.push_policy","fifo");
00320 prop.setProperty("publisher.skip_count","1");
00321 prop.setProperty("thread_type","default");
00322 prop.setProperty("measurement.exec_time","disable");
00323 prop.setProperty("measurement.exec_count","1");
00324 prop.setProperty("measurement.period_time","disable");
00325 prop.setProperty("measurement.period_count","1");
00326 retcode = publisher.init(prop);
00327 coil::usleep(10000);
00328 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK, retcode);
00329
00330 prop.setProperty("publisher.push_policy","skip");
00331 prop.setProperty("publisher.skip_count","-1");
00332 prop.setProperty("thread_type","default");
00333 prop.setProperty("measurement.exec_time","bar");
00334 prop.setProperty("measurement.exec_count","-1");
00335 prop.setProperty("measurement.period_time","bar");
00336 prop.setProperty("measurement.period_count","-1");
00337 retcode = publisher.init(prop);
00338 coil::usleep(10000);
00339 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK, retcode);
00340
00341 prop.setProperty("publisher.push_policy","new");
00342 prop.setProperty("publisher.skip_count","foo");
00343 prop.setProperty("thread_type","default");
00344 prop.setProperty("measurement.exec_time","enable");
00345 prop.setProperty("measurement.exec_count","foo");
00346 prop.setProperty("measurement.period_time","enable");
00347 prop.setProperty("measurement.period_count","foo");
00348 retcode = publisher.init(prop);
00349 coil::usleep(10000);
00350 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK, retcode);
00351
00352 prop.setProperty("publisher.push_policy","bar");
00353 prop.setProperty("publisher.skip_count","0");
00354 prop.setProperty("thread_type","default");
00355 prop.setProperty("measurement.exec_time","enable");
00356 prop.setProperty("measurement.exec_count","0");
00357 prop.setProperty("measurement.period_time","enable");
00358 prop.setProperty("measurement.period_count","0");
00359 retcode = publisher.init(prop);
00360 coil::usleep(10000);
00361 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK, retcode);
00362
00363 }
00368 void test_setConsumer(void)
00369 {
00370 RTC::InPortCorbaCdrConsumer *consumer0
00371 = new RTC::InPortCorbaCdrConsumer();
00372 RTC::InPortCorbaCdrConsumer *consumer1
00373 = new RTC::InPortCorbaCdrConsumer();
00374 RTC::PublisherNew publisher;
00375
00376
00377 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::INVALID_ARGS,
00378 publisher.setConsumer(NULL));
00379
00380
00381 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00382 publisher.setConsumer(consumer0));
00383
00384
00385 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00386 publisher.setConsumer(consumer1));
00387
00388 delete consumer0;
00389 delete consumer1;
00390 }
00395 void test_setBuffer(void)
00396 {
00397 RTC::CdrBufferBase* buffer0 = new RTC::CdrRingBuffer();
00398 RTC::CdrBufferBase* buffer1 = new RTC::CdrRingBuffer();
00399 RTC::PublisherNew publisher;
00400
00401
00402 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::INVALID_ARGS,
00403 publisher.setBuffer(NULL));
00404
00405
00406 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00407 publisher.setBuffer(buffer0));
00408
00409
00410 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00411 publisher.setBuffer(buffer1));
00412
00413 delete buffer0;
00414 delete buffer1;
00415 }
00420 void test_activate_deactivate_isActive(void)
00421 {
00422 RTC::InPortCorbaCdrConsumer *consumer
00423 = new RTC::InPortCorbaCdrConsumer();
00424 RTC::PublisherNew publisher;
00425 publisher.setConsumer(consumer);
00426
00427 CPPUNIT_ASSERT_EQUAL(false,
00428 publisher.isActive());
00429
00430 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00431 publisher.activate());
00432
00433 CPPUNIT_ASSERT_EQUAL(true,
00434 publisher.isActive());
00435
00436
00437
00438
00439 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00440 publisher.activate());
00441
00442 CPPUNIT_ASSERT_EQUAL(true,
00443 publisher.isActive());
00444
00445 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00446 publisher.deactivate());
00447
00448 CPPUNIT_ASSERT_EQUAL(false,
00449 publisher.isActive());
00450
00451
00452
00453
00454 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00455 publisher.deactivate());
00456
00457 CPPUNIT_ASSERT_EQUAL(false,
00458 publisher.isActive());
00459
00460 coil::usleep(10000);
00461 delete consumer;
00462 }
00468 void test_pushAll(void)
00469 {
00470 InPortCorbaCdrConsumerMock *consumer
00471 = new InPortCorbaCdrConsumerMock();
00472 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
00473 PublisherNewMock publisher;
00474
00475 coil::Properties prop;
00476 prop.setProperty("publisher.push_policy","all");
00477 prop.setProperty("publisher.skip_count","0");
00478 prop.setProperty("thread_type","default");
00479 prop.setProperty("measurement.exec_time","enable");
00480 prop.setProperty("measurement.exec_count","0");
00481 prop.setProperty("measurement.period_time","enable");
00482 prop.setProperty("measurement.period_count","0");
00483 publisher.init(prop);
00484 coil::usleep(10000);
00485
00486
00487 coil::vstring ports;
00488 RTC::ConnectorInfo info("name", "id", ports, prop);
00489
00490
00491 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
00492 new DataListener("ON_BUFFER_WRITE"), true);
00493 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
00494 new DataListener("ON_BUFFER_FULL"), true);
00495 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
00496 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
00497 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
00498 new DataListener("ON_BUFFER_OVERWRITE"), true);
00499 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
00500 new DataListener("ON_BUFFER_READ"), true);
00501 m_listeners.connectorData_[RTC::ON_SEND].addListener(
00502 new DataListener("ON_SEND"), true);
00503 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
00504 new DataListener("ON_RECEIVED"), true);
00505 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
00506 new DataListener("ON_RECEIVER_FULL"), true);
00507 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
00508 new DataListener("ON_RECEIVER_TIMEOUT"), true);
00509 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
00510 new DataListener("ON_RECEIVER_ERROR"), true);
00511
00512
00513 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
00514 publisher.setListener(info, 0));
00515 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
00516 publisher.setListener(info, &m_listeners));
00517
00518 publisher.setConsumer(consumer);
00519 publisher.setBuffer(buffer);
00520 publisher.activate();
00521
00522 for(int icc(0);icc<8;++icc)
00523 {
00524 cdrMemoryStream cdr;
00525 RTC::TimedLong td;
00526 td.data = icc;
00527 td >>= cdr;
00528
00529 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00530 publisher.write(cdr,0,0));
00531
00532 }
00533
00534
00535 {
00536 cdrMemoryStream cdr;
00537 RTC::TimedLong td;
00538 td.data = 8;
00539 td >>= cdr;
00540 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00541 publisher.write(cdr,0,0));
00542 coil::usleep(10000);
00543 }
00544 {
00545 cdrMemoryStream cdr;
00546 RTC::TimedLong td;
00547 td.data = 9;
00548 td >>= cdr;
00549 RTC::PublisherBase::ReturnCode ret = publisher.write(cdr,0,0);
00550 bool bret = false;
00551 if( (ret == RTC::PublisherNew::PORT_OK) ||
00552 (ret == RTC::PublisherNew::BUFFER_FULL) ) bret = true;
00553 CPPUNIT_ASSERT(bret);
00554 coil::usleep(10000);
00555 }
00556
00557
00558
00559 for(int icc(0);icc<4;++icc)
00560 {
00561 cdrMemoryStream data;
00562 data = consumer->get_m_put_data();
00563 CORBA::ULong inlen = data.bufSize();
00564 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
00565
00566 RTC::TimedLong rtd;
00567 rtd <<= data;
00568 CPPUNIT_ASSERT_EQUAL((long)icc, (long)rtd.data);
00569 }
00570
00571
00572 {
00573 cdrMemoryStream cdr;
00574 RTC::TimedLong td;
00575 td.data = 10;
00576 td >>= cdr;
00577 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
00578 publisher.write(cdr,0,0));
00579 coil::usleep(10000);
00580 }
00581 {
00582 cdrMemoryStream cdr;
00583 RTC::TimedLong td;
00584 td.data = 11;
00585 td >>= cdr;
00586 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00587 publisher.write(cdr,0,0));
00588 coil::usleep(10000);
00589 }
00590
00591
00592 for(int icc(0);icc<8;++icc)
00593 {
00594 cdrMemoryStream data;
00595 data = consumer->get_m_put_data();
00596 CORBA::ULong inlen = data.bufSize();
00597 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
00598
00599 RTC::TimedLong rtd;
00600 rtd <<= data;
00601 CPPUNIT_ASSERT_EQUAL((long)icc+4, (long)rtd.data);
00602 }
00603 publisher.deactivate();
00604
00605 coil::usleep(10000);
00606 delete buffer;
00607 delete consumer;
00608
00609 }
00615 void test_pushAll_2(void)
00616 {
00617 InPortCorbaCdrConsumerMock *consumer
00618 = new InPortCorbaCdrConsumerMock();
00619 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
00620 PublisherNewMock publisher;
00621
00622 coil::Properties prop;
00623 prop.setProperty("publisher.push_policy","all");
00624 prop.setProperty("publisher.skip_count","0");
00625 prop.setProperty("thread_type","default");
00626 prop.setProperty("measurement.exec_time","enable");
00627 prop.setProperty("measurement.exec_count","0");
00628 prop.setProperty("measurement.period_time","enable");
00629 prop.setProperty("measurement.period_count","0");
00630 publisher.init(prop);
00631 coil::usleep(10000);
00632
00633
00634 coil::vstring ports;
00635 RTC::ConnectorInfo info("name", "id", ports, prop);
00636
00637
00638 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
00639 new DataListener("ON_BUFFER_WRITE"), true);
00640 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
00641 new DataListener("ON_BUFFER_FULL"), true);
00642 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
00643 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
00644 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
00645 new DataListener("ON_BUFFER_OVERWRITE"), true);
00646 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
00647 new DataListener("ON_BUFFER_READ"), true);
00648 m_listeners.connectorData_[RTC::ON_SEND].addListener(
00649 new DataListener("ON_SEND"), true);
00650 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
00651 new DataListener("ON_RECEIVED"), true);
00652 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
00653 new DataListener("ON_RECEIVER_FULL"), true);
00654 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
00655 new DataListener("ON_RECEIVER_TIMEOUT"), true);
00656 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
00657 new DataListener("ON_RECEIVER_ERROR"), true);
00658
00659
00660 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
00661 publisher.setListener(info, 0));
00662 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
00663 publisher.setListener(info, &m_listeners));
00664
00665 publisher.setConsumer(consumer);
00666 publisher.setBuffer(buffer);
00667 publisher.activate();
00668
00669 for(int icc(0);icc<16;++icc)
00670 {
00671 cdrMemoryStream cdr;
00672 RTC::TimedLong td;
00673 td.data = icc;
00674 td >>= cdr;
00675
00676 RTC::PublisherBase::ReturnCode ret;
00677 ret = publisher.write(cdr,0,0);
00678 if(icc<9)
00679 {
00680 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00681 ret);
00682 }
00683 else
00684 {
00685 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
00686 ret);
00687 }
00688 coil::usleep(10000);
00689
00690 }
00691
00692
00693
00694 {
00695 cdrMemoryStream cdr;
00696 RTC::TimedLong td;
00697 td.data = 16;
00698 td >>= cdr;
00699 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
00700 publisher.write(cdr,0,0));
00701 coil::usleep(10000);
00702 }
00703
00704
00705 for(int icc(0);icc<8;++icc)
00706 {
00707 cdrMemoryStream data;
00708 data = consumer->get_m_put_data();
00709 CORBA::ULong inlen = data.bufSize();
00710 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
00711
00712 RTC::TimedLong rtd;
00713 rtd <<= data;
00714 CPPUNIT_ASSERT_EQUAL((long)icc, (long)rtd.data);
00715 }
00716
00717
00718
00719 {
00720 cdrMemoryStream cdr;
00721 RTC::TimedLong td;
00722 td.data = 17;
00723 td >>= cdr;
00724 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
00725 publisher.write(cdr,0,0));
00726 coil::usleep(10000);
00727 }
00728
00729 for(int icc(0);icc<8;++icc)
00730 {
00731 cdrMemoryStream data;
00732 data = consumer->get_m_put_data();
00733 CORBA::ULong inlen = data.bufSize();
00734 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
00735
00736 RTC::TimedLong rtd;
00737 rtd <<= data;
00738 CPPUNIT_ASSERT_EQUAL((long)icc+8, (long)rtd.data);
00739 }
00740 {
00741 cdrMemoryStream cdr;
00742 RTC::TimedLong td;
00743 td.data = 18;
00744 td >>= cdr;
00745 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00746 publisher.write(cdr,0,0));
00747 coil::usleep(10000);
00748 }
00749
00750 {
00751 cdrMemoryStream data;
00752 data = consumer->get_m_put_data();
00753 CORBA::ULong inlen = data.bufSize();
00754 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
00755
00756 RTC::TimedLong rtd;
00757 rtd <<= data;
00758 CPPUNIT_ASSERT_EQUAL((long)18, (long)rtd.data);
00759 }
00760
00761 coil::usleep(10000);
00762 publisher.deactivate();
00763
00764 coil::usleep(10000);
00765 delete buffer;
00766 delete consumer;
00767
00768 }
00773 void test_pushFifo(void)
00774 {
00775 InPortCorbaCdrConsumerMock *consumer
00776 = new InPortCorbaCdrConsumerMock();
00777 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
00778 PublisherNewMock publisher;
00779
00780 coil::Properties prop;
00781 prop.setProperty("publisher.push_policy","fifo");
00782 prop.setProperty("publisher.skip_count","0");
00783 prop.setProperty("thread_type","default");
00784 prop.setProperty("measurement.exec_time","enable");
00785 prop.setProperty("measurement.exec_count","0");
00786 prop.setProperty("measurement.period_time","enable");
00787 prop.setProperty("measurement.period_count","0");
00788 publisher.init(prop);
00789 coil::usleep(10000);
00790
00791
00792 coil::vstring ports;
00793 RTC::ConnectorInfo info("name", "id", ports, prop);
00794
00795
00796 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
00797 new DataListener("ON_BUFFER_WRITE"), true);
00798 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
00799 new DataListener("ON_BUFFER_FULL"), true);
00800 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
00801 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
00802 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
00803 new DataListener("ON_BUFFER_OVERWRITE"), true);
00804 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
00805 new DataListener("ON_BUFFER_READ"), true);
00806 m_listeners.connectorData_[RTC::ON_SEND].addListener(
00807 new DataListener("ON_SEND"), true);
00808 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
00809 new DataListener("ON_RECEIVED"), true);
00810 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
00811 new DataListener("ON_RECEIVER_FULL"), true);
00812 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
00813 new DataListener("ON_RECEIVER_TIMEOUT"), true);
00814 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
00815 new DataListener("ON_RECEIVER_ERROR"), true);
00816
00817
00818 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
00819 publisher.setListener(info, 0));
00820 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
00821 publisher.setListener(info, &m_listeners));
00822
00823 publisher.setConsumer(consumer);
00824 publisher.setBuffer(buffer);
00825 publisher.activate();
00826
00827 for(int icc(0);icc<8;++icc)
00828 {
00829 cdrMemoryStream cdr;
00830 RTC::TimedLong td;
00831 td.data = icc;
00832 td >>= cdr;
00833
00834 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00835 publisher.write(cdr,0,0));
00836
00837 coil::usleep(10000);
00838 }
00839
00840
00841 {
00842 cdrMemoryStream cdr;
00843 RTC::TimedLong td;
00844 td.data = 8;
00845 td >>= cdr;
00846 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00847 publisher.write(cdr,0,0));
00848 coil::usleep(50000);
00849 }
00850 {
00851 cdrMemoryStream cdr;
00852 RTC::TimedLong td;
00853 td.data = 9;
00854 td >>= cdr;
00855
00856
00857 publisher.write(cdr,0,0);
00858 coil::usleep(10000);
00859 }
00860
00861
00862
00863 coil::usleep(10000);
00864 for(int icc(0);icc<4;++icc)
00865 {
00866 cdrMemoryStream data;
00867 data = consumer->get_m_put_data();
00868 CORBA::ULong inlen = data.bufSize();
00869 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
00870
00871 RTC::TimedLong rtd;
00872 rtd <<= data;
00873 CPPUNIT_ASSERT_EQUAL((long)icc, (long)rtd.data);
00874 }
00875
00876
00877 {
00878 cdrMemoryStream cdr;
00879 RTC::TimedLong td;
00880 td.data = 10;
00881 td >>= cdr;
00882 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
00883 publisher.write(cdr,0,0));
00884 coil::usleep(30000);
00885 }
00886 {
00887 cdrMemoryStream cdr;
00888 RTC::TimedLong td;
00889 td.data = 11;
00890 td >>= cdr;
00891 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00892 publisher.write(cdr,0,0));
00893 coil::usleep(10000);
00894 }
00895 {
00896 cdrMemoryStream cdr;
00897 RTC::TimedLong td;
00898 td.data = 12;
00899 td >>= cdr;
00900 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00901 publisher.write(cdr,0,0));
00902 coil::usleep(10000);
00903 }
00904 {
00905 cdrMemoryStream cdr;
00906 RTC::TimedLong td;
00907 td.data = 13;
00908 td >>= cdr;
00909 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
00910 publisher.write(cdr,0,0));
00911 coil::usleep(10000);
00912 }
00913
00914
00915 for(int icc(0);icc<8;++icc)
00916 {
00917 cdrMemoryStream data;
00918 data = consumer->get_m_put_data();
00919 CORBA::ULong inlen = data.bufSize();
00920 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
00921
00922 RTC::TimedLong rtd;
00923 rtd <<= data;
00924 CPPUNIT_ASSERT_EQUAL((long)icc+4, (long)rtd.data);
00925 }
00926
00927
00928
00929 coil::usleep(10000);
00930 publisher.deactivate();
00931 coil::usleep(10000);
00932 delete buffer;
00933 delete consumer;
00934
00935 }
00941 void test_pushFifo_2(void)
00942 {
00943 InPortCorbaCdrConsumerMock *consumer
00944 = new InPortCorbaCdrConsumerMock();
00945 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
00946 PublisherNewMock publisher;
00947
00948 coil::Properties prop;
00949 prop.setProperty("publisher.push_policy","fifo");
00950 prop.setProperty("publisher.skip_count","0");
00951 prop.setProperty("thread_type","default");
00952 prop.setProperty("measurement.exec_time","enable");
00953 prop.setProperty("measurement.exec_count","0");
00954 prop.setProperty("measurement.period_time","enable");
00955 prop.setProperty("measurement.period_count","0");
00956 publisher.init(prop);
00957 coil::usleep(10000);
00958
00959
00960 coil::vstring ports;
00961 RTC::ConnectorInfo info("name", "id", ports, prop);
00962
00963
00964 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
00965 new DataListener("ON_BUFFER_WRITE"), true);
00966 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
00967 new DataListener("ON_BUFFER_FULL"), true);
00968 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
00969 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
00970 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
00971 new DataListener("ON_BUFFER_OVERWRITE"), true);
00972 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
00973 new DataListener("ON_BUFFER_READ"), true);
00974 m_listeners.connectorData_[RTC::ON_SEND].addListener(
00975 new DataListener("ON_SEND"), true);
00976 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
00977 new DataListener("ON_RECEIVED"), true);
00978 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
00979 new DataListener("ON_RECEIVER_FULL"), true);
00980 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
00981 new DataListener("ON_RECEIVER_TIMEOUT"), true);
00982 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
00983 new DataListener("ON_RECEIVER_ERROR"), true);
00984
00985
00986 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
00987 publisher.setListener(info, 0));
00988 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
00989 publisher.setListener(info, &m_listeners));
00990
00991 publisher.setConsumer(consumer);
00992 publisher.setBuffer(buffer);
00993 publisher.activate();
00994
00995
00996 for(int icc(0);icc<16;++icc)
00997 {
00998 cdrMemoryStream cdr;
00999 RTC::TimedLong td;
01000 td.data = icc;
01001 td >>= cdr;
01002
01003 RTC::PublisherBase::ReturnCode ret;
01004 ret = publisher.write(cdr,0,0);
01005 if(icc<9)
01006 {
01007 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01008 ret);
01009 }
01010 else
01011 {
01012 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
01013 ret);
01014 }
01015 coil::usleep(10000);
01016
01017 }
01018
01019
01020
01021 {
01022 cdrMemoryStream cdr;
01023 RTC::TimedLong td;
01024 td.data = 16;
01025 td >>= cdr;
01026 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
01027 publisher.write(cdr,0,0));
01028 coil::usleep(10000);
01029 }
01030
01031
01032 for(int icc(0);icc<8;++icc)
01033 {
01034 cdrMemoryStream data;
01035 data = consumer->get_m_put_data();
01036 CORBA::ULong inlen = data.bufSize();
01037 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01038
01039 RTC::TimedLong rtd;
01040 rtd <<= data;
01041 CPPUNIT_ASSERT_EQUAL((long)icc, (long)rtd.data);
01042 }
01043
01044
01045
01046 {
01047 cdrMemoryStream cdr;
01048 RTC::TimedLong td;
01049 td.data = 17;
01050 td >>= cdr;
01051 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
01052 publisher.write(cdr,0,0));
01053 coil::usleep(10000);
01054 }
01055
01056 for(int icc(0);icc<7;++icc)
01057 {
01058 cdrMemoryStream cdr;
01059 RTC::TimedLong td;
01060 td.data = (18+icc);
01061 td >>= cdr;
01062 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01063 publisher.write(cdr,0,0));
01064 coil::usleep(10000);
01065 }
01066
01067 for(int icc(0);icc<8;++icc)
01068 {
01069 cdrMemoryStream data;
01070 data = consumer->get_m_put_data();
01071 CORBA::ULong inlen = data.bufSize();
01072 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01073
01074 RTC::TimedLong rtd;
01075 rtd <<= data;
01076 CPPUNIT_ASSERT_EQUAL((long)icc+8, (long)rtd.data);
01077 }
01078 {
01079 cdrMemoryStream cdr;
01080 RTC::TimedLong td;
01081 td.data = 26;
01082 td >>= cdr;
01083 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01084 publisher.write(cdr,0,0));
01085 coil::usleep(10000);
01086 }
01087
01088 {
01089 cdrMemoryStream data;
01090 data = consumer->get_m_put_data();
01091 CORBA::ULong inlen = data.bufSize();
01092 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01093
01094 RTC::TimedLong rtd;
01095 rtd <<= data;
01096 CPPUNIT_ASSERT_EQUAL((long)18, (long)rtd.data);
01097 }
01098
01099
01100 coil::usleep(10000);
01101 publisher.deactivate();
01102
01103 coil::usleep(10000);
01104 delete buffer;
01105 delete consumer;
01106
01107 }
01112 void test_pushSkip(void)
01113 {
01114 InPortCorbaCdrConsumerMock *consumer
01115 = new InPortCorbaCdrConsumerMock();
01116 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
01117 PublisherNewMock publisher;
01118
01119 coil::Properties prop;
01120 prop.setProperty("publisher.push_policy","skip");
01121 prop.setProperty("publisher.skip_count","1");
01122 prop.setProperty("thread_type","default");
01123 prop.setProperty("measurement.exec_time","enable");
01124 prop.setProperty("measurement.exec_count","0");
01125 prop.setProperty("measurement.period_time","enable");
01126 prop.setProperty("measurement.period_count","0");
01127 publisher.init(prop);
01128 coil::usleep(10000);
01129
01130
01131 coil::vstring ports;
01132 RTC::ConnectorInfo info("name", "id", ports, prop);
01133
01134
01135 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
01136 new DataListener("ON_BUFFER_WRITE"), true);
01137 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
01138 new DataListener("ON_BUFFER_FULL"), true);
01139 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
01140 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
01141 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
01142 new DataListener("ON_BUFFER_OVERWRITE"), true);
01143 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
01144 new DataListener("ON_BUFFER_READ"), true);
01145 m_listeners.connectorData_[RTC::ON_SEND].addListener(
01146 new DataListener("ON_SEND"), true);
01147 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
01148 new DataListener("ON_RECEIVED"), true);
01149 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
01150 new DataListener("ON_RECEIVER_FULL"), true);
01151 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
01152 new DataListener("ON_RECEIVER_TIMEOUT"), true);
01153 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
01154 new DataListener("ON_RECEIVER_ERROR"), true);
01155
01156
01157 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
01158 publisher.setListener(info, 0));
01159 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
01160 publisher.setListener(info, &m_listeners));
01161
01162 publisher.setConsumer(consumer);
01163 publisher.setBuffer(buffer);
01164 publisher.activate();
01165
01166 for(int icc(0);icc<16;++icc)
01167 {
01168 cdrMemoryStream cdr;
01169 RTC::TimedLong td;
01170 td.data = icc;
01171 td >>= cdr;
01172
01173
01174
01175 publisher.write(cdr,0,0);
01176 coil::usleep(10000);
01177 }
01178
01179
01180 {
01181 cdrMemoryStream cdr;
01182 RTC::TimedLong td;
01183 td.data = 8;
01184 td >>= cdr;
01185 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01186 publisher.write(cdr,0,0));
01187 coil::usleep(10000);
01188 }
01189 {
01190 cdrMemoryStream cdr;
01191 RTC::TimedLong td;
01192 td.data = 9;
01193 td >>= cdr;
01194 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01195 publisher.write(cdr,0,0));
01196 coil::usleep(10000);
01197 }
01198
01199
01200
01201 for(int icc(0);icc<4;++icc)
01202 {
01203 cdrMemoryStream data;
01204 data = consumer->get_m_put_data();
01205 CORBA::ULong inlen = data.bufSize();
01206 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01207
01208 RTC::TimedLong rtd;
01209 rtd <<= data;
01210 CPPUNIT_ASSERT_EQUAL((long)icc*2+1, (long)rtd.data);
01211 }
01212
01213
01214 {
01215 cdrMemoryStream cdr;
01216 RTC::TimedLong td;
01217 td.data = 10;
01218 td >>= cdr;
01219 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
01220 publisher.write(cdr,0,0));
01221 coil::usleep(10000);
01222 }
01223 {
01224 cdrMemoryStream cdr;
01225 RTC::TimedLong td;
01226 td.data = 11;
01227 td >>= cdr;
01228 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01229 publisher.write(cdr,0,0));
01230 coil::usleep(10000);
01231 }
01232
01233
01234 for(int icc(0);icc<2;++icc)
01235 {
01236 cdrMemoryStream data;
01237 data = consumer->get_m_put_data();
01238 CORBA::ULong inlen = data.bufSize();
01239 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01240
01241 RTC::TimedLong rtd;
01242 rtd <<= data;
01243 CPPUNIT_ASSERT_EQUAL((long)icc*2+9, (long)rtd.data);
01244 }
01245
01246 coil::usleep(100000);
01247 publisher.deactivate();
01248 coil::usleep(10000);
01249 delete buffer;
01250 delete consumer;
01251 }
01257 void test_pushSkip_2(void)
01258 {
01259 InPortCorbaCdrConsumerMock *consumer
01260 = new InPortCorbaCdrConsumerMock();
01261 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
01262 PublisherNewMock publisher;
01263
01264 coil::Properties prop;
01265 prop.setProperty("publisher.push_policy","skip");
01266 prop.setProperty("publisher.skip_count","1");
01267 prop.setProperty("thread_type","default");
01268 prop.setProperty("measurement.exec_time","enable");
01269 prop.setProperty("measurement.exec_count","0");
01270 prop.setProperty("measurement.period_time","enable");
01271 prop.setProperty("measurement.period_count","0");
01272 publisher.init(prop);
01273 coil::usleep(10000);
01274
01275
01276 coil::vstring ports;
01277 RTC::ConnectorInfo info("name", "id", ports, prop);
01278
01279
01280 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
01281 new DataListener("ON_BUFFER_WRITE"), true);
01282 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
01283 new DataListener("ON_BUFFER_FULL"), true);
01284 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
01285 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
01286 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
01287 new DataListener("ON_BUFFER_OVERWRITE"), true);
01288 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
01289 new DataListener("ON_BUFFER_READ"), true);
01290 m_listeners.connectorData_[RTC::ON_SEND].addListener(
01291 new DataListener("ON_SEND"), true);
01292 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
01293 new DataListener("ON_RECEIVED"), true);
01294 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
01295 new DataListener("ON_RECEIVER_FULL"), true);
01296 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
01297 new DataListener("ON_RECEIVER_TIMEOUT"), true);
01298 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
01299 new DataListener("ON_RECEIVER_ERROR"), true);
01300
01301
01302 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
01303 publisher.setListener(info, 0));
01304 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
01305 publisher.setListener(info, &m_listeners));
01306
01307 publisher.setConsumer(consumer);
01308 publisher.setBuffer(buffer);
01309 publisher.activate();
01310
01311
01312 for(int icc(0);icc<24;++icc)
01313 {
01314 cdrMemoryStream cdr;
01315 RTC::TimedLong td;
01316 td.data = icc;
01317 td >>= cdr;
01318
01319 RTC::PublisherBase::ReturnCode ret;
01320 ret = publisher.write(cdr,0,0);
01321 if(icc<18)
01322 {
01323 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01324 ret);
01325 }
01326 else
01327 {
01328 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
01329 ret);
01330 }
01331 coil::usleep(10000);
01332
01333 }
01334
01335
01336
01337 {
01338 cdrMemoryStream cdr;
01339 RTC::TimedLong td;
01340 td.data = 24;
01341 td >>= cdr;
01342 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
01343 publisher.write(cdr,0,0));
01344 coil::usleep(10000);
01345 }
01346
01347
01348 for(int icc(0);icc<8;++icc)
01349 {
01350 cdrMemoryStream data;
01351 data = consumer->get_m_put_data();
01352 CORBA::ULong inlen = data.bufSize();
01353 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01354
01355 RTC::TimedLong rtd;
01356 rtd <<= data;
01357 CPPUNIT_ASSERT_EQUAL((long)icc*2+1, (long)rtd.data);
01358 }
01359
01360
01361
01362 {
01363 cdrMemoryStream cdr;
01364 RTC::TimedLong td;
01365 td.data = 25;
01366 td >>= cdr;
01367 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::BUFFER_FULL,
01368 publisher.write(cdr,0,0));
01369 coil::usleep(10000);
01370 }
01371
01372 int len =consumer->get_m_put_data_len();
01373 CPPUNIT_ASSERT_EQUAL(4,len);
01374 for(int icc(0);icc<len;++icc)
01375 {
01376 cdrMemoryStream data;
01377 data = consumer->get_m_put_data();
01378 CORBA::ULong inlen = data.bufSize();
01379 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01380
01381 RTC::TimedLong rtd;
01382 rtd <<= data;
01383 CPPUNIT_ASSERT_EQUAL((long)icc*2+17, (long)rtd.data);
01384 }
01385 {
01386 cdrMemoryStream cdr;
01387 RTC::TimedLong td;
01388 td.data = 26;
01389 td >>= cdr;
01390 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01391 publisher.write(cdr,0,0));
01392 coil::usleep(10000);
01393 }
01394 {
01395 cdrMemoryStream cdr;
01396 RTC::TimedLong td;
01397 td.data = 27;
01398 td >>= cdr;
01399 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01400 publisher.write(cdr,0,0));
01401 coil::usleep(10000);
01402 }
01403
01404 {
01405 cdrMemoryStream data;
01406 data = consumer->get_m_put_data();
01407 CORBA::ULong inlen = data.bufSize();
01408 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01409
01410 RTC::TimedLong rtd;
01411 rtd <<= data;
01412 CPPUNIT_ASSERT_EQUAL((long)27, (long)rtd.data);
01413 }
01414
01415 coil::usleep(10000);
01416 publisher.deactivate();
01417
01418 coil::usleep(10000);
01419 delete buffer;
01420 delete consumer;
01421
01422 }
01427 void test_pushNew(void)
01428 {
01429 InPortCorbaCdrConsumerMock *consumer
01430 = new InPortCorbaCdrConsumerMock();
01431 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
01432 PublisherNewMock publisher;
01433
01434 coil::Properties prop;
01435 prop.setProperty("publisher.push_policy","new");
01436 prop.setProperty("publisher.skip_count","0");
01437 prop.setProperty("thread_type","default");
01438 prop.setProperty("measurement.exec_time","enable");
01439 prop.setProperty("measurement.exec_count","0");
01440 prop.setProperty("measurement.period_time","enable");
01441 prop.setProperty("measurement.period_count","0");
01442 publisher.init(prop);
01443 coil::usleep(10000);
01444
01445
01446 coil::vstring ports;
01447 RTC::ConnectorInfo info("name", "id", ports, prop);
01448
01449
01450 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
01451 new DataListener("ON_BUFFER_WRITE"), true);
01452 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
01453 new DataListener("ON_BUFFER_FULL"), true);
01454 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
01455 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
01456 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
01457 new DataListener("ON_BUFFER_OVERWRITE"), true);
01458 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
01459 new DataListener("ON_BUFFER_READ"), true);
01460 m_listeners.connectorData_[RTC::ON_SEND].addListener(
01461 new DataListener("ON_SEND"), true);
01462 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
01463 new DataListener("ON_RECEIVED"), true);
01464 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
01465 new DataListener("ON_RECEIVER_FULL"), true);
01466 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
01467 new DataListener("ON_RECEIVER_TIMEOUT"), true);
01468 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
01469 new DataListener("ON_RECEIVER_ERROR"), true);
01470
01471
01472 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
01473 publisher.setListener(info, 0));
01474 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
01475 publisher.setListener(info, &m_listeners));
01476
01477 publisher.setConsumer(consumer);
01478 publisher.setBuffer(buffer);
01479 publisher.activate();
01480
01481
01482
01483 for(int icc(0);icc<7;++icc)
01484 {
01485 cdrMemoryStream cdr;
01486 RTC::TimedLong td;
01487 td.data = icc;
01488 td >>= cdr;
01489
01490 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01491 publisher.write(cdr,0,0));
01492
01493 }
01494
01495 coil::usleep(10000);
01496 {
01497 cdrMemoryStream cdr;
01498 RTC::TimedLong td;
01499 td.data = 7;
01500 td >>= cdr;
01501
01502 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01503 publisher.write(cdr,0,0));
01504
01505 }
01506
01507 coil::usleep(10000);
01508
01509
01510 int len = consumer->get_m_put_data_len() -1;
01511 for(int icc(0);icc<len;++icc)
01512 {
01513 cdrMemoryStream data;
01514 data = consumer->get_m_put_data();
01515 CORBA::ULong inlen = data.bufSize();
01516 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01517 }
01518
01519 {
01520 cdrMemoryStream data;
01521 data = consumer->get_m_put_data();
01522 CORBA::ULong inlen = data.bufSize();
01523 CPPUNIT_ASSERT_EQUAL(12,(int)inlen);
01524
01525 RTC::TimedLong rtd;
01526 rtd <<= data;
01527 CPPUNIT_ASSERT_EQUAL((long)7, (long)rtd.data);
01528 }
01529
01530 coil::usleep(1000000);
01531 publisher.deactivate();
01532 coil::usleep(1000000);
01533 delete buffer;
01534 delete consumer;
01535 }
01541 void test_write(void)
01542 {
01543 InPortCorbaCdrConsumerMock *consumer
01544 = new InPortCorbaCdrConsumerMock();
01545 RTC::CdrBufferBase* buffer = new RTC::CdrRingBuffer();
01546 PublisherNewMock publisher;
01547
01548 coil::Properties prop;
01549 prop.setProperty("publisher.push_policy","all");
01550 prop.setProperty("publisher.skip_count","0");
01551 prop.setProperty("thread_type","default");
01552 prop.setProperty("measurement.exec_time","enable");
01553 prop.setProperty("measurement.exec_count","0");
01554 prop.setProperty("measurement.period_time","enable");
01555 prop.setProperty("measurement.period_count","0");
01556 publisher.init(prop);
01557 coil::usleep(10000);
01558
01559
01560 coil::vstring ports;
01561 RTC::ConnectorInfo info("name", "id", ports, prop);
01562
01563
01564 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE].addListener(
01565 new DataListener("ON_BUFFER_WRITE"), true);
01566 m_listeners.connectorData_[RTC::ON_BUFFER_FULL].addListener(
01567 new DataListener("ON_BUFFER_FULL"), true);
01568 m_listeners.connectorData_[RTC::ON_BUFFER_WRITE_TIMEOUT].addListener(
01569 new DataListener("ON_BUFFER_WRITE_TIMEOUT"), true);
01570 m_listeners.connectorData_[RTC::ON_BUFFER_OVERWRITE].addListener(
01571 new DataListener("ON_BUFFER_OVERWRITE"), true);
01572 m_listeners.connectorData_[RTC::ON_BUFFER_READ].addListener(
01573 new DataListener("ON_BUFFER_READ"), true);
01574 m_listeners.connectorData_[RTC::ON_SEND].addListener(
01575 new DataListener("ON_SEND"), true);
01576 m_listeners.connectorData_[RTC::ON_RECEIVED].addListener(
01577 new DataListener("ON_RECEIVED"), true);
01578 m_listeners.connectorData_[RTC::ON_RECEIVER_FULL].addListener(
01579 new DataListener("ON_RECEIVER_FULL"), true);
01580 m_listeners.connectorData_[RTC::ON_RECEIVER_TIMEOUT].addListener(
01581 new DataListener("ON_RECEIVER_TIMEOUT"), true);
01582 m_listeners.connectorData_[RTC::ON_RECEIVER_ERROR].addListener(
01583 new DataListener("ON_RECEIVER_ERROR"), true);
01584
01585
01586 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::INVALID_ARGS,
01587 publisher.setListener(info, 0));
01588 CPPUNIT_ASSERT_EQUAL(RTC::DataPortStatus::PORT_OK,
01589 publisher.setListener(info, &m_listeners));
01590
01591
01592 {
01593 cdrMemoryStream cdr;
01594 RTC::TimedLong td;
01595 td.data = 101;
01596 td >>= cdr;
01597 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PRECONDITION_NOT_MET,
01598 publisher.write(cdr,0,0));
01599 coil::usleep(10000);
01600 }
01601 publisher.setBuffer(buffer);
01602 {
01603 cdrMemoryStream cdr;
01604 RTC::TimedLong td;
01605 td.data = 102;
01606 td >>= cdr;
01607 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PRECONDITION_NOT_MET,
01608 publisher.write(cdr,0,0));
01609 coil::usleep(10000);
01610 }
01611
01612
01613 publisher.setConsumer(consumer);
01614 {
01615 cdrMemoryStream cdr;
01616 RTC::TimedLong td;
01617 td.data = 103;
01618 td >>= cdr;
01619 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01620 publisher.write(cdr,0,0));
01621 coil::usleep(10000);
01622 }
01623 publisher.activate();
01624
01625 {
01626 cdrMemoryStream cdr;
01627 RTC::TimedLong td;
01628 td.data = 104;
01629 td >>= cdr;
01630 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01631 publisher.write(cdr,0,0));
01632 coil::usleep(10000);
01633 }
01634
01635
01636
01637 {
01638 cdrMemoryStream cdr;
01639 RTC::TimedLong td;
01640 td.data = 105;
01641 td >>= cdr;
01642 CPPUNIT_ASSERT_EQUAL(RTC::PublisherNew::PORT_OK,
01643 publisher.write(cdr,0,0));
01644 coil::usleep(10000);
01645 }
01646
01647
01648 cdrMemoryStream cdr;
01649 RTC::TimedLong td;
01650 td.data = 777;
01651 td >>= cdr;
01652
01653
01654
01655 m_OnCheck = 2;
01656 publisher.write(cdr,0,0);
01657 coil::usleep(10000);
01658 m_OnCheck = 3;
01659 publisher.write(cdr,0,0);
01660 coil::usleep(10000);
01661 m_OnCheck = 4;
01662 publisher.write(cdr,0,0);
01663 coil::usleep(10000);
01664 m_OnCheck = 5;
01665 publisher.write(cdr,0,0);
01666 coil::usleep(10000);
01667 m_OnCheck = 6;
01668 publisher.write(cdr,0,0);
01669 coil::usleep(10000);
01670
01671 delete consumer;
01672 delete buffer;
01673
01674 }
01682
01683
01684
01685
01686
01687
01688
01689
01690
01691
01692
01693
01694
01695
01696
01697
01698
01699
01700
01701
01702
01703
01704
01705
01706
01707
01708
01709
01710
01711
01712
01713
01714
01722
01723
01724
01725
01726
01727
01728
01729
01730
01731
01732
01733
01734
01735
01736
01737
01738
01739
01740
01741
01742
01743
01744
01745
01746
01747
01748
01749
01750
01751
01752
01753
01754
01755
01761
01762
01763
01764
01765
01766
01767
01768
01769
01770
01771
01772
01773
01774
01775
01776
01777
01778
01779
01780
01781
01782
01783
01784
01785
01786 };
01787 };
01788
01789
01790
01791
01792 CPPUNIT_TEST_SUITE_REGISTRATION(PublisherNew::PublisherNewTests);
01793
01794 #ifdef LOCAL_MAIN
01795 int main(int argc, char* argv[])
01796 {
01797
01798 FORMAT format = TEXT_OUT;
01799 int target = 0;
01800 std::string xsl;
01801 std::string ns;
01802 std::string fname;
01803 std::ofstream ofs;
01804
01805 int i(1);
01806 while (i < argc)
01807 {
01808 std::string arg(argv[i]);
01809 std::string next_arg;
01810 if (i + 1 < argc) next_arg = argv[i + 1];
01811 else next_arg = "";
01812
01813 if (arg == "--text") { format = TEXT_OUT; break; }
01814 if (arg == "--xml")
01815 {
01816 if (next_arg == "")
01817 {
01818 fname = argv[0];
01819 fname += ".xml";
01820 }
01821 else
01822 {
01823 fname = next_arg;
01824 }
01825 format = XML_OUT;
01826 ofs.open(fname.c_str());
01827 }
01828 if ( arg == "--compiler" ) { format = COMPILER_OUT; break; }
01829 if ( arg == "--cerr" ) { target = 1; break; }
01830 if ( arg == "--xsl" )
01831 {
01832 if (next_arg == "") xsl = "default.xsl";
01833 else xsl = next_arg;
01834 }
01835 if ( arg == "--namespace" )
01836 {
01837 if (next_arg == "")
01838 {
01839 std::cerr << "no namespace specified" << std::endl;
01840 exit(1);
01841 }
01842 else
01843 {
01844 xsl = next_arg;
01845 }
01846 }
01847 ++i;
01848 }
01849 CppUnit::TextUi::TestRunner runner;
01850 if ( ns.empty() )
01851 runner.addTest(CppUnit::TestFactoryRegistry::getRegistry().makeTest());
01852 else
01853 runner.addTest(CppUnit::TestFactoryRegistry::getRegistry(ns).makeTest());
01854 CppUnit::Outputter* outputter = 0;
01855 std::ostream* stream = target ? &std::cerr : &std::cout;
01856 switch ( format )
01857 {
01858 case TEXT_OUT :
01859 outputter = new CppUnit::TextOutputter(&runner.result(),*stream);
01860 break;
01861 case XML_OUT :
01862 std::cout << "XML_OUT" << std::endl;
01863 outputter = new CppUnit::XmlOutputter(&runner.result(),
01864 ofs, "shift_jis");
01865 static_cast<CppUnit::XmlOutputter*>(outputter)->setStyleSheet(xsl);
01866 break;
01867 case COMPILER_OUT :
01868 outputter = new CppUnit::CompilerOutputter(&runner.result(),*stream);
01869 break;
01870 }
01871 runner.setOutputter(outputter);
01872 runner.run();
01873 return 0;
01874 }
01875 #endif // MAIN
01876 #endif // PublisherNew_cpp