00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "unit.hpp"
00020
00021 #include <iostream>
00022
00023 #include <rtt/OperationCaller.hpp>
00024 #include <rtt/Service.hpp>
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <transports/corba/ServiceC.h>
00028 #include <transports/corba/corba.h>
00029 #include <transports/corba/CorbaConnPolicy.hpp>
00030 #include <rtt/InputPort.hpp>
00031 #include <rtt/OutputPort.hpp>
00032 #include <rtt/TaskContext.hpp>
00033 #include <transports/corba/TaskContextServer.hpp>
00034 #include <transports/corba/TaskContextProxy.hpp>
00035 #include <transports/corba/CorbaLib.hpp>
00036 #include <rtt/internal/DataSourceTypeInfo.hpp>
00037
00038 #include <string>
00039 #include <stdlib.h>
00040
00041 using namespace RTT;
00042 using namespace RTT::detail;
00043
00044 class CorbaTest : public TaskContext
00045 {
00046 public:
00047 CorbaTest() : TaskContext("CorbaTest") { this->setUp(); }
00048 ~CorbaTest() { this->tearDown(); }
00049
00050 TaskContext* tc;
00051 TaskContext* t2;
00052 TaskContextProxy* tp;
00053 corba::TaskContextServer* ts;
00054 TaskContext* tp2;
00055 corba::TaskContextServer* ts2;
00056 CTaskContext_ptr s;
00057 CTaskContext_ptr s2;
00058
00059 base::PortInterface* signalled_port;
00060 void new_data_listener(base::PortInterface* port);
00061
00062
00063 InputPort<double>* mi;
00064 OutputPort<double>* mo;
00065 bool is_calling, is_sending;
00066 SendHandle<void(TaskContext*, string const&)> handle;
00067
00068 int wait, cbcount;
00069
00070 void setUp();
00071 void tearDown();
00072
00073
00074 void testPortDataConnection();
00075 void testPortBufferConnection();
00076 void testPortDisconnected();
00077
00078 void callBackPeer(TaskContext* peer, string const& opname) {
00079 OperationCaller<void(TaskContext*, string const&)> op1( peer->getOperation(opname), this->engine());
00080 int count = ++cbcount;
00081 log(Info) << "Test executes callBackPeer():"<< count <<endlog();
00082 if (!is_calling) {
00083 is_calling = true;
00084 log(Info) << "Test calls server:" << count <<endlog();
00085 op1(this, "callBackPeer");
00086 log(Info) << "Test finishes server call:"<<count <<endlog();
00087 }
00088
00089 if (!is_sending) {
00090 is_sending = true;
00091 log(Info) << "Test sends server:"<<count <<endlog();
00092 handle = op1.send(this, "callBackPeerOwn");
00093 log(Info) << "Test finishes server send:"<< count <<endlog();
00094 }
00095 log(Info) << "Test finishes callBackPeer():"<< count <<endlog();
00096 }
00097
00098 };
00099
00100 using namespace std;
00101 using corba::TaskContextProxy;
00102
00103 void
00104 CorbaTest::setUp()
00105 {
00106
00107 mi = new InputPort<double>("mi");
00108 mo = new OutputPort<double>("mo");
00109
00110 tc = new TaskContext( "root" );
00111 tc->ports()->addPort( *mi );
00112 tc->ports()->addPort( *mo );
00113
00114 t2 = 0;
00115 ts2 = ts = 0;
00116 tp2 = tp = 0;
00117 wait = cbcount = 0;
00118 is_calling = false, is_sending = false;
00119
00120 addOperation("callBackPeer", &CorbaTest::callBackPeer, this,ClientThread);
00121 addOperation("callBackPeerOwn", &CorbaTest::callBackPeer, this,OwnThread);
00122 }
00123
00124
00125 void
00126 CorbaTest::tearDown()
00127 {
00128 delete tp;
00129 delete ts;
00130 delete tp2;
00131 delete ts2;
00132 delete tc;
00133 delete t2;
00134
00135 delete mi;
00136 delete mo;
00137 }
00138
00139 void CorbaTest::new_data_listener(base::PortInterface* port)
00140 {
00141 signalled_port = port;
00142 }
00143
00144
// Resets 'signalled_port' and 'wait', executes 'action', then polls until
// 'signalled_port' equals 'expected_port', sleeping 100000 microseconds
// between polls (at most 5 sleeps). Finally checks that the expected port was
// indeed signalled. Passing 0 as 'expected_port' asserts that nothing was
// signalled at the time of the check.
// NOTE: this expands to several statements and already ends with ';' -- it
// must not be used as the sole body of an unbraced if/else/loop.
#define ASSERT_PORT_SIGNALLING(action, expected_port) \
    signalled_port = 0; wait = 0; \
    action; \
    while (expected_port != signalled_port && wait++ != 5) \
        usleep(100000); \
    BOOST_CHECK( expected_port == signalled_port );
00151
// Scratch flag shared by wait_for() and wait_for_equal(): true while the
// awaited condition has not been met yet.
bool wait_for_helper;

// Polls 'condition' until it holds, sleeping 100000 microseconds between
// polls (at most 'max_polls' sleeps). If it still does not hold, the
// condition is evaluated once more inside BOOST_CHECK to report the failure.
// NOTE: expands to several statements; 'condition' may be evaluated
// repeatedly, so it should be safe to re-evaluate.
#define wait_for( condition, max_polls ) \
    wait = 0; \
    while( (wait_for_helper = !(condition)) && wait++ != max_polls ) \
        usleep(100000); \
    if (wait_for_helper) BOOST_CHECK( condition );
00158
// Polls until 'lhs' equals 'rhs', sleeping 100000 microseconds between polls
// (at most 'max_polls' sleeps). On timeout both expressions are evaluated
// once more inside BOOST_CHECK_EQUAL to report the mismatch.
// NOTE: expands to several statements; both operands are re-evaluated on
// every poll (the tests pass read() calls here on purpose).
#define wait_for_equal( lhs, rhs, max_polls ) \
    wait = 0; \
    while( (wait_for_helper = ((lhs) != (rhs))) && wait++ != max_polls ) \
        usleep(100000); \
    if (wait_for_helper) BOOST_CHECK_EQUAL( lhs, rhs );
00164
00165
00166 void CorbaTest::testPortDataConnection()
00167 {
00168
00169
00170 BOOST_CHECK( mo->connected() );
00171 BOOST_CHECK( mi->connected() );
00172
00173 double value = 0;
00174
00175
00176 BOOST_CHECK_EQUAL( mi->read(value), NoData );
00177
00178
00179 ASSERT_PORT_SIGNALLING(mo->write(1.0), mi)
00180 BOOST_CHECK( mi->read(value) );
00181 BOOST_CHECK_EQUAL( 1.0, value );
00182 ASSERT_PORT_SIGNALLING(mo->write(2.0), mi);
00183 BOOST_CHECK( mi->read(value) );
00184 BOOST_CHECK_EQUAL( 2.0, value );
00185 }
00186
00187 void CorbaTest::testPortBufferConnection()
00188 {
00189
00190
00191 BOOST_CHECK( mo->connected() );
00192 BOOST_CHECK( mi->connected() );
00193
00194 double value = 0;
00195
00196
00197 BOOST_CHECK_EQUAL( mi->read(value), NoData );
00198
00199
00200 ASSERT_PORT_SIGNALLING(mo->write(1.0), mi);
00201 ASSERT_PORT_SIGNALLING(mo->write(2.0), mi);
00202 ASSERT_PORT_SIGNALLING(mo->write(3.0), mi);
00203 ASSERT_PORT_SIGNALLING(mo->write(4.0), 0);
00204 BOOST_CHECK( mi->read(value) );
00205 BOOST_CHECK_EQUAL( 1.0, value );
00206 BOOST_CHECK( mi->read(value) );
00207 BOOST_CHECK_EQUAL( 2.0, value );
00208 BOOST_CHECK( mi->read(value) );
00209 BOOST_CHECK_EQUAL( 3.0, value );
00210 BOOST_CHECK_EQUAL( mi->read(value), OldData );
00211 }
00212
00213 void CorbaTest::testPortDisconnected()
00214 {
00215 BOOST_CHECK( !mo->connected() );
00216 BOOST_CHECK( !mi->connected() );
00217 }
00218
00219
00220
00221 BOOST_FIXTURE_TEST_SUITE( CorbaIPCTestSuite, CorbaTest )
00222
00223
00224 BOOST_AUTO_TEST_CASE( testRemoteOperationCallerC )
00225 {
00226 tp = corba::TaskContextProxy::Create( "peerRMC", false );
00227 if (!tp )
00228 tp = corba::TaskContextProxy::CreateFromFile( "peerRMC.ior");
00229 BOOST_REQUIRE( tp );
00230
00231
00232 internal::OperationCallerC mc;
00233 double r = 0.0;
00234 mc = tp->provides("methods")->create("vm0", tc->engine() );
00235 BOOST_CHECK( mc.call() );
00236 BOOST_CHECK( r == 0.0 );
00237
00238 mc = tp->provides("methods")->create("m0", tc->engine() ).ret( r );
00239 BOOST_CHECK( mc.call() );
00240 BOOST_CHECK( r == -1.0 );
00241
00242 mc = tp->provides("methods")->create("m2", tc->engine()).argC(1).argC(2.0).ret( r );
00243 BOOST_CHECK( mc.call() );
00244 BOOST_CHECK( r == -3.0 );
00245
00246 mc = tp->provides("methods")->create("m3", tc->engine()).ret( r ).argC(1).argC(2.0).argC(true);
00247 BOOST_CHECK( mc.call() );
00248 BOOST_CHECK( r == -4.0 );
00249
00250 }
00251
00252 BOOST_AUTO_TEST_CASE( testRemoteOperationCaller )
00253 {
00254 tp = corba::TaskContextProxy::Create( "peerRM" , false);
00255 if (!tp )
00256 tp = corba::TaskContextProxy::CreateFromFile( "peerRM.ior");
00257 BOOST_REQUIRE(tp);
00258
00259
00260 RTT::OperationCaller<double(void)> m0 = tp->provides("methods")->getOperation("m0");
00261 RTT::OperationCaller<double(int)> m1 = tp->provides("methods")->getOperation("m1");
00262 RTT::OperationCaller<double(int,double)> m2 = tp->provides("methods")->getOperation("m2");
00263 RTT::OperationCaller<double(int,double,bool)> m3 = tp->provides("methods")->getOperation("m3");
00264 RTT::OperationCaller<double(int,double,bool,std::string)> m4 = tp->provides("methods")->getOperation("m4");
00265
00266 BOOST_CHECK_EQUAL( -1.0, m0() );
00267 BOOST_CHECK_EQUAL( -2.0, m1(1) );
00268 BOOST_CHECK_EQUAL( -3.0, m2(1, 2.0) );
00269 BOOST_CHECK_EQUAL( -4.0, m3(1, 2.0, true) );
00270 BOOST_CHECK_EQUAL( -5.0, m4(1, 2.0, true,"hello") );
00271 }
00272
00277 BOOST_AUTO_TEST_CASE( testRemoteOperationCallerCallback )
00278 {
00279 tp = corba::TaskContextProxy::Create( "peerRMCb" , false);
00280 if (!tp )
00281 tp = corba::TaskContextProxy::CreateFromFile( "peerRMC.ior");
00282 BOOST_REQUIRE(tp);
00283
00284 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo() != 0 );
00285 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo() != RTT::internal::DataSourceTypeInfo<UnknownType>::getTypeInfo());
00286 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo()->getProtocol(ORO_CORBA_PROTOCOL_ID) != 0 );
00287
00288 this->callBackPeer(tp, "callBackPeer");
00289 sleep(1);
00290 BOOST_CHECK( is_calling );
00291 BOOST_CHECK( is_sending );
00292 BOOST_CHECK( handle.ready() );
00293 BOOST_CHECK_EQUAL( handle.collectIfDone(), SendSuccess );
00294 }
00295
00296 BOOST_AUTO_TEST_CASE( testAnyOperationCaller )
00297 {
00298 double d;
00299 tp = corba::TaskContextProxy::Create( "peerAM" , false);
00300 if (!tp )
00301 tp = corba::TaskContextProxy::CreateFromFile( "peerAM.ior");
00302
00303 BOOST_REQUIRE(tp);
00304 s = tp->server();
00305 BOOST_REQUIRE( s );
00306
00307 corba::CService_var co = s->getProvider("methods");
00308 BOOST_CHECK( co.in() );
00309
00310 corba::CAnyArguments_var any_args = new corba::CAnyArguments(0);
00311 CORBA::Any_var vm0 = co->callOperation("vm0", any_args.inout() );
00312
00313
00314 CORBA::Any_var m0 = co->callOperation("m0", any_args.inout());
00315 BOOST_CHECK( m0 >>= d );
00316 BOOST_CHECK_EQUAL(d, -1.0 );
00317
00318 any_args = new corba::CAnyArguments(1);
00319 any_args->length(1);
00320 unsigned int index = 0;
00321 any_args[index] <<= (CORBA::Long) 1;
00322 CORBA::Any_var m1;
00323 BOOST_CHECK_NO_THROW( m1 = co->callOperation("m1", any_args.inout()));
00324 BOOST_CHECK( m1 >>= d );
00325 BOOST_CHECK_EQUAL(d, -2.0 );
00326
00327
00328 any_args = new corba::CAnyArguments(2);
00329 any_args->length(2);
00330 index = 0;
00331 any_args[index] <<= (CORBA::Long) 1;
00332 ++index;
00333 any_args[index] <<= (CORBA::Double) 2.0;
00334 CORBA::Any_var m2;
00335 BOOST_CHECK_NO_THROW( m2 = co->callOperation("m2", any_args.inout()));
00336 BOOST_CHECK( m2 >>= d );
00337 BOOST_CHECK_EQUAL(d, -3.0 );
00338
00339 any_args = new corba::CAnyArguments(3);
00340 any_args->length(3);
00341 index = 0;
00342 any_args[index] <<= (CORBA::Long) 1;
00343 ++index;
00344 any_args[index] <<= (CORBA::Double) 2.0;
00345 ++index;
00346 any_args[index] <<= CORBA::Any::from_boolean( true );
00347 CORBA::Any_var m3;
00348 BOOST_CHECK_NO_THROW( m3= co->callOperation("m3", any_args.inout()) );
00349 BOOST_CHECK( m3 >>= d );
00350 BOOST_CHECK_EQUAL(d, -4.0 );
00351
00352 any_args = new corba::CAnyArguments(4);
00353 any_args->length(4);
00354 index = 0;
00355 any_args[index] <<= (CORBA::Long) 1;
00356 ++index;
00357 any_args[index] <<= (CORBA::Double) 2.0;
00358 ++index;
00359 any_args[index] <<= CORBA::Any::from_boolean( true );
00360 ++index;
00361 any_args[index] <<= "hello";
00362 CORBA::Any_var m4;
00363 BOOST_CHECK_NO_THROW ( m4 = co->callOperation("m4", any_args.inout()) );
00364 BOOST_CHECK( m4 >>= d );
00365 BOOST_CHECK_EQUAL(d, -5.0 );
00366 }
00367
00368 BOOST_AUTO_TEST_CASE(testDataFlowInterface)
00369 {
00370 tp = corba::TaskContextProxy::Create( "peerDFI" , false);
00371 if (!tp )
00372 tp = corba::TaskContextProxy::CreateFromFile( "peerDFI.ior");
00373
00374 corba::CDataFlowInterface_var ports = tp->server()->ports();
00375
00376 corba::CDataFlowInterface::CPortNames_var names =
00377 ports->getPorts();
00378
00379 BOOST_CHECK_EQUAL(CORBA::ULong(2), names->length());
00380 BOOST_CHECK_EQUAL(string("mi"), string(names[CORBA::ULong(0)]));
00381 BOOST_CHECK_EQUAL(string("mo"), string(names[CORBA::ULong(1)]));
00382
00383
00384 BOOST_CHECK_EQUAL(RTT::corba::COutput,
00385 ports->getPortType("mo"));
00386 BOOST_CHECK_EQUAL(RTT::corba::CInput,
00387 ports->getPortType("mi"));
00388
00389
00390 CORBA::String_var cstr = ports->getDataType("mo");
00391 BOOST_CHECK_EQUAL(string("double"),
00392 string(cstr.in()));
00393 }
00394
00395 BOOST_AUTO_TEST_CASE( testPortConnections )
00396 {
00397
00398 tp = corba::TaskContextProxy::Create( "peerPC" , false);
00399 if (!tp )
00400 tp = corba::TaskContextProxy::CreateFromFile( "peerPC.ior");
00401
00402 s = tp->server();
00403
00404 ts2 = corba::TaskContextServer::Create( tc, false );
00405 s2 = ts2->server();
00406
00407
00408 RTT::corba::CConnPolicy policy;
00409 policy.type = RTT::corba::CData;
00410 policy.init = false;
00411 policy.lock_policy = RTT::corba::CLockFree;
00412 policy.size = 0;
00413 policy.transport = ORO_CORBA_PROTOCOL_ID;
00414
00415
00416 Handle hl( mi->getNewDataOnPortEvent()->setup(
00417 boost::bind(&CorbaTest::new_data_listener, this, _1) ) );
00418 hl.connect();
00419
00420 corba::CDataFlowInterface_var ports = s->ports();
00421 corba::CDataFlowInterface_var ports2 = s2->ports();
00422
00423
00424 BOOST_CHECK_THROW( ports->createConnection("mo", ports2, "does_not_exist", policy), CNoSuchPortException );
00425 BOOST_CHECK_THROW( ports->createConnection("does_not_exist", ports2, "mi", policy), CNoSuchPortException );
00426 BOOST_CHECK_THROW( ports->createConnection("does_not_exist", ports2, "does_not_exist", policy), CNoSuchPortException );
00427 BOOST_CHECK_THROW( ports->createConnection("mo", ports2, "mo", policy), CNoSuchPortException );
00428 BOOST_CHECK_THROW( ports->createConnection("mi", ports2, "mi", policy), CNoSuchPortException );
00429 BOOST_CHECK_THROW( ports->createConnection("mi", ports2, "mo", policy), CNoSuchPortException );
00430
00431
00432
00433
00434
00435
00436
00437 policy.type = RTT::corba::CData;
00438 policy.pull = false;
00439 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00440 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00441 testPortDataConnection();
00442 ports->disconnectPort("mo");
00443 ports->disconnectPort("mi");
00444 testPortDisconnected();
00445
00446 return;
00447
00448 policy.type = RTT::corba::CData;
00449 policy.pull = true;
00450 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00451 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00452 testPortDataConnection();
00453 ports2->disconnectPort("mi");
00454 ports2->disconnectPort("mo");
00455 testPortDisconnected();
00456
00457 policy.type = RTT::corba::CBuffer;
00458 policy.pull = false;
00459 policy.size = 3;
00460 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00461 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00462 testPortBufferConnection();
00463 ports->disconnectPort("mo");
00464 ports->disconnectPort("mi");
00465 testPortDisconnected();
00466
00467 policy.type = RTT::corba::CBuffer;
00468 policy.pull = true;
00469 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00470 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00471 testPortBufferConnection();
00472 ports2->disconnectPort("mo");
00473 ports2->disconnectPort("mi");
00474 testPortDisconnected();
00475
00476 #if 0
00477
00478
00479 mo->createConnection(*mi);
00480
00481 ports->removeConnection("mo", ports2, "mi");
00482 ports->removeConnection("mi", ports2, "mo");
00483
00484 BOOST_CHECK(mo->connected());
00485 BOOST_CHECK(mi->connected());
00486 BOOST_CHECK(!mi->connected());
00487 #endif
00488 }
00489
00490 BOOST_AUTO_TEST_CASE( testPortProxying )
00491 {
00492
00493 tp = corba::TaskContextProxy::Create( "peerPP" , false);
00494 if (!tp )
00495 tp = corba::TaskContextProxy::CreateFromFile( "peerPP.ior");
00496
00497 base::PortInterface* untyped_port;
00498
00499 untyped_port = tp->ports()->getPort("mi");
00500 BOOST_CHECK(untyped_port);
00501 base::InputPortInterface* read_port = dynamic_cast<base::InputPortInterface*>(tp->ports()->getPort("mi"));
00502 BOOST_CHECK(read_port);
00503
00504 untyped_port = tp->ports()->getPort("mo");
00505 BOOST_CHECK(untyped_port);
00506 base::OutputPortInterface* write_port = dynamic_cast<base::OutputPortInterface*>(tp->ports()->getPort("mo"));
00507 BOOST_CHECK(write_port);
00508
00509
00510
00511 BOOST_CHECK(dynamic_cast<corba::RemoteInputPort*>(read_port));
00512 BOOST_CHECK(dynamic_cast<corba::RemoteOutputPort*>(write_port));
00513
00514 BOOST_CHECK(!read_port->connected());
00515 BOOST_CHECK(read_port->getTypeInfo() == mi->getTypeInfo());
00516 BOOST_CHECK(!write_port->connected());
00517 BOOST_CHECK(write_port->getTypeInfo() == mo->getTypeInfo());
00518
00519 mo->createConnection(*read_port);
00520 write_port->createConnection(*mi);
00521 BOOST_CHECK(read_port->connected());
00522 BOOST_CHECK(write_port->connected());
00523
00524
00525
00526 read_port->disconnect();
00527 write_port->disconnect();
00528 BOOST_CHECK(!read_port->connected());
00529 BOOST_CHECK(!write_port->connected());
00530
00531 mo->createConnection(*read_port);
00532 write_port->createConnection(*mi);
00533 BOOST_CHECK(read_port->connected());
00534 BOOST_CHECK(write_port->connected());
00535 write_port->disconnect();
00536 read_port->disconnect();
00537 BOOST_CHECK(!read_port->connected());
00538 BOOST_CHECK(!write_port->connected());
00539
00540
00541 auto_ptr<base::InputPortInterface> read_clone(dynamic_cast<base::InputPortInterface*>(read_port->clone()));
00542 BOOST_CHECK(mo->createConnection(*read_clone));
00543 BOOST_CHECK(read_clone->connected());
00544 BOOST_CHECK(!read_port->connected());
00545 mo->disconnect();
00546 }
00547
00548 BOOST_AUTO_TEST_CASE( testDataHalfs )
00549 {
00550 double result;
00551
00552 tp = corba::TaskContextProxy::Create( "peerDH" , false);
00553 if (!tp )
00554 tp = corba::TaskContextProxy::CreateFromFile( "peerDH.ior");
00555
00556 s = tp->server();
00557
00558
00559 RTT::corba::CConnPolicy policy;
00560 policy.type = RTT::corba::CData;
00561 policy.init = false;
00562 policy.lock_policy = RTT::corba::CLockFree;
00563 policy.size = 0;
00564 policy.transport = ORO_CORBA_PROTOCOL_ID;
00565
00566
00567 Handle hl( mi->getNewDataOnPortEvent()->setup(
00568 boost::bind(&CorbaTest::new_data_listener, this, _1) ) );
00569 BOOST_CHECK( hl.connect() );
00570
00571 corba::CDataFlowInterface_var ports = s->ports();
00572 BOOST_REQUIRE( ports.in() );
00573
00574
00575 policy.pull = false;
00576 mo->connectTo( tp->ports()->getPort("mi"), toRTT(policy) );
00577 CChannelElement_var cce = ports->buildChannelInput("mo", policy);
00578 CORBA::Any_var sample;
00579 BOOST_REQUIRE( cce.in() );
00580
00581
00582 mo->write( 3.33 );
00583 wait_for_equal( cce->read( sample.out(), true), CNewData, 5 );
00584 sample >>= result;
00585 BOOST_CHECK_EQUAL( result, 3.33);
00586
00587
00588 sample <<= 0.0;
00589 BOOST_CHECK_EQUAL( cce->read( sample.out(), true), COldData );
00590 sample >>= result;
00591 BOOST_CHECK_EQUAL( result, 3.33);
00592
00593 cce->disconnect();
00594 mo->disconnect();
00595
00596
00597 cce = ports->buildChannelOutput("mi", policy);
00598 ports->channelReady("mi", cce);
00599
00600 mi->connectTo( tp->ports()->getPort("mo"), toRTT(policy) );
00601 sample = new CORBA::Any();
00602 BOOST_REQUIRE( cce.in() );
00603
00604
00605 result = 0.0;
00606 sample <<= 4.44;
00607 cce->write( sample.in() );
00608 wait_for_equal( mi->read( result ), NewData, 5 );
00609 BOOST_CHECK_EQUAL( result, 4.44 );
00610
00611
00612 result = 0.0;
00613 BOOST_CHECK_EQUAL( mi->read( result ), OldData );
00614 BOOST_CHECK_EQUAL( result, 4.44);
00615 }
00616
00617 BOOST_AUTO_TEST_CASE( testBufferHalfs )
00618 {
00619 double result;
00620
00621
00622 tp = corba::TaskContextProxy::Create( "peerBH" , false);
00623 if (!tp )
00624 tp = corba::TaskContextProxy::CreateFromFile( "peerBH.ior");
00625
00626 s = tp->server();
00627
00628
00629 RTT::corba::CConnPolicy policy;
00630 policy.type = RTT::corba::CBuffer;
00631 policy.init = false;
00632 policy.lock_policy = RTT::corba::CLockFree;
00633 policy.size = 10;
00634 policy.transport = ORO_CORBA_PROTOCOL_ID;
00635
00636
00637 Handle hl( mi->getNewDataOnPortEvent()->setup(
00638 boost::bind(&CorbaTest::new_data_listener, this, _1) ) );
00639 BOOST_CHECK( hl.connect() );
00640
00641 corba::CDataFlowInterface_var ports = s->ports();
00642 BOOST_REQUIRE( ports.in() );
00643
00644
00645 policy.pull = false;
00646 mo->connectTo( tp->ports()->getPort("mi"), toRTT(policy) );
00647 CChannelElement_var cce = ports->buildChannelInput("mo", policy);
00648 CORBA::Any_var sample;
00649 BOOST_REQUIRE( cce.in() );
00650
00651
00652 mo->write( 6.33 );
00653 mo->write( 3.33 );
00654 wait_for_equal( cce->read( sample.out(), true), CNewData, 5 );
00655 sample >>= result;
00656 BOOST_CHECK_EQUAL( result, 6.33);
00657 wait_for_equal( cce->read( sample.out(), true ), CNewData, 10 );
00658 sample >>= result;
00659 BOOST_CHECK_EQUAL( result, 3.33);
00660
00661
00662 sample <<= 0.0;
00663 BOOST_CHECK_EQUAL( cce->read( sample.out(), true ), COldData );
00664 sample >>= result;
00665 BOOST_CHECK_EQUAL( result, 3.33);
00666
00667 cce->disconnect();
00668 mo->disconnect();
00669
00670
00671 mi->connectTo( tp->ports()->getPort("mo"), toRTT(policy) );
00672 cce = ports->buildChannelOutput("mi", policy);
00673 ports->channelReady("mi", cce);
00674 sample = new CORBA::Any();
00675 BOOST_REQUIRE( cce.in() );
00676
00677
00678 result = 0.0;
00679 sample <<= 6.44;
00680 cce->write( sample.in() );
00681 sample <<= 4.44;
00682 cce->write( sample.in() );
00683 wait_for_equal( mi->read( result ), NewData, 5 );
00684 BOOST_CHECK_EQUAL( result, 6.44 );
00685 wait_for_equal( mi->read( result ), NewData, 5 );
00686 BOOST_CHECK_EQUAL( result, 4.44 );
00687
00688
00689 result = 0.0;
00690 BOOST_CHECK_EQUAL( mi->read( result ), OldData );
00691 BOOST_CHECK_EQUAL( result, 4.44);
00692 }
00693
00694 BOOST_AUTO_TEST_SUITE_END()
00695