00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "unit.hpp"
00020
00021 #include <iostream>
00022
00023 #include <rtt/OperationCaller.hpp>
00024 #include <rtt/Service.hpp>
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <transports/corba/ServiceC.h>
00028 #include <transports/corba/corba.h>
00029 #include <transports/corba/CorbaConnPolicy.hpp>
00030 #include <rtt/InputPort.hpp>
00031 #include <rtt/OutputPort.hpp>
00032 #include <rtt/TaskContext.hpp>
00033 #include <transports/corba/TaskContextServer.hpp>
00034 #include <transports/corba/TaskContextProxy.hpp>
00035 #include <transports/corba/CorbaLib.hpp>
00036 #include <rtt/internal/DataSourceTypeInfo.hpp>
00037
00038 #include <string>
00039 #include <stdlib.h>
00040
00041 using namespace RTT;
00042 using namespace RTT::detail;
00043
00044 class CorbaTest : public TaskContext
00045 {
00046 public:
00047 CorbaTest() : TaskContext("CorbaTest") { this->setUp(); }
00048 ~CorbaTest() { this->tearDown(); }
00049
00050 TaskContext* tc;
00051 TaskContext* t2;
00052 TaskContextProxy* tp;
00053 corba::TaskContextServer* ts;
00054 TaskContext* tp2;
00055 corba::TaskContextServer* ts2;
00056 CTaskContext_ptr s;
00057 CTaskContext_ptr s2;
00058
00059 base::PortInterface* signalled_port;
00060 void new_data_listener(base::PortInterface* port);
00061
00062
00063 InputPort<double>* mi;
00064 OutputPort<double>* mo;
00065 bool is_calling, is_sending;
00066 SendHandle<void(TaskContext*, string const&)> handle;
00067
00068 int wait, cbcount;
00069
00070 void setUp();
00071 void tearDown();
00072
00073
00074 void testPortDataConnection();
00075 void testPortBufferConnection();
00076 void testPortDisconnected();
00077
00078 void callBackPeer(TaskContext* peer, string const& opname) {
00079 OperationCaller<void(TaskContext*, string const&)> op1( peer->getOperation(opname), this->engine());
00080 int count = ++cbcount;
00081 log(Info) << "Test executes callBackPeer():"<< count <<endlog();
00082 if (!is_calling) {
00083 is_calling = true;
00084 log(Info) << "Test calls server:" << count <<endlog();
00085 op1(this, "callBackPeer");
00086 log(Info) << "Test finishes server call:"<<count <<endlog();
00087 }
00088
00089 if (!is_sending) {
00090 is_sending = true;
00091 log(Info) << "Test sends server:"<<count <<endlog();
00092 handle = op1.send(this, "callBackPeerOwn");
00093 log(Info) << "Test finishes server send:"<< count <<endlog();
00094 }
00095 log(Info) << "Test finishes callBackPeer():"<< count <<endlog();
00096 }
00097
00098 };
00099
00100 using namespace std;
00101 using corba::TaskContextProxy;
00102
00103 void
00104 CorbaTest::setUp()
00105 {
00106
00107 mi = new InputPort<double>("mi");
00108 mo = new OutputPort<double>("mo");
00109
00110 tc = new TaskContext( "root" );
00111 tc->ports()->addEventPort( *mi,boost::bind(&CorbaTest::new_data_listener, this, _1) );
00112 tc->ports()->addPort( *mo );
00113
00114 t2 = 0;
00115 ts2 = ts = 0;
00116 tp2 = tp = 0;
00117 wait = cbcount = 0;
00118 is_calling = false, is_sending = false;
00119
00120 addOperation("callBackPeer", &CorbaTest::callBackPeer, this,ClientThread);
00121 addOperation("callBackPeerOwn", &CorbaTest::callBackPeer, this,OwnThread);
00122 }
00123
00124
00125 void
00126 CorbaTest::tearDown()
00127 {
00128 delete tp;
00129 delete ts;
00130 delete tp2;
00131 delete ts2;
00132 delete tc;
00133 delete t2;
00134
00135 delete mi;
00136 delete mo;
00137 }
00138
00139 void CorbaTest::new_data_listener(base::PortInterface* port)
00140 {
00141 signalled_port = port;
00142 }
00143
00144
00145 #define ASSERT_PORT_SIGNALLING(code, read_port) \
00146 signalled_port = 0; wait = 0;\
00147 code; \
00148 while (read_port != signalled_port && wait++ != 5) \
00149 usleep(100000); \
00150 BOOST_CHECK( read_port == signalled_port );
00151
00152 bool wait_for_helper;
00153 #define wait_for( cond, times ) \
00154 wait = 0; \
00155 while( (wait_for_helper = !(cond)) && wait++ != times ) \
00156 usleep(100000); \
00157 if (wait_for_helper) BOOST_CHECK( cond );
00158
00159 #define wait_for_equal( a, b, times ) \
00160 wait = 0; \
00161 while( (wait_for_helper = ((a) != (b))) && wait++ != times ) \
00162 usleep(100000); \
00163 if (wait_for_helper) BOOST_CHECK_EQUAL( a, b );
00164
00165
00166 void CorbaTest::testPortDataConnection()
00167 {
00168
00169
00170 BOOST_CHECK( mo->connected() );
00171 BOOST_CHECK( mi->connected() );
00172
00173 double value = 0;
00174
00175
00176 BOOST_CHECK_EQUAL( mi->read(value), NoData );
00177
00178
00179 ASSERT_PORT_SIGNALLING(mo->write(1.0), mi)
00180 BOOST_CHECK( mi->read(value) );
00181 BOOST_CHECK_EQUAL( 1.0, value );
00182 ASSERT_PORT_SIGNALLING(mo->write(2.0), mi);
00183 BOOST_CHECK( mi->read(value) );
00184 BOOST_CHECK_EQUAL( 2.0, value );
00185 }
00186
00187 void CorbaTest::testPortBufferConnection()
00188 {
00189
00190
00191 BOOST_CHECK( mo->connected() );
00192 BOOST_CHECK( mi->connected() );
00193
00194 double value = 0;
00195
00196
00197 BOOST_CHECK_EQUAL( mi->read(value), NoData );
00198
00199
00200 ASSERT_PORT_SIGNALLING(mo->write(1.0), mi);
00201 ASSERT_PORT_SIGNALLING(mo->write(2.0), mi);
00202 ASSERT_PORT_SIGNALLING(mo->write(3.0), mi);
00203 ASSERT_PORT_SIGNALLING(mo->write(4.0), 0);
00204 BOOST_CHECK( mi->read(value) );
00205 BOOST_CHECK_EQUAL( 1.0, value );
00206 BOOST_CHECK( mi->read(value) );
00207 BOOST_CHECK_EQUAL( 2.0, value );
00208 BOOST_CHECK( mi->read(value) );
00209 BOOST_CHECK_EQUAL( 3.0, value );
00210 BOOST_CHECK_EQUAL( mi->read(value), OldData );
00211 }
00212
00213 void CorbaTest::testPortDisconnected()
00214 {
00215 BOOST_CHECK( !mo->connected() );
00216 BOOST_CHECK( !mi->connected() );
00217 }
00218
00219
00220
00221 BOOST_FIXTURE_TEST_SUITE( CorbaIPCTestSuite, CorbaTest )
00222
00223
00224 BOOST_AUTO_TEST_CASE( testRemoteOperationCallerC )
00225 {
00226 tp = corba::TaskContextProxy::Create( "peerRMC", false );
00227 if (!tp )
00228 tp = corba::TaskContextProxy::CreateFromFile( "peerRMC.ior");
00229 BOOST_REQUIRE( tp );
00230
00231
00232 internal::OperationCallerC mc;
00233 double r = 0.0;
00234 mc = tp->provides("methods")->create("vm0", tc->engine() );
00235 BOOST_CHECK( mc.call() );
00236 BOOST_CHECK( r == 0.0 );
00237
00238 mc = tp->provides("methods")->create("m0", tc->engine() ).ret( r );
00239 BOOST_CHECK( mc.call() );
00240 BOOST_CHECK( r == -1.0 );
00241
00242 mc = tp->provides("methods")->create("m2", tc->engine()).argC(1).argC(2.0).ret( r );
00243 BOOST_CHECK( mc.call() );
00244 BOOST_CHECK( r == -3.0 );
00245
00246 mc = tp->provides("methods")->create("m3", tc->engine()).ret( r ).argC(1).argC(2.0).argC(true);
00247 BOOST_CHECK( mc.call() );
00248 BOOST_CHECK( r == -4.0 );
00249
00250 }
00251
00252 BOOST_AUTO_TEST_CASE( testRemoteOperationCaller )
00253 {
00254 tp = corba::TaskContextProxy::Create( "peerRM" , false);
00255 if (!tp )
00256 tp = corba::TaskContextProxy::CreateFromFile( "peerRM.ior");
00257 BOOST_REQUIRE(tp);
00258
00259
00260 RTT::OperationCaller<double(void)> m0 = tp->provides("methods")->getOperation("m0");
00261 RTT::OperationCaller<double(int)> m1 = tp->provides("methods")->getOperation("m1");
00262 RTT::OperationCaller<double(int,double)> m2 = tp->provides("methods")->getOperation("m2");
00263 RTT::OperationCaller<double(int,double,bool)> m3 = tp->provides("methods")->getOperation("m3");
00264 RTT::OperationCaller<double(int,double,bool,std::string)> m4 = tp->provides("methods")->getOperation("m4");
00265
00266 BOOST_CHECK_EQUAL( -1.0, m0() );
00267 BOOST_CHECK_EQUAL( -2.0, m1(1) );
00268 BOOST_CHECK_EQUAL( -3.0, m2(1, 2.0) );
00269 BOOST_CHECK_EQUAL( -4.0, m3(1, 2.0, true) );
00270 BOOST_CHECK_EQUAL( -5.0, m4(1, 2.0, true,"hello") );
00271 }
00272
00277 BOOST_AUTO_TEST_CASE( testRemoteOperationCallerCallback )
00278 {
00279 tp = corba::TaskContextProxy::Create( "peerRMCb" , false);
00280 if (!tp )
00281 tp = corba::TaskContextProxy::CreateFromFile( "peerRMC.ior");
00282 BOOST_REQUIRE(tp);
00283
00284 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo() != 0 );
00285 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo() != RTT::internal::DataSourceTypeInfo<UnknownType>::getTypeInfo());
00286 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo()->getProtocol(ORO_CORBA_PROTOCOL_ID) != 0 );
00287
00288 this->callBackPeer(tp, "callBackPeer");
00289 sleep(1);
00290 BOOST_CHECK( is_calling );
00291 BOOST_CHECK( is_sending );
00292 BOOST_CHECK( handle.ready() );
00293 BOOST_CHECK_EQUAL( handle.collectIfDone(), SendSuccess );
00294 }
00295
00296 BOOST_AUTO_TEST_CASE( testAnyOperationCaller )
00297 {
00298 double d;
00299 tp = corba::TaskContextProxy::Create( "peerAM" , false);
00300 if (!tp )
00301 tp = corba::TaskContextProxy::CreateFromFile( "peerAM.ior");
00302
00303 BOOST_REQUIRE(tp);
00304 s = tp->server();
00305 BOOST_REQUIRE( s );
00306
00307 corba::CService_var co = s->getProvider("methods");
00308 BOOST_CHECK( co.in() );
00309
00310 corba::CAnyArguments_var any_args = new corba::CAnyArguments(0);
00311 CORBA::Any_var vm0 = co->callOperation("vm0", any_args.inout() );
00312
00313
00314 CORBA::Any_var m0 = co->callOperation("m0", any_args.inout());
00315 BOOST_CHECK( m0 >>= d );
00316 BOOST_CHECK_EQUAL(d, -1.0 );
00317
00318 any_args = new corba::CAnyArguments(1);
00319 any_args->length(1);
00320 unsigned int index = 0;
00321 any_args[index] <<= (CORBA::Long) 1;
00322 CORBA::Any_var m1;
00323 BOOST_CHECK_NO_THROW( m1 = co->callOperation("m1", any_args.inout()));
00324 BOOST_CHECK( m1 >>= d );
00325 BOOST_CHECK_EQUAL(d, -2.0 );
00326
00327
00328 any_args = new corba::CAnyArguments(2);
00329 any_args->length(2);
00330 index = 0;
00331 any_args[index] <<= (CORBA::Long) 1;
00332 ++index;
00333 any_args[index] <<= (CORBA::Double) 2.0;
00334 CORBA::Any_var m2;
00335 BOOST_CHECK_NO_THROW( m2 = co->callOperation("m2", any_args.inout()));
00336 BOOST_CHECK( m2 >>= d );
00337 BOOST_CHECK_EQUAL(d, -3.0 );
00338
00339 any_args = new corba::CAnyArguments(3);
00340 any_args->length(3);
00341 index = 0;
00342 any_args[index] <<= (CORBA::Long) 1;
00343 ++index;
00344 any_args[index] <<= (CORBA::Double) 2.0;
00345 ++index;
00346 any_args[index] <<= CORBA::Any::from_boolean( true );
00347 CORBA::Any_var m3;
00348 BOOST_CHECK_NO_THROW( m3= co->callOperation("m3", any_args.inout()) );
00349 BOOST_CHECK( m3 >>= d );
00350 BOOST_CHECK_EQUAL(d, -4.0 );
00351
00352 any_args = new corba::CAnyArguments(4);
00353 any_args->length(4);
00354 index = 0;
00355 any_args[index] <<= (CORBA::Long) 1;
00356 ++index;
00357 any_args[index] <<= (CORBA::Double) 2.0;
00358 ++index;
00359 any_args[index] <<= CORBA::Any::from_boolean( true );
00360 ++index;
00361 any_args[index] <<= "hello";
00362 CORBA::Any_var m4;
00363 BOOST_CHECK_NO_THROW ( m4 = co->callOperation("m4", any_args.inout()) );
00364 BOOST_CHECK( m4 >>= d );
00365 BOOST_CHECK_EQUAL(d, -5.0 );
00366 }
00367
00368 BOOST_AUTO_TEST_CASE(testDataFlowInterface)
00369 {
00370 tp = corba::TaskContextProxy::Create( "peerDFI" , false);
00371 if (!tp )
00372 tp = corba::TaskContextProxy::CreateFromFile( "peerDFI.ior");
00373
00374 corba::CDataFlowInterface_var ports = tp->server()->ports();
00375
00376 corba::CDataFlowInterface::CPortNames_var names =
00377 ports->getPorts();
00378
00379 BOOST_CHECK_EQUAL(CORBA::ULong(2), names->length());
00380 BOOST_CHECK_EQUAL(string("mi"), string(names[CORBA::ULong(0)]));
00381 BOOST_CHECK_EQUAL(string("mo"), string(names[CORBA::ULong(1)]));
00382
00383
00384 BOOST_CHECK_EQUAL(RTT::corba::COutput,
00385 ports->getPortType("mo"));
00386 BOOST_CHECK_EQUAL(RTT::corba::CInput,
00387 ports->getPortType("mi"));
00388
00389
00390 CORBA::String_var cstr = ports->getDataType("mo");
00391 BOOST_CHECK_EQUAL(string("double"),
00392 string(cstr.in()));
00393 }
00394
00395 BOOST_AUTO_TEST_CASE( testPortConnections )
00396 {
00397
00398 tp = corba::TaskContextProxy::Create( "peerPC", false);
00399 if (!tp )
00400 tp = corba::TaskContextProxy::CreateFromFile( "peerPC.ior");
00401
00402 s = tp->server();
00403
00404 ts2 = corba::TaskContextServer::Create( tc, false );
00405 s2 = ts2->server();
00406
00407
00408 RTT::corba::CConnPolicy policy = toCORBA(ConnPolicy::data());
00409 policy.init = false;
00410 policy.transport = ORO_CORBA_PROTOCOL_ID;
00411
00412 corba::CDataFlowInterface_var ports = s->ports();
00413 corba::CDataFlowInterface_var ports2 = s2->ports();
00414
00415
00416 BOOST_CHECK_THROW( ports->createConnection("mo", ports2, "does_not_exist", policy), CNoSuchPortException );
00417 BOOST_CHECK_THROW( ports->createConnection("does_not_exist", ports2, "mi", policy), CNoSuchPortException );
00418 BOOST_CHECK_THROW( ports->createConnection("does_not_exist", ports2, "does_not_exist", policy), CNoSuchPortException );
00419 BOOST_CHECK_THROW( ports->createConnection("mo", ports2, "mo", policy), CNoSuchPortException );
00420 BOOST_CHECK_THROW( ports->createConnection("mi", ports2, "mi", policy), CNoSuchPortException );
00421 BOOST_CHECK_THROW( ports->createConnection("mi", ports2, "mo", policy), CNoSuchPortException );
00422
00423
00424 BOOST_CHECK( tc->start() );
00425
00426
00427
00428
00429
00430
00431 policy.type = RTT::corba::CData;
00432 policy.pull = false;
00433 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00434 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00435 testPortDataConnection();
00436 ports->disconnectPort("mo");
00437 ports->disconnectPort("mi");
00438 testPortDisconnected();
00439
00440 return;
00441
00442 policy.type = RTT::corba::CData;
00443 policy.pull = true;
00444 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00445 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00446 testPortDataConnection();
00447 ports2->disconnectPort("mi");
00448 ports2->disconnectPort("mo");
00449 testPortDisconnected();
00450
00451 policy.type = RTT::corba::CBuffer;
00452 policy.pull = false;
00453 policy.size = 3;
00454 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00455 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00456 testPortBufferConnection();
00457 ports->disconnectPort("mo");
00458 ports->disconnectPort("mi");
00459 testPortDisconnected();
00460
00461 policy.type = RTT::corba::CBuffer;
00462 policy.pull = true;
00463 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00464 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00465 testPortBufferConnection();
00466 ports2->disconnectPort("mo");
00467 ports2->disconnectPort("mi");
00468 testPortDisconnected();
00469
00470 #if 0
00471
00472
00473 mo->createConnection(*mi);
00474
00475 ports->removeConnection("mo", ports2, "mi");
00476 ports->removeConnection("mi", ports2, "mo");
00477
00478 BOOST_CHECK(mo->connected());
00479 BOOST_CHECK(mi->connected());
00480 BOOST_CHECK(!mi->connected());
00481 #endif
00482 }
00483
00484 BOOST_AUTO_TEST_CASE( testPortProxying )
00485 {
00486
00487 tp = corba::TaskContextProxy::Create( "peerPP" , false);
00488 if (!tp )
00489 tp = corba::TaskContextProxy::CreateFromFile( "peerPP.ior");
00490
00491 base::PortInterface* untyped_port;
00492
00493 untyped_port = tp->ports()->getPort("mi");
00494 BOOST_CHECK(untyped_port);
00495 base::InputPortInterface* read_port = dynamic_cast<base::InputPortInterface*>(tp->ports()->getPort("mi"));
00496 BOOST_CHECK(read_port);
00497
00498 untyped_port = tp->ports()->getPort("mo");
00499 BOOST_CHECK(untyped_port);
00500 base::OutputPortInterface* write_port = dynamic_cast<base::OutputPortInterface*>(tp->ports()->getPort("mo"));
00501 BOOST_CHECK(write_port);
00502
00503
00504
00505 BOOST_CHECK(dynamic_cast<corba::RemoteInputPort*>(read_port));
00506 BOOST_CHECK(dynamic_cast<corba::RemoteOutputPort*>(write_port));
00507
00508 BOOST_CHECK(!read_port->connected());
00509 BOOST_CHECK(read_port->getTypeInfo() == mi->getTypeInfo());
00510 BOOST_CHECK(!write_port->connected());
00511 BOOST_CHECK(write_port->getTypeInfo() == mo->getTypeInfo());
00512
00513 mo->createConnection(*read_port);
00514 write_port->createConnection(*mi);
00515 BOOST_CHECK(read_port->connected());
00516 BOOST_CHECK(write_port->connected());
00517
00518
00519
00520 read_port->disconnect();
00521 write_port->disconnect();
00522 BOOST_CHECK(!read_port->connected());
00523 BOOST_CHECK(!write_port->connected());
00524
00525 mo->createConnection(*read_port);
00526 write_port->createConnection(*mi);
00527 BOOST_CHECK(read_port->connected());
00528 BOOST_CHECK(write_port->connected());
00529 write_port->disconnect();
00530 read_port->disconnect();
00531 BOOST_CHECK(!read_port->connected());
00532 BOOST_CHECK(!write_port->connected());
00533
00534
00535 auto_ptr<base::InputPortInterface> read_clone(dynamic_cast<base::InputPortInterface*>(read_port->clone()));
00536 BOOST_CHECK(mo->createConnection(*read_clone));
00537 BOOST_CHECK(read_clone->connected());
00538 BOOST_CHECK(!read_port->connected());
00539 mo->disconnect();
00540 }
00541
00542 BOOST_AUTO_TEST_CASE( testDataHalfs )
00543 {
00544 if(std::getenv("CI") != NULL) {
00545 BOOST_TEST_MESSAGE("Skipping testAffinity because it can fail on integration servers.");
00546 return;
00547 }
00548
00549 double result;
00550
00551 tp = corba::TaskContextProxy::Create( "peerDH" , false);
00552 if (!tp )
00553 tp = corba::TaskContextProxy::CreateFromFile( "peerDH.ior");
00554
00555 s = tp->server();
00556
00557
00558 RTT::corba::CConnPolicy policy = toCORBA(ConnPolicy::data());
00559 policy.init = false;
00560 policy.transport = ORO_CORBA_PROTOCOL_ID;
00561
00562 corba::CDataFlowInterface_var ports = s->ports();
00563 BOOST_REQUIRE( ports.in() );
00564
00565
00566 policy.pull = false;
00567 mo->connectTo( tp->ports()->getPort("mi"), toRTT(policy) );
00568 CChannelElement_var cce = ports->buildChannelInput("mo", policy);
00569 CORBA::Any_var sample;
00570 BOOST_REQUIRE( cce.in() );
00571
00572
00573 mo->write( 3.33 );
00574 wait_for_equal( cce->read( sample.out(), true), CNewData, 5 );
00575 sample >>= result;
00576 BOOST_CHECK_EQUAL( result, 3.33);
00577
00578
00579 sample <<= 0.0;
00580 BOOST_CHECK_EQUAL( cce->read( sample.out(), true), COldData );
00581 sample >>= result;
00582 BOOST_CHECK_EQUAL( result, 3.33);
00583
00584 cce->disconnect();
00585 mo->disconnect();
00586
00587
00588 cce = ports->buildChannelOutput("mi", policy);
00589 ports->channelReady("mi", cce, policy);
00590
00591 mi->connectTo( tp->ports()->getPort("mo"), toRTT(policy) );
00592 sample = new CORBA::Any();
00593 BOOST_REQUIRE( cce.in() );
00594
00595
00596 result = 0.0;
00597 sample <<= 4.44;
00598 cce->write( sample.in() );
00599 wait_for_equal( mi->read( result ), NewData, 5 );
00600 BOOST_CHECK_EQUAL( result, 4.44 );
00601
00602
00603 result = 0.0;
00604 BOOST_CHECK_EQUAL( mi->read( result ), OldData );
00605 BOOST_CHECK_EQUAL( result, 4.44);
00606 }
00607
00608 BOOST_AUTO_TEST_CASE( testBufferHalfs )
00609 {
00610 if(std::getenv("CI") != NULL) {
00611 BOOST_TEST_MESSAGE("Skipping testAffinity because it can fail on integration servers.");
00612 return;
00613 }
00614
00615 double result;
00616
00617
00618 tp = corba::TaskContextProxy::Create( "peerBH" , false);
00619 if (!tp )
00620 tp = corba::TaskContextProxy::CreateFromFile( "peerBH.ior");
00621
00622 s = tp->server();
00623
00624
00625 RTT::corba::CConnPolicy policy = toCORBA(ConnPolicy::buffer(10));
00626 policy.init = false;
00627 policy.transport = ORO_CORBA_PROTOCOL_ID;
00628
00629 corba::CDataFlowInterface_var ports = s->ports();
00630 BOOST_REQUIRE( ports.in() );
00631
00632
00633 policy.pull = false;
00634 mo->connectTo( tp->ports()->getPort("mi"), toRTT(policy) );
00635 CChannelElement_var cce = ports->buildChannelInput("mo", policy);
00636 CORBA::Any_var sample;
00637 BOOST_REQUIRE( cce.in() );
00638
00639
00640 mo->write( 6.33 );
00641 mo->write( 3.33 );
00642 wait_for_equal( cce->read( sample.out(), true), CNewData, 5 );
00643 sample >>= result;
00644 BOOST_CHECK_EQUAL( result, 6.33);
00645 wait_for_equal( cce->read( sample.out(), true ), CNewData, 10 );
00646 sample >>= result;
00647 BOOST_CHECK_EQUAL( result, 3.33);
00648
00649
00650 sample <<= 0.0;
00651 BOOST_CHECK_EQUAL( cce->read( sample.out(), true ), COldData );
00652 sample >>= result;
00653 BOOST_CHECK_EQUAL( result, 3.33);
00654
00655 cce->disconnect();
00656 mo->disconnect();
00657
00658
00659 mi->connectTo( tp->ports()->getPort("mo"), toRTT(policy) );
00660 cce = ports->buildChannelOutput("mi", policy);
00661 ports->channelReady("mi", cce, policy);
00662 sample = new CORBA::Any();
00663 BOOST_REQUIRE( cce.in() );
00664
00665
00666 result = 0.0;
00667 sample <<= 6.44;
00668 cce->write( sample.in() );
00669 sample <<= 4.44;
00670 cce->write( sample.in() );
00671 wait_for_equal( mi->read( result ), NewData, 5 );
00672 BOOST_CHECK_EQUAL( result, 6.44 );
00673 wait_for_equal( mi->read( result ), NewData, 5 );
00674 BOOST_CHECK_EQUAL( result, 4.44 );
00675
00676
00677 result = 0.0;
00678 BOOST_CHECK_EQUAL( mi->read( result ), OldData );
00679 BOOST_CHECK_EQUAL( result, 4.44);
00680 }
00681
00682 BOOST_AUTO_TEST_SUITE_END()
00683