$search
00001 /*************************************************************************** 00002 tag: The SourceWorks Tue Sep 7 00:54:57 CEST 2010 mqueue_test.cpp 00003 00004 mqueue_test.cpp - description 00005 ------------------- 00006 begin : Tue September 07 2010 00007 copyright : (C) 2010 The SourceWorks 00008 email : peter@thesourceworks.com 00009 00010 *************************************************************************** 00011 * * 00012 * This program is free software; you can redistribute it and/or modify * 00013 * it under the terms of the GNU General Public License as published by * 00014 * the Free Software Foundation; either version 2 of the License, or * 00015 * (at your option) any later version. * 00016 * * 00017 ***************************************************************************/ 00018 00019 #include "unit.hpp" 00020 00021 #include <iostream> 00022 00023 #include <Service.hpp> 00024 #include <transports/mqueue/MQLib.hpp> 00025 #include <transports/mqueue/MQChannelElement.hpp> 00026 #include <transports/mqueue/MQTemplateProtocol.hpp> 00027 #include <os/fosi.h> 00028 00029 using namespace std; 00030 using namespace RTT; 00031 using namespace RTT::detail; 00032 00033 #include <InputPort.hpp> 00034 #include <OutputPort.hpp> 00035 #include <TaskContext.hpp> 00036 #include <string> 00037 00038 using namespace RTT; 00039 using namespace RTT::detail; 00040 00041 class MQueueTest 00042 { 00043 public: 00044 MQueueTest() 00045 { 00046 // connect DataPorts 00047 mr1 = new InputPort<double>("mr"); 00048 mw1 = new OutputPort<double>("mw"); 00049 00050 mr2 = new InputPort<double>("mr"); 00051 mw2 = new OutputPort<double>("mw"); 00052 00053 // both tc's are non periodic 00054 tc = new TaskContext( "root" ); 00055 tc->ports()->addEventPort( *mr1 ); 00056 tc->ports()->addPort( *mw1 ); 00057 00058 t2 = new TaskContext("other"); 00059 t2->ports()->addEventPort( *mr2, boost::bind(&MQueueTest::new_data_listener, this, _1) ); 00060 t2->ports()->addPort( *mw2 ); 00061 00062 tc->start(); 00063 t2->start(); 00064 } 00065 00066 ~MQueueTest() 00067 { 00068 delete tc; 00069 delete t2; 00070 00071 delete mr1; 00072 delete mw1; 00073 delete mr2; 00074 delete mw2; 00075 } 00076 00077 TaskContext* tc; 00078 TaskContext* t2; 00079 00080 PortInterface* signalled_port; 00081 void new_data_listener(PortInterface* port) 00082 { 00083 signalled_port = port; 00084 } 00085 00086 // Ports 00087 InputPort<double>* mr1; 00088 OutputPort<double>* mw1; 00089 InputPort<double>* mr2; 00090 OutputPort<double>* mw2; 00091 00092 ConnPolicy policy; 00093 00094 // helper test functions 00095 void testPortDataConnection(); 00096 void testPortBufferConnection(); 00097 void testPortDisconnected(); 00098 }; 00099 00100 class MQueueFixture : public MQueueTest 00101 { 00102 public: 00103 MQueueFixture() { 00104 // Create a default policy specification 00105 policy.type = ConnPolicy::DATA; 00106 policy.init = false; 00107 policy.lock_policy = ConnPolicy::LOCK_FREE; 00108 policy.size = 0; 00109 policy.pull = true; 00110 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00111 } 00112 }; 00113 00114 #define ASSERT_PORT_SIGNALLING(code, read_port) \ 00115 signalled_port = 0; \ 00116 code; \ 00117 rtos_disable_rt_warning(); \ 00118 usleep(100000); \ 00119 rtos_enable_rt_warning(); \ 00120 BOOST_CHECK( read_port == signalled_port ); 00121 00122 void MQueueTest::testPortDataConnection() 00123 { 00124 rtos_enable_rt_warning(); 00125 // This test assumes that there is a data connection mw1 => mr2 00126 // Check if connection succeeded both ways: 00127 BOOST_CHECK( mw1->connected() ); 00128 BOOST_CHECK( mr2->connected() ); 00129 00130 double value = 0; 00131 00132 // Check if no-data works 00133 BOOST_CHECK( NoData == mr2->read(value) ); 00134 00135 // Check if writing works (including signalling) 00136 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2) 00137 BOOST_CHECK( mr2->read(value) ); 00138 BOOST_CHECK_EQUAL( 1.0, value ); 00139 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2); 00140 BOOST_CHECK( mr2->read(value) ); 00141 BOOST_CHECK_EQUAL( 2.0, value ); 00142 BOOST_CHECK( OldData == mr2->read(value) ); 00143 00144 rtos_disable_rt_warning(); 00145 } 00146 00147 void MQueueTest::testPortBufferConnection() 00148 { 00149 rtos_enable_rt_warning(); 00150 // This test assumes that there is a buffer connection mw1 => mr2 of size 3 00151 // Check if connection succeeded both ways: 00152 BOOST_CHECK( mw1->connected() ); 00153 BOOST_CHECK( mr2->connected() ); 00154 00155 double value = 0; 00156 00157 // Check if no-data works 00158 BOOST_CHECK( NoData == mr2->read(value) ); 00159 00160 // Check if writing works 00161 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2); 00162 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2); 00163 ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr2); 00164 ASSERT_PORT_SIGNALLING(mw1->write(4.0), 0); // because size == 3 00165 BOOST_CHECK( mr2->read(value) ); 00166 BOOST_CHECK_EQUAL( 1.0, value ); 00167 BOOST_CHECK( mr2->read(value) ); 00168 BOOST_CHECK_EQUAL( 2.0, value ); 00169 BOOST_CHECK( mr2->read(value) ); 00170 BOOST_CHECK_EQUAL( 3.0, value ); 00171 BOOST_CHECK( OldData == mr2->read(value) ); 00172 00173 rtos_disable_rt_warning(); 00174 } 00175 00176 void MQueueTest::testPortDisconnected() 00177 { 00178 BOOST_CHECK( !mw1->connected() ); 00179 BOOST_CHECK( !mr2->connected() ); 00180 } 00181 00182 00183 // Registers the fixture into the 'registry' 00184 BOOST_FIXTURE_TEST_SUITE( MQueueTestSuite, MQueueFixture ) 00185 00186 00190 BOOST_AUTO_TEST_CASE( testPortConnections ) 00191 { 00192 #if 1 00193 // WARNING: in the following, there is four configuration tested. 00194 // We need to manually disconnect both sides since mqueue are connection-less. 00195 policy.type = ConnPolicy::DATA; 00196 policy.pull = true; 00197 // test user supplied connection. 00198 policy.name_id = "/data1"; 00199 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) ); 00200 BOOST_CHECK( policy.name_id == "/data1" ); 00201 testPortDataConnection(); 00202 mw1->disconnect(); 00203 mr2->disconnect(); 00204 testPortDisconnected(); 00205 00206 policy.type = ConnPolicy::DATA; 00207 policy.pull = true; 00208 policy.name_id = ""; 00209 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) ); 00210 testPortDataConnection(); 00211 mw1->disconnect(); 00212 mr2->disconnect(); 00213 testPortDisconnected(); 00214 #endif 00215 #if 1 00216 policy.type = ConnPolicy::BUFFER; 00217 policy.pull = false; 00218 policy.size = 3; 00219 policy.name_id = ""; 00220 //policy.name_id = "buffer1"; 00221 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) ); 00222 testPortBufferConnection(); 00223 mw1->disconnect(); 00224 mr2->disconnect(); 00225 testPortDisconnected(); 00226 #endif 00227 #if 1 00228 policy.type = ConnPolicy::BUFFER; 00229 policy.pull = true; 00230 policy.size = 3; 00231 policy.name_id = ""; 00232 //policy.name_id = "buffer2"; 00233 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) ); 00234 testPortBufferConnection(); 00235 //while(1) sleep(1); 00236 mw1->disconnect(); 00237 mr2->disconnect(); 00238 testPortDisconnected(); 00239 #endif 00240 } 00241 00242 BOOST_AUTO_TEST_CASE( testPortStreams ) 00243 { 00244 // Test all four configurations of Data/Buffer & push/pull 00245 policy.type = ConnPolicy::DATA; 00246 policy.pull = false; 00247 policy.name_id = "/data1"; 00248 BOOST_REQUIRE( mw1->createStream( policy ) ); 00249 BOOST_REQUIRE( mr2->createStream( policy ) ); 00250 testPortDataConnection(); 00251 mw1->disconnect(); 00252 mr2->disconnect(); 00253 testPortDisconnected(); 00254 00255 policy.type = ConnPolicy::DATA; 00256 policy.pull = true; 00257 policy.name_id = ""; 00258 BOOST_REQUIRE( mw1->createStream( policy ) ); 00259 BOOST_REQUIRE( mr2->createStream( policy ) ); 00260 testPortDataConnection(); 00261 mw1->disconnect(); 00262 mr2->disconnect(); 00263 testPortDisconnected(); 00264 00265 policy.type = ConnPolicy::BUFFER; 00266 policy.pull = false; 00267 policy.size = 3; 00268 policy.name_id = "/buffer1"; 00269 BOOST_REQUIRE( mw1->createStream( policy ) ); 00270 BOOST_REQUIRE( mr2->createStream( policy ) ); 00271 testPortBufferConnection(); 00272 mw1->disconnect(); 00273 mr2->disconnect(); 00274 testPortDisconnected(); 00275 00276 policy.type = ConnPolicy::BUFFER; 00277 policy.pull = true; 00278 policy.size = 3; 00279 policy.name_id = ""; 00280 BOOST_REQUIRE( mw1->createStream( policy ) ); 00281 BOOST_REQUIRE( mr2->createStream( policy ) ); 00282 testPortBufferConnection(); 00283 mw1->disconnect(); 00284 mr2->disconnect(); 00285 testPortDisconnected(); 00286 } 00287 00288 BOOST_AUTO_TEST_CASE( testPortStreamsTimeout ) 00289 { 00290 // Test creating an input stream without an output stream available. 00291 policy.type = ConnPolicy::DATA; 00292 policy.pull = false; 00293 policy.name_id = "/data1"; 00294 BOOST_REQUIRE( mr2->createStream( policy ) == false ); 00295 BOOST_CHECK( mr2->connected() == false ); 00296 mr2->disconnect(); 00297 00298 policy.type = ConnPolicy::BUFFER; 00299 policy.pull = false; 00300 policy.size = 10; 00301 policy.name_id = "/buffer1"; 00302 BOOST_REQUIRE( mr2->createStream( policy ) == false ); 00303 BOOST_CHECK( mr2->connected() == false ); 00304 mr2->disconnect(); 00305 } 00306 00307 00308 BOOST_AUTO_TEST_CASE( testPortStreamsWrongName ) 00309 { 00310 // Test creating an input/output stream with a wrong name 00311 policy.type = ConnPolicy::DATA; 00312 policy.pull = false; 00313 policy.name_id = "data1"; // name must start with '/' 00314 BOOST_REQUIRE( mr2->createStream( policy ) == false ); 00315 BOOST_CHECK( mr2->connected() == false ); 00316 mr2->disconnect(); 00317 00318 policy.type = ConnPolicy::BUFFER; 00319 policy.pull = false; 00320 policy.size = 10; 00321 policy.name_id = "buffer1"; 00322 BOOST_REQUIRE( mw2->createStream( policy ) == false ); 00323 BOOST_CHECK( mw2->connected() == false ); 00324 mw2->disconnect(); 00325 } 00326 00327 // copied from testPortStreams 00328 BOOST_AUTO_TEST_CASE( testVectorTransport ) 00329 { 00330 DataFlowInterface* ports = tc->ports(); 00331 DataFlowInterface* ports2 = t2->ports(); 00332 00333 std::vector<double> data(20, 3.33); 00334 InputPort< std::vector<double> > vin("VIn"); 00335 OutputPort< std::vector<double> > vout("Vout"); 00336 ports->addPort(vin).doc("input port"); 00337 ports2->addPort(vout).doc("output port"); 00338 00339 // init the output port with a vector of size 20, values 3.33 00340 vout.setDataSample( data ); 00341 data = vout.getLastWrittenValue(); 00342 for(int i=0; i != 20; ++i) 00343 BOOST_CHECK_CLOSE( data[i], 3.33, 0.01); 00344 00345 policy.type = ConnPolicy::DATA; 00346 policy.pull = false; 00347 policy.name_id = "/vdata1"; 00348 BOOST_REQUIRE( vout.createStream( policy ) ); 00349 BOOST_REQUIRE( vin.createStream( policy ) ); 00350 00351 // check that the receiver did not get any data 00352 BOOST_CHECK_EQUAL( vin.read(data), NoData); 00353 00354 // prepare a new data sample, size 10, values 6.66 00355 data.clear(); 00356 data.resize(10, 6.66); 00357 for(unsigned int i=0; i != data.size(); ++i) 00358 BOOST_CHECK_CLOSE( data[i], 6.66, 0.01); 00359 00360 rtos_enable_rt_warning(); 00361 vout.write( data ); 00362 rtos_disable_rt_warning(); 00363 00364 // prepare data buffer for reception: 00365 data.clear(); 00366 data.resize(20, 0.0); 00367 usleep(200000); 00368 00369 rtos_enable_rt_warning(); 00370 BOOST_CHECK_EQUAL( vin.read(data), NewData); 00371 rtos_disable_rt_warning(); 00372 00373 // check if both size and capacity and values are as expected. 00374 BOOST_CHECK_EQUAL( data.size(), 10); 00375 BOOST_CHECK_EQUAL( data.capacity(), 20); 00376 for(unsigned int i=0; i != data.size(); ++i) 00377 BOOST_CHECK_CLOSE( data[i], 6.66, 0.01); 00378 00379 rtos_enable_rt_warning(); 00380 BOOST_CHECK_EQUAL( vin.read(data), OldData); 00381 rtos_disable_rt_warning(); 00382 } 00383 00384 BOOST_AUTO_TEST_SUITE_END() 00385