$search
00001 /*************************************************************************** 00002 tag: The SourceWorks Tue Sep 7 00:54:57 CEST 2010 mqueue_ipc_server.cpp 00003 00004 mqueue_ipc_server.cpp - description 00005 ------------------- 00006 begin : Tue September 07 2010 00007 copyright : (C) 2010 The SourceWorks 00008 email : peter@thesourceworks.com 00009 00010 *************************************************************************** 00011 * * 00012 * This program is free software; you can redistribute it and/or modify * 00013 * it under the terms of the GNU General Public License as published by * 00014 * the Free Software Foundation; either version 2 of the License, or * 00015 * (at your option) any later version. * 00016 * * 00017 ***************************************************************************/ 00018 00019 #include "unit.hpp" 00020 00021 #include "mqueue_test.hpp" 00022 00023 #include <iostream> 00024 00025 #include <Service.hpp> 00026 #include <transports/mqueue/MQLib.hpp> 00027 #include <transports/mqueue/MQChannelElement.hpp> 00028 #include <transports/mqueue/MQTemplateProtocol.hpp> 00029 #include <os/fosi.h> 00030 00031 using namespace std; 00032 using namespace RTT; 00033 using namespace RTT::detail; 00034 00035 void 00036 MQueueTest::setUp() 00037 { 00038 // connect DataPorts 00039 mr1 = new InputPort<double>("mr"); 00040 mw1 = new OutputPort<double>("mw"); 00041 00042 mr2 = new InputPort<double>("mr"); 00043 mw2 = new OutputPort<double>("mw"); 00044 00045 tc = new TaskContext( "root" ); 00046 tc->ports()->addPort( mr1 ); 00047 tc->ports()->addPort( mw1 ); 00048 00049 t2 = new TaskContext("other"); 00050 t2->ports()->addPort( mr2 ); 00051 t2->ports()->addPort( mw2 ); 00052 00053 } 00054 00055 00056 void 00057 MQueueTest::tearDown() 00058 { 00059 delete tc; 00060 delete t2; 00061 00062 delete mr1; 00063 delete mw1; 00064 delete mr2; 00065 delete mw2; 00066 } 00067 00068 void MQueueTest::new_data_listener(PortInterface* port) 00069 { 00070 signalled_port = port; 00071 } 00072 00073 00074 #define ASSERT_PORT_SIGNALLING(code, read_port) \ 00075 signalled_port = 0; \ 00076 code; \ 00077 rtos_disable_rt_warning(); \ 00078 usleep(100000); \ 00079 rtos_enable_rt_warning(); \ 00080 BOOST_CHECK( read_port == signalled_port ); 00081 00082 void MQueueTest::testPortDataConnection() 00083 { 00084 rtos_enable_rt_warning(); 00085 // This test assumes that there is a data connection mw1 => mr2 00086 // Check if connection succeeded both ways: 00087 BOOST_CHECK( mw1->connected() ); 00088 BOOST_CHECK( mr2->connected() ); 00089 00090 double value = 0; 00091 00092 // Check if no-data works 00093 BOOST_CHECK( NoData == mr2->read(value) ); 00094 00095 // Check if writing works (including signalling) 00096 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2) 00097 BOOST_CHECK( mr2->read(value) ); 00098 BOOST_CHECK_EQUAL( 1.0, value ); 00099 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2); 00100 BOOST_CHECK( mr2->read(value) ); 00101 BOOST_CHECK_EQUAL( 2.0, value ); 00102 BOOST_CHECK( OldData == mr2->read(value) ); 00103 00104 rtos_disable_rt_warning(); 00105 } 00106 00107 void MQueueTest::testPortBufferConnection() 00108 { 00109 rtos_enable_rt_warning(); 00110 // This test assumes that there is a buffer connection mw1 => mr2 of size 3 00111 // Check if connection succeeded both ways: 00112 BOOST_CHECK( mw1->connected() ); 00113 BOOST_CHECK( mr2->connected() ); 00114 00115 double value = 0; 00116 00117 // Check if no-data works 00118 BOOST_CHECK( NoData == mr2->read(value) ); 00119 00120 // Check if writing works 00121 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2); 00122 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2); 00123 ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr2); 00124 ASSERT_PORT_SIGNALLING(mw1->write(4.0), 0); // because size == 3 00125 BOOST_CHECK( mr2->read(value) ); 00126 BOOST_CHECK_EQUAL( 1.0, value ); 00127 BOOST_CHECK( mr2->read(value) ); 00128 BOOST_CHECK_EQUAL( 2.0, value ); 00129 BOOST_CHECK( mr2->read(value) ); 00130 BOOST_CHECK_EQUAL( 3.0, value ); 00131 BOOST_CHECK( OldData == mr2->read(value) ); 00132 00133 rtos_disable_rt_warning(); 00134 } 00135 00136 void MQueueTest::testPortDisconnected() 00137 { 00138 BOOST_CHECK( !mw1->connected() ); 00139 BOOST_CHECK( !mr2->connected() ); 00140 } 00141 00142 00143 // Registers the fixture into the 'registry' 00144 BOOST_FIXTURE_TEST_SUITE( MQueueTestSuite, MQueueTest ) 00145 00146 00150 BOOST_AUTO_TEST_CASE( testPortConnections ) 00151 { 00152 // Create a default policy specification 00153 ConnPolicy policy; 00154 policy.type = ConnPolicy::DATA; 00155 policy.init = false; 00156 policy.lock_policy = ConnPolicy::LOCK_FREE; 00157 policy.size = 0; 00158 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00159 00160 // Set up an event handler to check if signalling works properly as well 00161 Handle hl( mr2->getNewDataOnPortEvent()->setup( 00162 boost::bind(&MQueueTest::new_data_listener, this, _1) ) ); 00163 hl.connect(); 00164 00165 DataFlowInterface* ports = tc->ports(); 00166 DataFlowInterface* ports2 = t2->ports(); 00167 00168 #if 1 00169 // WARNING: in the following, there is four configuration tested. 00170 // We need to manually disconnect both sides since mqueue are connection-less. 00171 policy.type = ConnPolicy::DATA; 00172 policy.pull = false; 00173 // test user supplied connection. 00174 policy.name_id = "/data1"; 00175 BOOST_CHECK( mw1->createConnection(*mr2, policy) ); 00176 BOOST_CHECK( policy.name_id == "/data1" ); 00177 testPortDataConnection(); 00178 mw1->disconnect(); 00179 mr2->disconnect(); 00180 testPortDisconnected(); 00181 00182 policy.type = ConnPolicy::DATA; 00183 policy.pull = true; 00184 policy.name_id = ""; 00185 BOOST_CHECK( mw1->createConnection(*mr2, policy) ); 00186 testPortDataConnection(); 00187 mw1->disconnect(); 00188 mr2->disconnect(); 00189 testPortDisconnected(); 00190 #endif 00191 #if 1 00192 policy.type = ConnPolicy::BUFFER; 00193 policy.pull = false; 00194 policy.size = 3; 00195 policy.name_id = ""; 00196 //policy.name_id = "buffer1"; 00197 BOOST_CHECK( mw1->createConnection(*mr2, policy) ); 00198 testPortBufferConnection(); 00199 mw1->disconnect(); 00200 mr2->disconnect(); 00201 testPortDisconnected(); 00202 #endif 00203 #if 1 00204 policy.type = ConnPolicy::BUFFER; 00205 policy.pull = true; 00206 policy.size = 3; 00207 policy.name_id = ""; 00208 //policy.name_id = "buffer2"; 00209 BOOST_CHECK( mw1->createConnection(*mr2, policy) ); 00210 testPortBufferConnection(); 00211 //while(1) sleep(1); 00212 mw1->disconnect(); 00213 mr2->disconnect(); 00214 testPortDisconnected(); 00215 #endif 00216 } 00217 00218 BOOST_AUTO_TEST_CASE( testPortStreams ) 00219 { 00220 // Create a default policy specification 00221 ConnPolicy policy; 00222 policy.type = ConnPolicy::DATA; 00223 policy.init = false; 00224 policy.lock_policy = ConnPolicy::LOCK_FREE; 00225 policy.size = 0; 00226 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00227 00228 // Set up an event handler to check if signalling works properly as well 00229 Handle hl( mr2->getNewDataOnPortEvent()->setup( 00230 boost::bind(&MQueueTest::new_data_listener, this, _1) ) ); 00231 hl.connect(); 00232 00233 DataFlowInterface* ports = tc->ports(); 00234 DataFlowInterface* ports2 = t2->ports(); 00235 00236 00237 // Test all four configurations of Data/Buffer & push/pull 00238 policy.type = ConnPolicy::DATA; 00239 policy.pull = false; 00240 policy.name_id = "/data1"; 00241 BOOST_CHECK( mw1->createStream( policy ) ); 00242 BOOST_CHECK( mr2->createStream( policy ) ); 00243 testPortDataConnection(); 00244 mw1->disconnect(); 00245 mr2->disconnect(); 00246 testPortDisconnected(); 00247 00248 policy.type = ConnPolicy::DATA; 00249 policy.pull = true; 00250 policy.name_id = ""; 00251 BOOST_CHECK( mw1->createStream( policy ) ); 00252 BOOST_CHECK( mr2->createStream( policy ) ); 00253 testPortDataConnection(); 00254 mw1->disconnect(); 00255 mr2->disconnect(); 00256 testPortDisconnected(); 00257 00258 policy.type = ConnPolicy::BUFFER; 00259 policy.pull = false; 00260 policy.size = 3; 00261 policy.name_id = "/buffer1"; 00262 BOOST_CHECK( mw1->createStream( policy ) ); 00263 BOOST_CHECK( mr2->createStream( policy ) ); 00264 testPortBufferConnection(); 00265 mw1->disconnect(); 00266 mr2->disconnect(); 00267 testPortDisconnected(); 00268 00269 policy.type = ConnPolicy::BUFFER; 00270 policy.pull = true; 00271 policy.size = 3; 00272 policy.name_id = ""; 00273 BOOST_CHECK( mw1->createStream( policy ) ); 00274 BOOST_CHECK( mr2->createStream( policy ) ); 00275 testPortBufferConnection(); 00276 mw1->disconnect(); 00277 mr2->disconnect(); 00278 testPortDisconnected(); 00279 } 00280 00281 BOOST_AUTO_TEST_CASE( testPortStreamsTimeout ) 00282 { 00283 // Create a default policy specification 00284 ConnPolicy policy; 00285 policy.type = ConnPolicy::DATA; 00286 policy.init = false; 00287 policy.lock_policy = ConnPolicy::LOCK_FREE; 00288 policy.size = 0; 00289 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00290 00291 // Test creating an input stream without an output stream available. 00292 policy.type = ConnPolicy::DATA; 00293 policy.pull = false; 00294 policy.name_id = "/data1"; 00295 BOOST_CHECK( mr2->createStream( policy ) == false ); 00296 BOOST_CHECK( mr2->connected() == false ); 00297 mr2->disconnect(); 00298 00299 policy.type = ConnPolicy::BUFFER; 00300 policy.pull = false; 00301 policy.size = 10; 00302 policy.name_id = "/buffer1"; 00303 BOOST_CHECK( mr2->createStream( policy ) == false ); 00304 BOOST_CHECK( mr2->connected() == false ); 00305 mr2->disconnect(); 00306 } 00307 00308 00309 BOOST_AUTO_TEST_CASE( testPortStreamsWrongName ) 00310 { 00311 // Create a default policy specification 00312 ConnPolicy policy; 00313 policy.type = ConnPolicy::DATA; 00314 policy.init = false; 00315 policy.lock_policy = ConnPolicy::LOCK_FREE; 00316 policy.size = 0; 00317 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00318 00319 // Test creating an input stream without an output stream available. 00320 policy.type = ConnPolicy::DATA; 00321 policy.pull = false; 00322 policy.name_id = "data1"; // name must start with '/' 00323 BOOST_CHECK( mr2->createStream( policy ) == false ); 00324 BOOST_CHECK( mr2->connected() == false ); 00325 mr2->disconnect(); 00326 00327 policy.type = ConnPolicy::BUFFER; 00328 policy.pull = false; 00329 policy.size = 10; 00330 policy.name_id = "buffer1"; 00331 BOOST_CHECK( mr2->createStream( policy ) == false ); 00332 BOOST_CHECK( mr2->connected() == false ); 00333 mr2->disconnect(); 00334 } 00335 00336 // copied from testPortStreams 00337 BOOST_AUTO_TEST_CASE( testVectorTransport ) 00338 { 00339 // Create a default policy specification 00340 ConnPolicy policy; 00341 policy.type = ConnPolicy::DATA; 00342 policy.init = false; 00343 policy.lock_policy = ConnPolicy::LOCK_FREE; 00344 policy.size = 0; 00345 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00346 00347 // Set up an event handler to check if signalling works properly as well 00348 Handle hl( mr2->getNewDataOnPortEvent()->setup( 00349 boost::bind(&MQueueTest::new_data_listener, this, _1) ) ); 00350 hl.connect(); 00351 00352 DataFlowInterface* ports = tc->ports(); 00353 DataFlowInterface* ports2 = t2->ports(); 00354 00355 std::vector<double> data(20, 3.33); 00356 InputPort< std::vector<double> > vin("VIn"); 00357 OutputPort< std::vector<double> > vout("Vout"); 00358 ports->addPort(&vin, "input port"); 00359 ports2->addPort(&vout, "output port"); 00360 00361 // init the output port with a vector of size 20, values 3.33 00362 vout.setDataSample( data ); 00363 data = vout.getLastWrittenValue(); 00364 for(int i=0; i != 20; ++i) 00365 BOOST_CHECK_CLOSE( data[i], 3.33, 0.01); 00366 00367 policy.type = ConnPolicy::DATA; 00368 policy.pull = false; 00369 policy.name_id = "/vdata1"; 00370 BOOST_CHECK( vout.createStream( policy ) ); 00371 BOOST_CHECK( vin.createStream( policy ) ); 00372 00373 // check that the receiver did not get any data 00374 BOOST_CHECK_EQUAL( vin.read(data), NoData); 00375 00376 // prepare a new data sample, size 10, values 6.66 00377 data.clear(); 00378 data.resize(10, 6.66); 00379 for(int i=0; i != data.size(); ++i) 00380 BOOST_CHECK_CLOSE( data[i], 6.66, 0.01); 00381 00382 rtos_enable_rt_warning(); 00383 vout.write( data ); 00384 rtos_disable_rt_warning(); 00385 00386 // prepare data buffer for reception: 00387 data.clear(); 00388 data.resize(20, 0.0); 00389 usleep(200000); 00390 00391 rtos_enable_rt_warning(); 00392 BOOST_CHECK_EQUAL( vin.read(data), NewData); 00393 rtos_disable_rt_warning(); 00394 00395 // check if both size and capacity and values are as expected. 00396 BOOST_CHECK_EQUAL( data.size(), 10); 00397 BOOST_CHECK_EQUAL( data.capacity(), 20); 00398 for(int i=0; i != data.size(); ++i) 00399 BOOST_CHECK_CLOSE( data[i], 6.66, 0.01); 00400 00401 rtos_enable_rt_warning(); 00402 BOOST_CHECK_EQUAL( vin.read(data), OldData); 00403 rtos_disable_rt_warning(); 00404 } 00405 00406 BOOST_AUTO_TEST_SUITE_END() 00407