00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "unit.hpp"
00020
00021 #include "corba_mqueue_test.hpp"
00022
00023 #include <iostream>
00024
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <rtt/transports/mqueue/MQLib.hpp>
00028 #include <rtt/transports/corba/CorbaConnPolicy.hpp>
00029 #include <transports/corba/corba.h>
00030 #include <rtt/InputPort.hpp>
00031 #include <rtt/OutputPort.hpp>
00032 #include <rtt/TaskContext.hpp>
00033 #include <transports/corba/TaskContextServer.hpp>
00034 #include <transports/corba/TaskContextProxy.hpp>
00035 #include <string>
00036 #include <cstdlib>
00037
00038 using namespace RTT;
00039 using namespace RTT::detail;
00040
00041 class CorbaMQueueIPCTest
00042 {
00043 public:
00044 CorbaMQueueIPCTest() { this->setUp(); }
00045 ~CorbaMQueueIPCTest() { this->tearDown(); }
00046
00047 TaskContext* tc;
00048 corba::TaskContextProxy* tp, *tp2;
00049 corba::TaskContextServer* ts, *ts2;
00050
00051 base::PortInterface* signalled_port;
00052 void new_data_listener(base::PortInterface* port);
00053
00054
00055 InputPort<double>* mr1;
00056 OutputPort<double>* mw1;
00057
00058 void setupCorba();
00059 void cleanupCorba();
00060
00061 void setUp();
00062 void tearDown();
00063
00064 void testPortConnections();
00065 void testPortProxying();
00066
00067
00068 void testPortDataConnection();
00069 void testPortBufferConnection();
00070 void testPortDisconnected();
00071 };
00072
00073 using namespace std;
00074 using corba::TaskContextProxy;
00075
00076 void
00077 CorbaMQueueIPCTest::setUp()
00078 {
00079
00080 mr1 = new InputPort<double>("mr");
00081 mw1 = new OutputPort<double>("mw");
00082
00083 tc = new TaskContext( "root" );
00084 tc->ports()->addPort( *mr1 );
00085 tc->ports()->addPort( *mw1 );
00086
00087 ts2 = ts = 0;
00088 tp2 = tp = 0;
00089 }
00090
00091
00092 void
00093 CorbaMQueueIPCTest::tearDown()
00094 {
00095 delete tp;
00096 delete ts;
00097 delete tp2;
00098 delete tc;
00099
00100 delete mr1;
00101 delete mw1;
00102 }
00103
00104 void CorbaMQueueIPCTest::new_data_listener(base::PortInterface* port)
00105 {
00106 signalled_port = port;
00107 }
00108
00109
00110 #define ASSERT_PORT_SIGNALLING(code, read_port) \
00111 signalled_port = 0; \
00112 code; \
00113 usleep(100000); \
00114 BOOST_CHECK( read_port == signalled_port );
00115
00116 bool wait_for_helper;
00117 #define wait_for( cond, times ) \
00118 wait = 0; \
00119 while( (wait_for_helper = !(cond)) && wait++ != times ) \
00120 usleep(100000); \
00121 if (wait_for_helper) BOOST_CHECK( cond );
00122
00123 #define wait_for_equal( a, b, times ) \
00124 wait = 0; \
00125 while( (wait_for_helper = ((a) != (b))) && wait++ != times ) \
00126 usleep(100000); \
00127 if (wait_for_helper) BOOST_CHECK_EQUAL( a, b );
00128
00129 void CorbaMQueueIPCTest::testPortDataConnection()
00130 {
00131
00132
00133 BOOST_CHECK( mw1->connected() );
00134 BOOST_CHECK( mr1->connected() );
00135
00136 double value = 0;
00137
00138
00139 BOOST_CHECK( !mr1->read(value) );
00140
00141
00142 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1)
00143 BOOST_CHECK( mr1->read(value) );
00144 BOOST_CHECK_EQUAL( 1.0, value );
00145 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00146 BOOST_CHECK( mr1->read(value) );
00147 BOOST_CHECK_EQUAL( 2.0, value );
00148 }
00149
00150 void CorbaMQueueIPCTest::testPortBufferConnection()
00151 {
00152
00153
00154 BOOST_CHECK( mw1->connected() );
00155 BOOST_CHECK( mr1->connected() );
00156
00157 double value = 0;
00158
00159
00160 BOOST_CHECK( !mr1->read(value) );
00161
00162
00163 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1);
00164 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00165 ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr1);
00166
00167
00168
00169 BOOST_CHECK( mr1->read(value) );
00170 BOOST_CHECK_EQUAL( 1.0, value );
00171 BOOST_CHECK( mr1->read(value) );
00172 BOOST_CHECK_EQUAL( 2.0, value );
00173 BOOST_CHECK( mr1->read(value) );
00174 BOOST_CHECK_EQUAL( 3.0, value );
00175 BOOST_CHECK( mr1->read(value) );
00176 BOOST_CHECK_EQUAL( 1.0, value );
00177 BOOST_CHECK( mr1->read(value) );
00178 BOOST_CHECK_EQUAL( 2.0, value );
00179 BOOST_CHECK( mr1->read(value) );
00180 BOOST_CHECK_EQUAL( 3.0, value );
00181 BOOST_CHECK_EQUAL( mr1->read(value), OldData );
00182 }
00183
00184 void CorbaMQueueIPCTest::testPortDisconnected()
00185 {
00186 BOOST_CHECK( !mw1->connected() );
00187 BOOST_CHECK( !mr1->connected() );
00188 }
00189
00190
00191
00192 BOOST_FIXTURE_TEST_SUITE( CorbaMQueueIPCTestSuite, CorbaMQueueIPCTest )
00193
00194 BOOST_AUTO_TEST_CASE( testPortConnections )
00195 {
00196
00197 ts = corba::TaskContextServer::Create( tc, false );
00198
00199
00200 tp = corba::TaskContextProxy::CreateFromFile( "root.ior");
00201 BOOST_CHECK( tp );
00202
00203
00204 RTT::corba::CConnPolicy policy = toCORBA( RTT::ConnPolicy() );
00205 policy.type = RTT::corba::CData;
00206 policy.init = false;
00207 policy.lock_policy = RTT::corba::CLockFree;
00208 policy.size = 0;
00209 policy.data_size = 0;
00210
00211
00212 Handle hl( mr1->getNewDataOnPortEvent()->setup(
00213 boost::bind(&CorbaMQueueIPCTest::new_data_listener, this, _1) ) );
00214 hl.connect();
00215
00216 corba::CDataFlowInterface_var ports = ts->server()->ports();
00217 corba::CDataFlowInterface_var ports2 = tp->server()->ports();
00218
00219
00220
00221
00222
00223 policy.type = RTT::corba::CData;
00224 policy.pull = false;
00225 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00226 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00227 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00228 usleep(100000);
00229 testPortDataConnection();
00230 ports->disconnectPort("mw");
00231 ports2->disconnectPort("mw");
00232 testPortDisconnected();
00233
00234 policy.type = RTT::corba::CData;
00235 policy.pull = true;
00236 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00237 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00238 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00239 usleep(100000);
00240 testPortDataConnection();
00241 ports2->disconnectPort("mr");
00242 ports->disconnectPort("mr");
00243 testPortDisconnected();
00244
00245 #if 1
00246
00247 policy.type = RTT::corba::CBuffer;
00248 policy.pull = false;
00249 policy.size = 3;
00250 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00251 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00252 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00253 testPortBufferConnection();
00254 ports->disconnectPort("mw");
00255 ports2->disconnectPort("mw");
00256 testPortDisconnected();
00257
00258 policy.type = RTT::corba::CBuffer;
00259 policy.pull = true;
00260 policy.size = 3;
00261 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00262 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00263 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00264 testPortBufferConnection();
00265 ports->disconnectPort("mw");
00266 ports2->disconnectPort("mw");
00267 testPortDisconnected();
00268 #endif
00269 }
00270
00271 BOOST_AUTO_TEST_SUITE_END()
00272