Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "unit.hpp"
00020
00021 #include "corba_mqueue_test.hpp"
00022
00023 #include <iostream>
00024
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <rtt/transports/mqueue/MQLib.hpp>
00028 #include <rtt/transports/corba/CorbaConnPolicy.hpp>
00029
00030 using namespace std;
00031 using corba::TaskContextProxy;
00032
00033 void
00034 CorbaMQueueTest::setUp()
00035 {
00036
00037 mr1 = new InputPort<double>("mr");
00038 mw1 = new OutputPort<double>("mw");
00039
00040 mr2 = new InputPort<double>("mr");
00041 mw2 = new OutputPort<double>("mw");
00042
00043 tc = new TaskContext( "localroot" );
00044 tc->ports()->addEventPort( *mr1, boost::bind(&CorbaMQueueTest::new_data_listener, this, _1) );
00045 tc->ports()->addPort( *mw1 );
00046
00047 t2 = new TaskContext("localother");
00048 t2->ports()->addEventPort( *mr2,boost::bind(&CorbaMQueueTest::new_data_listener, this, _1) );
00049 t2->ports()->addPort( *mw2 );
00050
00051 ts2 = ts = 0;
00052 tp2 = tp = 0;
00053 }
00054
00055
00056 void
00057 CorbaMQueueTest::tearDown()
00058 {
00059 delete tp;
00060 delete ts;
00061 delete tp2;
00062 delete ts2;
00063 delete tc;
00064 delete t2;
00065
00066 delete mr1;
00067 delete mw1;
00068 delete mr2;
00069 delete mw2;
00070 }
00071
00072 void CorbaMQueueTest::new_data_listener(base::PortInterface* port)
00073 {
00074 signalled_port = port;
00075 }
00076
00077
00078 #define ASSERT_PORT_SIGNALLING(code, read_port) \
00079 signalled_port = 0; \
00080 code; \
00081 usleep(100000); \
00082 BOOST_CHECK( read_port == signalled_port );
00083
00084 void CorbaMQueueTest::testPortDataConnection()
00085 {
00086
00087
00088 BOOST_CHECK( mw1->connected() );
00089 BOOST_CHECK( mr2->connected() );
00090
00091 double value = 0;
00092
00093
00094 BOOST_CHECK( !mr2->read(value) );
00095
00096
00097 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2)
00098 BOOST_CHECK( mr2->read(value) );
00099 BOOST_CHECK_EQUAL( 1.0, value );
00100 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2);
00101 BOOST_CHECK( mr2->read(value) );
00102 BOOST_CHECK_EQUAL( 2.0, value );
00103 }
00104
00105 void CorbaMQueueTest::testPortBufferConnection()
00106 {
00107
00108
00109 BOOST_CHECK( mw1->connected() );
00110 BOOST_CHECK( mr2->connected() );
00111
00112 double value = 0;
00113
00114
00115 BOOST_CHECK( !mr2->read(value) );
00116
00117
00118 ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr2);
00119 ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr2);
00120 ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr2);
00121
00122
00123 BOOST_CHECK( mr2->read(value) );
00124 BOOST_CHECK_EQUAL( 1.0, value );
00125 BOOST_CHECK( mr2->read(value) );
00126 BOOST_CHECK_EQUAL( 2.0, value );
00127 BOOST_CHECK( mr2->read(value) );
00128 BOOST_CHECK_EQUAL( 3.0, value );
00129 BOOST_CHECK_EQUAL( mr2->read(value), OldData );
00130 }
00131
00132 void CorbaMQueueTest::testPortDisconnected()
00133 {
00134 BOOST_CHECK( !mw1->connected() );
00135 BOOST_CHECK( !mr2->connected() );
00136 }
00137
00138
00139
00140 BOOST_FIXTURE_TEST_SUITE( CorbaMQueueTestSuite, CorbaMQueueTest )
00141
00142
00143 BOOST_AUTO_TEST_CASE( testPortConnections )
00144 {
00145
00146 ts = corba::TaskContextServer::Create( tc, false );
00147 ts2 = corba::TaskContextServer::Create( t2, false );
00148
00149
00150 RTT::corba::CConnPolicy policy = toCORBA( RTT::ConnPolicy() );
00151 policy.type = RTT::corba::CData;
00152 policy.init = false;
00153 policy.lock_policy = RTT::corba::CLockFree;
00154 policy.size = 0;
00155 policy.data_size = 0;
00156
00157 corba::CDataFlowInterface_var ports = ts->server()->ports();
00158 corba::CDataFlowInterface_var ports2 = ts2->server()->ports();
00159
00160
00161 BOOST_CHECK( t2->start() );
00162
00163
00164
00165
00166 policy.type = RTT::corba::CData;
00167 policy.pull = false;
00168 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00169 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00170 usleep(100000);
00171 testPortDataConnection();
00172 ports->disconnectPort("mw");
00173 testPortDisconnected();
00174
00175 policy.type = RTT::corba::CData;
00176 policy.pull = true;
00177 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00178 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00179 usleep(100000);
00180 testPortDataConnection();
00181 ports2->disconnectPort("mr");
00182 testPortDisconnected();
00183
00184 #if 1
00185
00186 policy.type = RTT::corba::CBuffer;
00187 policy.pull = false;
00188 policy.size = 3;
00189 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00190 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00191 testPortBufferConnection();
00192 ports->disconnectPort("mw");
00193 testPortDisconnected();
00194
00195 policy.type = RTT::corba::CBuffer;
00196 policy.pull = true;
00197 policy.size = 3;
00198 policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00199 BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00200 testPortBufferConnection();
00201 ports->disconnectPort("mw");
00202 testPortDisconnected();
00203 #endif
00204 }
00205
00206 BOOST_AUTO_TEST_SUITE_END()
00207