$search
00001 /*************************************************************************** 00002 tag: The SourceWorks Tue Sep 7 00:54:57 CEST 2010 corba_mqueue_ipc_test.cpp 00003 00004 corba_mqueue_ipc_test.cpp - description 00005 ------------------- 00006 begin : Tue September 07 2010 00007 copyright : (C) 2010 The SourceWorks 00008 email : peter@thesourceworks.com 00009 00010 *************************************************************************** 00011 * * 00012 * This program is free software; you can redistribute it and/or modify * 00013 * it under the terms of the GNU General Public License as published by * 00014 * the Free Software Foundation; either version 2 of the License, or * 00015 * (at your option) any later version. * 00016 * * 00017 ***************************************************************************/ 00018 00019 #include "unit.hpp" 00020 00021 #include "corba_mqueue_test.hpp" 00022 00023 #include <iostream> 00024 00025 #include <transports/corba/DataFlowI.h> 00026 #include <rtt/transports/corba/RemotePorts.hpp> 00027 #include <rtt/transports/mqueue/MQLib.hpp> 00028 #include <rtt/transports/corba/CorbaConnPolicy.hpp> 00029 #include <transports/corba/corba.h> 00030 #include <rtt/InputPort.hpp> 00031 #include <rtt/OutputPort.hpp> 00032 #include <rtt/TaskContext.hpp> 00033 #include <transports/corba/TaskContextServer.hpp> 00034 #include <transports/corba/TaskContextProxy.hpp> 00035 #include <string> 00036 #include <cstdlib> 00037 00038 using namespace RTT; 00039 using namespace RTT::detail; 00040 00041 class CorbaMQueueIPCTest 00042 { 00043 public: 00044 CorbaMQueueIPCTest() { this->setUp(); } 00045 ~CorbaMQueueIPCTest() { this->tearDown(); } 00046 00047 TaskContext* tc; 00048 corba::TaskContextProxy* tp, *tp2; 00049 corba::TaskContextServer* ts, *ts2; 00050 00051 base::PortInterface* signalled_port; 00052 void new_data_listener(base::PortInterface* port); 00053 00054 // Ports 00055 InputPort<double>* mr1; 00056 OutputPort<double>* mw1; 00057 00058 void setUp(); 00059 void tearDown(); 00060 00061 // helper test functions 00062 void testPortDataConnection(); 00063 void testPortBufferConnection(); 00064 void testPortDisconnected(); 00065 }; 00066 00067 using namespace std; 00068 using corba::TaskContextProxy; 00069 00070 void 00071 CorbaMQueueIPCTest::setUp() 00072 { 00073 // setup DataPorts: we write into mw1, the server roundtrips it to mr1 00074 mr1 = new InputPort<double>("mr"); 00075 mw1 = new OutputPort<double>("mw"); 00076 00077 tc = new TaskContext( "root" ); 00078 tc->ports()->addEventPort( *mr1, boost::bind(&CorbaMQueueIPCTest::new_data_listener, this, _1) ); 00079 tc->ports()->addPort( *mw1 ); 00080 tc->start(); 00081 00082 ts2 = ts = 0; 00083 tp2 = tp = 0; 00084 } 00085 00086 00087 void 00088 CorbaMQueueIPCTest::tearDown() 00089 { 00090 delete tp; 00091 delete ts; 00092 delete tp2; 00093 delete tc; 00094 00095 delete mr1; 00096 delete mw1; 00097 } 00098 00099 void CorbaMQueueIPCTest::new_data_listener(base::PortInterface* port) 00100 { 00101 signalled_port = port; 00102 } 00103 00104 00105 #define ASSERT_PORT_SIGNALLING(code, read_port) \ 00106 signalled_port = 0; \ 00107 code; \ 00108 usleep(100000); \ 00109 BOOST_CHECK( read_port == signalled_port ); 00110 00111 bool wait_for_helper; 00112 #define wait_for( cond, times ) \ 00113 wait = 0; \ 00114 while( (wait_for_helper = !(cond)) && wait++ != times ) \ 00115 usleep(100000); \ 00116 if (wait_for_helper) BOOST_CHECK( cond ); 00117 00118 #define wait_for_equal( a, b, times ) \ 00119 wait = 0; \ 00120 while( (wait_for_helper = ((a) != (b))) && wait++ != times ) \ 00121 usleep(100000); \ 00122 if (wait_for_helper) BOOST_CHECK_EQUAL( a, b ); 00123 00124 void CorbaMQueueIPCTest::testPortDataConnection() 00125 { 00126 // This test assumes that there is a data connection mw1 => server => mr1 00127 // Check if connection succeeded both ways: 00128 BOOST_CHECK( mw1->connected() ); 00129 BOOST_CHECK( mr1->connected() ); 00130 00131 double value = 0; 00132 00133 // Check if no-data works 00134 BOOST_CHECK( !mr1->read(value) ); 00135 00136 // Check if writing works (including signalling) 00137 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1) 00138 BOOST_CHECK( mr1->read(value) ); 00139 BOOST_CHECK_EQUAL( 1.0, value ); 00140 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1); 00141 BOOST_CHECK( mr1->read(value) ); 00142 BOOST_CHECK_EQUAL( 2.0, value ); 00143 } 00144 00145 void CorbaMQueueIPCTest::testPortBufferConnection() 00146 { 00147 // This test assumes that there is a buffer connection mw1 => server => mr1 of size 3 00148 // Check if connection succeeded both ways: 00149 BOOST_CHECK( mw1->connected() ); 00150 BOOST_CHECK( mr1->connected() ); 00151 00152 double value = 0; 00153 00154 // Check if no-data works 00155 BOOST_CHECK( !mr1->read(value) ); 00156 00157 // Check if writing works 00158 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1); 00159 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1); 00160 ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr1); 00161 // There are two connections between mw1 and mr1. We first flush one of the 00162 // two, and then the second one (normal multi-writer single-reader 00163 // behaviour) 00164 BOOST_CHECK( mr1->read(value) ); 00165 BOOST_CHECK_EQUAL( 1.0, value ); 00166 BOOST_CHECK( mr1->read(value) ); 00167 BOOST_CHECK_EQUAL( 2.0, value ); 00168 BOOST_CHECK( mr1->read(value) ); 00169 BOOST_CHECK_EQUAL( 3.0, value ); 00170 BOOST_CHECK( mr1->read(value) ); 00171 BOOST_CHECK_EQUAL( 1.0, value ); 00172 BOOST_CHECK( mr1->read(value) ); 00173 BOOST_CHECK_EQUAL( 2.0, value ); 00174 BOOST_CHECK( mr1->read(value) ); 00175 BOOST_CHECK_EQUAL( 3.0, value ); 00176 BOOST_CHECK_EQUAL( mr1->read(value), OldData ); 00177 } 00178 00179 void CorbaMQueueIPCTest::testPortDisconnected() 00180 { 00181 BOOST_CHECK( !mw1->connected() ); 00182 BOOST_CHECK( !mr1->connected() ); 00183 } 00184 00185 00186 // Registers the fixture into the 'registry' 00187 BOOST_FIXTURE_TEST_SUITE( CorbaMQueueIPCTestSuite, CorbaMQueueIPCTest ) 00188 00189 BOOST_AUTO_TEST_CASE( testPortConnections ) 00190 { 00191 // This test tests the different port-to-port connections. 00192 ts = corba::TaskContextServer::Create( tc, false ); //no-naming 00193 //tp = corba::TaskContextProxy::Create("other"); 00194 //if (!tp ) 00195 tp = corba::TaskContextProxy::CreateFromFile( "root.ior"); 00196 BOOST_CHECK( tp ); 00197 00198 // Create a default CORBA policy specification 00199 RTT::corba::CConnPolicy policy = toCORBA( RTT::ConnPolicy() ); 00200 policy.type = RTT::corba::CData; 00201 policy.init = false; 00202 policy.lock_policy = RTT::corba::CLockFree; 00203 policy.size = 0; 00204 policy.data_size = 0; 00205 00206 corba::CDataFlowInterface_var ports = ts->server()->ports(); 00207 corba::CDataFlowInterface_var ports2 = tp->server()->ports(); 00208 00209 // WARNING: in the following, there is four configuration tested. There is 00210 // also three different ways to disconnect. We need to test those three 00211 // "disconnection methods", so beware when you change something ... 00212 00213 policy.type = RTT::corba::CData; 00214 policy.pull = false; 00215 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00216 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) ); 00217 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) ); 00218 usleep(100000); // gives dispatcher time to catch up. 00219 testPortDataConnection(); 00220 ports->disconnectPort("mw"); 00221 ports2->disconnectPort("mw"); 00222 testPortDisconnected(); 00223 00224 policy.type = RTT::corba::CData; 00225 policy.pull = true; 00226 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00227 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) ); 00228 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) ); 00229 usleep(100000); 00230 testPortDataConnection(); 00231 ports2->disconnectPort("mr"); 00232 ports->disconnectPort("mr"); 00233 testPortDisconnected(); 00234 00235 #if 1 00236 00237 policy.type = RTT::corba::CBuffer; 00238 policy.pull = false; 00239 policy.size = 3; 00240 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00241 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) ); 00242 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) ); 00243 testPortBufferConnection(); 00244 ports->disconnectPort("mw"); 00245 ports2->disconnectPort("mw"); 00246 testPortDisconnected(); 00247 00248 policy.type = RTT::corba::CBuffer; 00249 policy.pull = true; 00250 policy.size = 3; 00251 policy.transport = ORO_MQUEUE_PROTOCOL_ID; 00252 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) ); 00253 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) ); 00254 testPortBufferConnection(); 00255 ports->disconnectPort("mw"); 00256 ports2->disconnectPort("mw"); 00257 testPortDisconnected(); 00258 #endif 00259 } 00260 00261 BOOST_AUTO_TEST_SUITE_END() 00262