00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "unit.hpp"
00020
00021 #include "corba_mqueue_test.hpp"
00022
00023 #include <iostream>
00024
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <rtt/transports/mqueue/MQLib.hpp>
00028 #include <rtt/transports/corba/CorbaConnPolicy.hpp>
00029 #include <transports/corba/corba.h>
00030 #include <rtt/InputPort.hpp>
00031 #include <rtt/OutputPort.hpp>
00032 #include <rtt/TaskContext.hpp>
00033 #include <transports/corba/TaskContextServer.hpp>
00034 #include <transports/corba/TaskContextProxy.hpp>
00035 #include <string>
00036 #include <cstdlib>
00037
00038 using namespace RTT;
00039 using namespace RTT::detail;
00040
00041 class CorbaMQueueIPCTest
00042 {
00043 public:
00044 CorbaMQueueIPCTest() { this->setUp(); }
00045 ~CorbaMQueueIPCTest() { this->tearDown(); }
00046
00047 TaskContext* tc;
00048 corba::TaskContextProxy* tp, *tp2;
00049 corba::TaskContextServer* ts, *ts2;
00050
00051 base::PortInterface* signalled_port;
00052 void new_data_listener(base::PortInterface* port);
00053
00054
00055 InputPort<double>* mr1;
00056 OutputPort<double>* mw1;
00057
00058 void setUp();
00059 void tearDown();
00060
00061
00062 void testPortDataConnection();
00063 void testPortBufferConnection();
00064 void testPortDisconnected();
00065 };
00066
00067 using namespace std;
00068 using corba::TaskContextProxy;
00069
00070 void
00071 CorbaMQueueIPCTest::setUp()
00072 {
00073
00074 mr1 = new InputPort<double>("mr");
00075 mw1 = new OutputPort<double>("mw");
00076
00077 tc = new TaskContext( "root" );
00078 tc->ports()->addEventPort( *mr1, boost::bind(&CorbaMQueueIPCTest::new_data_listener, this, _1) );
00079 tc->ports()->addPort( *mw1 );
00080 tc->start();
00081
00082 ts2 = ts = 0;
00083 tp2 = tp = 0;
00084 }
00085
00086
00087 void
00088 CorbaMQueueIPCTest::tearDown()
00089 {
00090 delete tp;
00091 delete ts;
00092 delete tp2;
00093 delete tc;
00094
00095 delete mr1;
00096 delete mw1;
00097 }
00098
00099 void CorbaMQueueIPCTest::new_data_listener(base::PortInterface* port)
00100 {
00101 signalled_port = port;
00102 }
00103
00104
/**
 * ASSERT_PORT_SIGNALLING(code, read_port)
 *
 * Clears signalled_port, executes `code`, sleeps for 100000 microseconds and
 * then checks that signalled_port equals `read_port`.
 *
 * The macro expands to several statements in the caller's scope and already
 * ends with a ';'. It must therefore be used inside a member function of the
 * fixture (it touches signalled_port directly) and never as the sole body of
 * an unbraced if/else/loop.
 */
#define ASSERT_PORT_SIGNALLING(code, read_port) \
    signalled_port = 0; \
    code; \
    usleep(100000); \
    BOOST_CHECK( read_port == signalled_port );

/// Scratch flag written by wait_for / wait_for_equal: true while the awaited
/// condition does not hold yet.
bool wait_for_helper;

/**
 * wait_for(cond, times)
 *
 * Polls `cond`, sleeping 100000 microseconds between polls, until it becomes
 * true or `times` sleeps have elapsed. If it never became true, `cond` is
 * evaluated once more inside BOOST_CHECK so that the failure is reported.
 *
 * Requires an integer variable named `wait` in the calling scope; the macro
 * assigns to it but does not declare it. Expands to several statements.
 */
#define wait_for( cond, times ) \
    wait = 0; \
    while( (wait_for_helper = !(cond)) && wait++ != times ) \
        usleep(100000); \
    if (wait_for_helper) BOOST_CHECK( cond );

/**
 * wait_for_equal(a, b, times)
 *
 * Same polling scheme as wait_for, but waits for `a` and `b` to compare
 * equal and reports a timeout through BOOST_CHECK_EQUAL( a, b ).
 *
 * Requires an integer variable named `wait` in the calling scope.
 */
#define wait_for_equal( a, b, times ) \
    wait = 0; \
    while( (wait_for_helper = ((a) != (b))) && wait++ != times ) \
        usleep(100000); \
    if (wait_for_helper) BOOST_CHECK_EQUAL( a, b );
00123
00124 void CorbaMQueueIPCTest::testPortDataConnection()
00125 {
00126
00127
00128 BOOST_CHECK( mw1->connected() );
00129 BOOST_CHECK( mr1->connected() );
00130
00131 double value = 0;
00132
00133
00134 BOOST_CHECK( !mr1->read(value) );
00135
00136
00137 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1)
00138 BOOST_CHECK( mr1->read(value) );
00139 BOOST_CHECK_EQUAL( 1.0, value );
00140 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00141 BOOST_CHECK( mr1->read(value) );
00142 BOOST_CHECK_EQUAL( 2.0, value );
00143 }
00144
00145 void CorbaMQueueIPCTest::testPortBufferConnection()
00146 {
00147
00148
00149 BOOST_CHECK( mw1->connected() );
00150 BOOST_CHECK( mr1->connected() );
00151
00152 double value = 0;
00153
00154
00155 BOOST_CHECK( !mr1->read(value) );
00156
00157
00158 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1);
00159 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00160 ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr1);
00161
00162
00163
00164 BOOST_CHECK( mr1->read(value) );
00165 BOOST_CHECK_EQUAL( 1.0, value );
00166 BOOST_CHECK( mr1->read(value) );
00167 BOOST_CHECK_EQUAL( 2.0, value );
00168 BOOST_CHECK( mr1->read(value) );
00169 BOOST_CHECK_EQUAL( 3.0, value );
00170 BOOST_CHECK( mr1->read(value) );
00171 BOOST_CHECK_EQUAL( 1.0, value );
00172 BOOST_CHECK( mr1->read(value) );
00173 BOOST_CHECK_EQUAL( 2.0, value );
00174 BOOST_CHECK( mr1->read(value) );
00175 BOOST_CHECK_EQUAL( 3.0, value );
00176 BOOST_CHECK_EQUAL( mr1->read(value), OldData );
00177 }
00178
00179 void CorbaMQueueIPCTest::testPortDisconnected()
00180 {
00181 BOOST_CHECK( !mw1->connected() );
00182 BOOST_CHECK( !mr1->connected() );
00183 }
00184
00185
00186
00187 BOOST_FIXTURE_TEST_SUITE( CorbaMQueueIPCTestSuite, CorbaMQueueIPCTest )
00188
00189 BOOST_AUTO_TEST_CASE( testPortConnections )
00190 {
00191
00192 ts = corba::TaskContextServer::Create( tc, false );
00193
00194
00195 tp = corba::TaskContextProxy::CreateFromFile( "root.ior");
00196 BOOST_CHECK( tp );
00197
00198
00199 RTT::corba::CConnPolicy policy = toCORBA( RTT::ConnPolicy() );
00200 policy.type = RTT::corba::CData;
00201 policy.init = false;
00202 policy.lock_policy = RTT::corba::CLockFree;
00203 policy.size = 0;
00204 policy.data_size = 0;
00205
00206 corba::CDataFlowInterface_var ports = ts->server()->ports();
00207 corba::CDataFlowInterface_var ports2 = tp->server()->ports();
00208
00209
00210
00211
00212
00213 policy.type = RTT::corba::CData;
00214 policy.pull = false;
00215 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00216 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00217 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00218 usleep(100000);
00219 testPortDataConnection();
00220 ports->disconnectPort("mw");
00221 ports2->disconnectPort("mw");
00222 testPortDisconnected();
00223
00224 policy.type = RTT::corba::CData;
00225 policy.pull = true;
00226 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00227 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00228 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00229 usleep(100000);
00230 testPortDataConnection();
00231 ports2->disconnectPort("mr");
00232 ports->disconnectPort("mr");
00233 testPortDisconnected();
00234
00235 #if 1
00236
00237 policy.type = RTT::corba::CBuffer;
00238 policy.pull = false;
00239 policy.size = 3;
00240 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00241 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00242 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00243 testPortBufferConnection();
00244 ports->disconnectPort("mw");
00245 ports2->disconnectPort("mw");
00246 testPortDisconnected();
00247
00248 policy.type = RTT::corba::CBuffer;
00249 policy.pull = true;
00250 policy.size = 3;
00251 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00252 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00253 BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00254 testPortBufferConnection();
00255 ports->disconnectPort("mw");
00256 ports2->disconnectPort("mw");
00257 testPortDisconnected();
00258 #endif
00259 }
00260
00261 BOOST_AUTO_TEST_SUITE_END()
00262