00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "unit.hpp"
00020
00021 #include <iostream>
00022
00023 #include <Service.hpp>
00024 #include <transports/mqueue/MQLib.hpp>
00025 #include <transports/mqueue/MQChannelElement.hpp>
00026 #include <transports/mqueue/MQTemplateProtocol.hpp>
00027 #include <os/fosi.h>
00028
00029 using namespace std;
00030 using namespace RTT;
00031 using namespace RTT::detail;
00032
00033 #include <InputPort.hpp>
00034 #include <OutputPort.hpp>
00035 #include <TaskContext.hpp>
00036 #include <string>
00037
00038 using namespace RTT;
00039 using namespace RTT::detail;
00040
00041 class MQueueTest
00042 {
00043 public:
00044 MQueueTest()
00045 {
00046
00047 mr1 = new InputPort<double>("mr");
00048 mw1 = new OutputPort<double>("mw");
00049
00050 mr2 = new InputPort<double>("mr");
00051 mw2 = new OutputPort<double>("mw");
00052
00053
00054 tc = new TaskContext( "root" );
00055 tc->ports()->addEventPort( *mr1 );
00056 tc->ports()->addPort( *mw1 );
00057
00058 t2 = new TaskContext("other");
00059 t2->ports()->addEventPort( *mr2 );
00060 t2->ports()->addPort( *mw2 );
00061
00062 tc->start();
00063 t2->start();
00064 }
00065
00066 ~MQueueTest()
00067 {
00068 delete tc;
00069 delete t2;
00070
00071 delete mr1;
00072 delete mw1;
00073 delete mr2;
00074 delete mw2;
00075 }
00076
00077 TaskContext* tc;
00078 TaskContext* t2;
00079
00080 PortInterface* signalled_port;
00081 void new_data_listener(PortInterface* port)
00082 {
00083 signalled_port = port;
00084 }
00085
00086
00087 InputPort<double>* mr1;
00088 OutputPort<double>* mw1;
00089 InputPort<double>* mr2;
00090 OutputPort<double>* mw2;
00091
00092 Handle hl;
00093 ConnPolicy policy;
00094
00095
00096 void testPortDataConnection();
00097 void testPortBufferConnection();
00098 void testPortDisconnected();
00099 };
00100
00101 class MQueueFixture : public MQueueTest
00102 {
00103 public:
00104 MQueueFixture() {
00105
00106 policy.type = ConnPolicy::DATA;
00107 policy.init = false;
00108 policy.lock_policy = ConnPolicy::LOCK_FREE;
00109 policy.size = 0;
00110 policy.pull = true;
00111 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00112
00113
00114 hl = mr2->getNewDataOnPortEvent()->setup(
00115 boost::bind(&MQueueTest::new_data_listener, this, _1) );
00116
00117 }
00118 };
00119
00120 #define ASSERT_PORT_SIGNALLING(code, read_port) \
00121 signalled_port = 0; \
00122 code; \
00123 rtos_disable_rt_warning(); \
00124 usleep(100000); \
00125 rtos_enable_rt_warning(); \
00126 BOOST_CHECK( hl.connected() ); \
00127 BOOST_CHECK( read_port == signalled_port );
00128
00129 void MQueueTest::testPortDataConnection()
00130 {
00131 rtos_enable_rt_warning();
00132
00133
00134 BOOST_CHECK( mw1->connected() );
00135 BOOST_CHECK( mr2->connected() );
00136
00137 double value = 0;
00138
00139
00140 BOOST_CHECK( NoData == mr2->read(value) );
00141
00142
00143 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2)
00144 BOOST_CHECK( mr2->read(value) );
00145 BOOST_CHECK_EQUAL( 1.0, value );
00146 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2);
00147 BOOST_CHECK( mr2->read(value) );
00148 BOOST_CHECK_EQUAL( 2.0, value );
00149 BOOST_CHECK( OldData == mr2->read(value) );
00150
00151 rtos_disable_rt_warning();
00152 }
00153
00154 void MQueueTest::testPortBufferConnection()
00155 {
00156 rtos_enable_rt_warning();
00157
00158
00159 BOOST_CHECK( mw1->connected() );
00160 BOOST_CHECK( mr2->connected() );
00161
00162 double value = 0;
00163
00164
00165 BOOST_CHECK( NoData == mr2->read(value) );
00166
00167
00168 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2);
00169 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2);
00170 ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr2);
00171 ASSERT_PORT_SIGNALLING(mw1->write(4.0), 0);
00172 BOOST_CHECK( mr2->read(value) );
00173 BOOST_CHECK_EQUAL( 1.0, value );
00174 BOOST_CHECK( mr2->read(value) );
00175 BOOST_CHECK_EQUAL( 2.0, value );
00176 BOOST_CHECK( mr2->read(value) );
00177 BOOST_CHECK_EQUAL( 3.0, value );
00178 BOOST_CHECK( OldData == mr2->read(value) );
00179
00180 rtos_disable_rt_warning();
00181 }
00182
00183 void MQueueTest::testPortDisconnected()
00184 {
00185 BOOST_CHECK( !mw1->connected() );
00186 BOOST_CHECK( !mr2->connected() );
00187 }
00188
00189
00190
00191 BOOST_FIXTURE_TEST_SUITE( MQueueTestSuite, MQueueFixture )
00192
00193
00197 BOOST_AUTO_TEST_CASE( testPortConnections )
00198 {
00199 BOOST_CHECK( hl.connect() );
00200
00201 #if 1
00202
00203
00204 policy.type = ConnPolicy::DATA;
00205 policy.pull = true;
00206
00207 policy.name_id = "/data1";
00208 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) );
00209 BOOST_CHECK( policy.name_id == "/data1" );
00210 testPortDataConnection();
00211 mw1->disconnect();
00212 mr2->disconnect();
00213 testPortDisconnected();
00214
00215 policy.type = ConnPolicy::DATA;
00216 policy.pull = true;
00217 policy.name_id = "";
00218 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) );
00219 testPortDataConnection();
00220 mw1->disconnect();
00221 mr2->disconnect();
00222 testPortDisconnected();
00223 #endif
00224 #if 1
00225 policy.type = ConnPolicy::BUFFER;
00226 policy.pull = false;
00227 policy.size = 3;
00228 policy.name_id = "";
00229
00230 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) );
00231 testPortBufferConnection();
00232 mw1->disconnect();
00233 mr2->disconnect();
00234 testPortDisconnected();
00235 #endif
00236 #if 1
00237 policy.type = ConnPolicy::BUFFER;
00238 policy.pull = true;
00239 policy.size = 3;
00240 policy.name_id = "";
00241
00242 BOOST_REQUIRE( mw1->createConnection(*mr2, policy) );
00243 testPortBufferConnection();
00244
00245 mw1->disconnect();
00246 mr2->disconnect();
00247 testPortDisconnected();
00248 #endif
00249 }
00250
00251 BOOST_AUTO_TEST_CASE( testPortStreams )
00252 {
00253 BOOST_CHECK( hl.connect() );
00254
00255
00256 policy.type = ConnPolicy::DATA;
00257 policy.pull = false;
00258 policy.name_id = "/data1";
00259 BOOST_REQUIRE( mw1->createStream( policy ) );
00260 BOOST_REQUIRE( mr2->createStream( policy ) );
00261 testPortDataConnection();
00262 mw1->disconnect();
00263 mr2->disconnect();
00264 testPortDisconnected();
00265
00266 policy.type = ConnPolicy::DATA;
00267 policy.pull = true;
00268 policy.name_id = "";
00269 BOOST_REQUIRE( mw1->createStream( policy ) );
00270 BOOST_REQUIRE( mr2->createStream( policy ) );
00271 testPortDataConnection();
00272 mw1->disconnect();
00273 mr2->disconnect();
00274 testPortDisconnected();
00275
00276 policy.type = ConnPolicy::BUFFER;
00277 policy.pull = false;
00278 policy.size = 3;
00279 policy.name_id = "/buffer1";
00280 BOOST_REQUIRE( mw1->createStream( policy ) );
00281 BOOST_REQUIRE( mr2->createStream( policy ) );
00282 testPortBufferConnection();
00283 mw1->disconnect();
00284 mr2->disconnect();
00285 testPortDisconnected();
00286
00287 policy.type = ConnPolicy::BUFFER;
00288 policy.pull = true;
00289 policy.size = 3;
00290 policy.name_id = "";
00291 BOOST_REQUIRE( mw1->createStream( policy ) );
00292 BOOST_REQUIRE( mr2->createStream( policy ) );
00293 testPortBufferConnection();
00294 mw1->disconnect();
00295 mr2->disconnect();
00296 testPortDisconnected();
00297 }
00298
00299 BOOST_AUTO_TEST_CASE( testPortStreamsTimeout )
00300 {
00301
00302 policy.type = ConnPolicy::DATA;
00303 policy.pull = false;
00304 policy.name_id = "/data1";
00305 BOOST_REQUIRE( mr2->createStream( policy ) == false );
00306 BOOST_CHECK( mr2->connected() == false );
00307 mr2->disconnect();
00308
00309 policy.type = ConnPolicy::BUFFER;
00310 policy.pull = false;
00311 policy.size = 10;
00312 policy.name_id = "/buffer1";
00313 BOOST_REQUIRE( mr2->createStream( policy ) == false );
00314 BOOST_CHECK( mr2->connected() == false );
00315 mr2->disconnect();
00316 }
00317
00318
00319 BOOST_AUTO_TEST_CASE( testPortStreamsWrongName )
00320 {
00321
00322 policy.type = ConnPolicy::DATA;
00323 policy.pull = false;
00324 policy.name_id = "data1";
00325 BOOST_REQUIRE( mr2->createStream( policy ) == false );
00326 BOOST_CHECK( mr2->connected() == false );
00327 mr2->disconnect();
00328
00329 policy.type = ConnPolicy::BUFFER;
00330 policy.pull = false;
00331 policy.size = 10;
00332 policy.name_id = "buffer1";
00333 BOOST_REQUIRE( mw2->createStream( policy ) == false );
00334 BOOST_CHECK( mw2->connected() == false );
00335 mw2->disconnect();
00336 }
00337
00338
00339 BOOST_AUTO_TEST_CASE( testVectorTransport )
00340 {
00341 BOOST_CHECK( hl.connect() );
00342
00343 DataFlowInterface* ports = tc->ports();
00344 DataFlowInterface* ports2 = t2->ports();
00345
00346 std::vector<double> data(20, 3.33);
00347 InputPort< std::vector<double> > vin("VIn");
00348 OutputPort< std::vector<double> > vout("Vout");
00349 ports->addPort(vin).doc("input port");
00350 ports2->addPort(vout).doc("output port");
00351
00352
00353 vout.setDataSample( data );
00354 data = vout.getLastWrittenValue();
00355 for(int i=0; i != 20; ++i)
00356 BOOST_CHECK_CLOSE( data[i], 3.33, 0.01);
00357
00358 policy.type = ConnPolicy::DATA;
00359 policy.pull = false;
00360 policy.name_id = "/vdata1";
00361 BOOST_REQUIRE( vout.createStream( policy ) );
00362 BOOST_REQUIRE( vin.createStream( policy ) );
00363
00364
00365 BOOST_CHECK_EQUAL( vin.read(data), NoData);
00366
00367
00368 data.clear();
00369 data.resize(10, 6.66);
00370 for(unsigned int i=0; i != data.size(); ++i)
00371 BOOST_CHECK_CLOSE( data[i], 6.66, 0.01);
00372
00373 rtos_enable_rt_warning();
00374 vout.write( data );
00375 rtos_disable_rt_warning();
00376
00377
00378 data.clear();
00379 data.resize(20, 0.0);
00380 usleep(200000);
00381
00382 rtos_enable_rt_warning();
00383 BOOST_CHECK_EQUAL( vin.read(data), NewData);
00384 rtos_disable_rt_warning();
00385
00386
00387 BOOST_CHECK_EQUAL( data.size(), 10);
00388 BOOST_CHECK_EQUAL( data.capacity(), 20);
00389 for(unsigned int i=0; i != data.size(); ++i)
00390 BOOST_CHECK_CLOSE( data[i], 6.66, 0.01);
00391
00392 rtos_enable_rt_warning();
00393 BOOST_CHECK_EQUAL( vin.read(data), OldData);
00394 rtos_disable_rt_warning();
00395 }
00396
00397 BOOST_AUTO_TEST_SUITE_END()
00398