00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "unit.hpp"
00020
00021 #include <iostream>
00022
00023 #include <rtt/OperationCaller.hpp>
00024 #include <rtt/Service.hpp>
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <transports/corba/ServiceC.h>
00028 #include <transports/corba/corba.h>
00029 #include <transports/corba/CorbaConnPolicy.hpp>
00030 #include <rtt/InputPort.hpp>
00031 #include <rtt/OutputPort.hpp>
00032 #include <rtt/TaskContext.hpp>
00033 #include <transports/corba/TaskContextServer.hpp>
00034 #include <transports/corba/TaskContextProxy.hpp>
00035 #include <transports/corba/CorbaLib.hpp>
00036 #include <rtt/internal/DataSourceTypeInfo.hpp>
00037
00038 #include <string>
00039 #include <stdlib.h>
00040
00041 using namespace RTT;
00042 using namespace RTT::detail;
00043
00044 class CorbaTest : public TaskContext
00045 {
00046 public:
00047 CorbaTest() : TaskContext("CorbaTest") { this->setUp(); }
00048 ~CorbaTest() { this->tearDown(); }
00049
00050 TaskContext* tc;
00051 TaskContext* t2;
00052 TaskContextProxy* tp;
00053 corba::TaskContextServer* ts;
00054 TaskContext* tp2;
00055 corba::TaskContextServer* ts2;
00056 CTaskContext_ptr s;
00057 CTaskContext_ptr s2;
00058
00059 base::PortInterface* signalled_port;
00060 void new_data_listener(base::PortInterface* port);
00061
00062
00063 InputPort<double>* mi;
00064 OutputPort<double>* mo;
00065 bool is_calling, is_sending;
00066 SendHandle<void(TaskContext*, string const&)> handle;
00067
00068 int wait, cbcount;
00069
00070 void setUp();
00071 void tearDown();
00072
00073
00074 void testPortDataConnection();
00075 void testPortBufferConnection();
00076 void testPortDisconnected();
00077
00078 void callBackPeer(TaskContext* peer, string const& opname) {
00079 OperationCaller<void(TaskContext*, string const&)> op1( peer->getOperation(opname), this->engine());
00080 int count = ++cbcount;
00081 log(Info) << "Test executes callBackPeer():"<< count <<endlog();
00082 if (!is_calling) {
00083 is_calling = true;
00084 log(Info) << "Test calls server:" << count <<endlog();
00085 op1(this, "callBackPeer");
00086 log(Info) << "Test finishes server call:"<<count <<endlog();
00087 }
00088
00089 if (!is_sending) {
00090 is_sending = true;
00091 log(Info) << "Test sends server:"<<count <<endlog();
00092 handle = op1.send(this, "callBackPeerOwn");
00093 log(Info) << "Test finishes server send:"<< count <<endlog();
00094 }
00095 log(Info) << "Test finishes callBackPeer():"<< count <<endlog();
00096 }
00097
00098 };
00099
00100 using namespace std;
00101 using corba::TaskContextProxy;
00102
00103 void
00104 CorbaTest::setUp()
00105 {
00106
00107 mi = new InputPort<double>("mi");
00108 mo = new OutputPort<double>("mo");
00109
00110 tc = new TaskContext( "root" );
00111 tc->ports()->addEventPort( *mi,boost::bind(&CorbaTest::new_data_listener, this, _1) );
00112 tc->ports()->addPort( *mo );
00113
00114 t2 = 0;
00115 ts2 = ts = 0;
00116 tp2 = tp = 0;
00117 wait = cbcount = 0;
00118 is_calling = false, is_sending = false;
00119
00120 addOperation("callBackPeer", &CorbaTest::callBackPeer, this,ClientThread);
00121 addOperation("callBackPeerOwn", &CorbaTest::callBackPeer, this,OwnThread);
00122 }
00123
00124
00125 void
00126 CorbaTest::tearDown()
00127 {
00128 delete tp;
00129 delete ts;
00130 delete tp2;
00131 delete ts2;
00132 delete tc;
00133 delete t2;
00134
00135 delete mi;
00136 delete mo;
00137 }
00138
00139 void CorbaTest::new_data_listener(base::PortInterface* port)
00140 {
00141 signalled_port = port;
00142 }
00143
00144
/**
 * Runs `code`, then polls (at most 5 times, 100 ms apart) until
 * `signalled_port` equals `read_port`, and finally checks that it does.
 * Pass 0 as `read_port` to check that nothing has been signalled.
 *
 * Relies on `signalled_port` and `wait` being in scope at the call site.
 *
 * The body is one brace-enclosed statement so the macro stays intact
 * under an unbraced `if`/`for`. Plain braces (rather than do/while(0))
 * are used because existing call sites invoke the macro both with and
 * without a trailing semicolon.
 */
#define ASSERT_PORT_SIGNALLING(code, read_port) \
    { \
        signalled_port = 0; wait = 0; \
        code; \
        while ((read_port) != signalled_port && wait++ != 5) \
            usleep(100000); \
        BOOST_CHECK( (read_port) == signalled_port ); \
    }
00151
// Scratch flag shared by wait_for / wait_for_equal: true while the awaited
// condition has not been met yet.
bool wait_for_helper;

/**
 * Polls `cond` every 100 ms until it holds or `times` retries have been
 * spent. When it still did not hold, `cond` is evaluated once more inside
 * BOOST_CHECK so the failure is reported.
 *
 * Relies on an int `wait` being in scope at the call site.
 *
 * The body is one brace-enclosed statement so the macro stays intact
 * under an unbraced `if`, and so its trailing `if` cannot capture a
 * following `else`.
 */
#define wait_for( cond, times ) \
    { \
        wait = 0; \
        while( (wait_for_helper = !(cond)) && wait++ != (times) ) \
            usleep(100000); \
        if (wait_for_helper) { BOOST_CHECK( cond ); } \
    }
00158
/**
 * Polls every 100 ms until `a` equals `b` or `times` retries have been
 * spent. When they still differ, both are evaluated once more inside
 * BOOST_CHECK_EQUAL so the mismatch is reported with its values.
 * Note that `a` and `b` are re-evaluated on every poll, so expressions
 * with side effects (such as a port read) run once per poll.
 *
 * Relies on an int `wait` and the bool `wait_for_helper` being in scope.
 *
 * The body is one brace-enclosed statement so the macro stays intact
 * under an unbraced `if`, and so its trailing `if` cannot capture a
 * following `else`.
 */
#define wait_for_equal( a, b, times ) \
    { \
        wait = 0; \
        while( (wait_for_helper = ((a) != (b))) && wait++ != (times) ) \
            usleep(100000); \
        if (wait_for_helper) { BOOST_CHECK_EQUAL( a, b ); } \
    }
00164
00165
00166 void CorbaTest::testPortDataConnection()
00167 {
00168
00169
00170 BOOST_CHECK( mo->connected() );
00171 BOOST_CHECK( mi->connected() );
00172
00173 double value = 0;
00174
00175
00176 BOOST_CHECK_EQUAL( mi->read(value), NoData );
00177
00178
00179 ASSERT_PORT_SIGNALLING(mo->write(1.0), mi)
00180 BOOST_CHECK( mi->read(value) );
00181 BOOST_CHECK_EQUAL( 1.0, value );
00182 ASSERT_PORT_SIGNALLING(mo->write(2.0), mi);
00183 BOOST_CHECK( mi->read(value) );
00184 BOOST_CHECK_EQUAL( 2.0, value );
00185 }
00186
00187 void CorbaTest::testPortBufferConnection()
00188 {
00189
00190
00191 BOOST_CHECK( mo->connected() );
00192 BOOST_CHECK( mi->connected() );
00193
00194 double value = 0;
00195
00196
00197 BOOST_CHECK_EQUAL( mi->read(value), NoData );
00198
00199
00200 ASSERT_PORT_SIGNALLING(mo->write(1.0), mi);
00201 ASSERT_PORT_SIGNALLING(mo->write(2.0), mi);
00202 ASSERT_PORT_SIGNALLING(mo->write(3.0), mi);
00203 ASSERT_PORT_SIGNALLING(mo->write(4.0), 0);
00204 BOOST_CHECK( mi->read(value) );
00205 BOOST_CHECK_EQUAL( 1.0, value );
00206 BOOST_CHECK( mi->read(value) );
00207 BOOST_CHECK_EQUAL( 2.0, value );
00208 BOOST_CHECK( mi->read(value) );
00209 BOOST_CHECK_EQUAL( 3.0, value );
00210 BOOST_CHECK_EQUAL( mi->read(value), OldData );
00211 }
00212
00213 void CorbaTest::testPortDisconnected()
00214 {
00215 BOOST_CHECK( !mo->connected() );
00216 BOOST_CHECK( !mi->connected() );
00217 }
00218
00219
00220
00221 BOOST_FIXTURE_TEST_SUITE( CorbaIPCTestSuite, CorbaTest )
00222
00223
00224 BOOST_AUTO_TEST_CASE( testRemoteOperationCallerC )
00225 {
00226 tp = corba::TaskContextProxy::Create( "peerRMC", false );
00227 if (!tp )
00228 tp = corba::TaskContextProxy::CreateFromFile( "peerRMC.ior");
00229 BOOST_REQUIRE( tp );
00230
00231
00232 internal::OperationCallerC mc;
00233 double r = 0.0;
00234 mc = tp->provides("methods")->create("vm0", tc->engine() );
00235 BOOST_CHECK( mc.call() );
00236 BOOST_CHECK( r == 0.0 );
00237
00238 mc = tp->provides("methods")->create("m0", tc->engine() ).ret( r );
00239 BOOST_CHECK( mc.call() );
00240 BOOST_CHECK( r == -1.0 );
00241
00242 mc = tp->provides("methods")->create("m2", tc->engine()).argC(1).argC(2.0).ret( r );
00243 BOOST_CHECK( mc.call() );
00244 BOOST_CHECK( r == -3.0 );
00245
00246 mc = tp->provides("methods")->create("m3", tc->engine()).ret( r ).argC(1).argC(2.0).argC(true);
00247 BOOST_CHECK( mc.call() );
00248 BOOST_CHECK( r == -4.0 );
00249
00250 }
00251
00252 BOOST_AUTO_TEST_CASE( testRemoteOperationCaller )
00253 {
00254 tp = corba::TaskContextProxy::Create( "peerRM" , false);
00255 if (!tp )
00256 tp = corba::TaskContextProxy::CreateFromFile( "peerRM.ior");
00257 BOOST_REQUIRE(tp);
00258
00259
00260 RTT::OperationCaller<double(void)> m0 = tp->provides("methods")->getOperation("m0");
00261 RTT::OperationCaller<double(int)> m1 = tp->provides("methods")->getOperation("m1");
00262 RTT::OperationCaller<double(int,double)> m2 = tp->provides("methods")->getOperation("m2");
00263 RTT::OperationCaller<double(int,double,bool)> m3 = tp->provides("methods")->getOperation("m3");
00264 RTT::OperationCaller<double(int,double,bool,std::string)> m4 = tp->provides("methods")->getOperation("m4");
00265
00266 BOOST_CHECK_EQUAL( -1.0, m0() );
00267 BOOST_CHECK_EQUAL( -2.0, m1(1) );
00268 BOOST_CHECK_EQUAL( -3.0, m2(1, 2.0) );
00269 BOOST_CHECK_EQUAL( -4.0, m3(1, 2.0, true) );
00270 BOOST_CHECK_EQUAL( -5.0, m4(1, 2.0, true,"hello") );
00271 }
00272
00277 BOOST_AUTO_TEST_CASE( testRemoteOperationCallerCallback )
00278 {
00279 tp = corba::TaskContextProxy::Create( "peerRMCb" , false);
00280 if (!tp )
00281 tp = corba::TaskContextProxy::CreateFromFile( "peerRMC.ior");
00282 BOOST_REQUIRE(tp);
00283
00284 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo() != 0 );
00285 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo() != RTT::internal::DataSourceTypeInfo<UnknownType>::getTypeInfo());
00286 BOOST_REQUIRE( RTT::internal::DataSourceTypeInfo<TaskContext*>::getTypeInfo()->getProtocol(ORO_CORBA_PROTOCOL_ID) != 0 );
00287
00288 this->callBackPeer(tp, "callBackPeer");
00289 sleep(1);
00290 BOOST_CHECK( is_calling );
00291 BOOST_CHECK( is_sending );
00292 BOOST_CHECK( handle.ready() );
00293 BOOST_CHECK_EQUAL( handle.collectIfDone(), SendSuccess );
00294 }
00295
00296 BOOST_AUTO_TEST_CASE( testAnyOperationCaller )
00297 {
00298 double d;
00299 tp = corba::TaskContextProxy::Create( "peerAM" , false);
00300 if (!tp )
00301 tp = corba::TaskContextProxy::CreateFromFile( "peerAM.ior");
00302
00303 BOOST_REQUIRE(tp);
00304 s = tp->server();
00305 BOOST_REQUIRE( s );
00306
00307 corba::CService_var co = s->getProvider("methods");
00308 BOOST_CHECK( co.in() );
00309
00310 corba::CAnyArguments_var any_args = new corba::CAnyArguments(0);
00311 CORBA::Any_var vm0 = co->callOperation("vm0", any_args.inout() );
00312
00313
00314 CORBA::Any_var m0 = co->callOperation("m0", any_args.inout());
00315 BOOST_CHECK( m0 >>= d );
00316 BOOST_CHECK_EQUAL(d, -1.0 );
00317
00318 any_args = new corba::CAnyArguments(1);
00319 any_args->length(1);
00320 unsigned int index = 0;
00321 any_args[index] <<= (CORBA::Long) 1;
00322 CORBA::Any_var m1;
00323 BOOST_CHECK_NO_THROW( m1 = co->callOperation("m1", any_args.inout()));
00324 BOOST_CHECK( m1 >>= d );
00325 BOOST_CHECK_EQUAL(d, -2.0 );
00326
00327
00328 any_args = new corba::CAnyArguments(2);
00329 any_args->length(2);
00330 index = 0;
00331 any_args[index] <<= (CORBA::Long) 1;
00332 ++index;
00333 any_args[index] <<= (CORBA::Double) 2.0;
00334 CORBA::Any_var m2;
00335 BOOST_CHECK_NO_THROW( m2 = co->callOperation("m2", any_args.inout()));
00336 BOOST_CHECK( m2 >>= d );
00337 BOOST_CHECK_EQUAL(d, -3.0 );
00338
00339 any_args = new corba::CAnyArguments(3);
00340 any_args->length(3);
00341 index = 0;
00342 any_args[index] <<= (CORBA::Long) 1;
00343 ++index;
00344 any_args[index] <<= (CORBA::Double) 2.0;
00345 ++index;
00346 any_args[index] <<= CORBA::Any::from_boolean( true );
00347 CORBA::Any_var m3;
00348 BOOST_CHECK_NO_THROW( m3= co->callOperation("m3", any_args.inout()) );
00349 BOOST_CHECK( m3 >>= d );
00350 BOOST_CHECK_EQUAL(d, -4.0 );
00351
00352 any_args = new corba::CAnyArguments(4);
00353 any_args->length(4);
00354 index = 0;
00355 any_args[index] <<= (CORBA::Long) 1;
00356 ++index;
00357 any_args[index] <<= (CORBA::Double) 2.0;
00358 ++index;
00359 any_args[index] <<= CORBA::Any::from_boolean( true );
00360 ++index;
00361 any_args[index] <<= "hello";
00362 CORBA::Any_var m4;
00363 BOOST_CHECK_NO_THROW ( m4 = co->callOperation("m4", any_args.inout()) );
00364 BOOST_CHECK( m4 >>= d );
00365 BOOST_CHECK_EQUAL(d, -5.0 );
00366 }
00367
00368 BOOST_AUTO_TEST_CASE(testDataFlowInterface)
00369 {
00370 tp = corba::TaskContextProxy::Create( "peerDFI" , false);
00371 if (!tp )
00372 tp = corba::TaskContextProxy::CreateFromFile( "peerDFI.ior");
00373
00374 corba::CDataFlowInterface_var ports = tp->server()->ports();
00375
00376 corba::CDataFlowInterface::CPortNames_var names =
00377 ports->getPorts();
00378
00379 BOOST_CHECK_EQUAL(CORBA::ULong(2), names->length());
00380 BOOST_CHECK_EQUAL(string("mi"), string(names[CORBA::ULong(0)]));
00381 BOOST_CHECK_EQUAL(string("mo"), string(names[CORBA::ULong(1)]));
00382
00383
00384 BOOST_CHECK_EQUAL(RTT::corba::COutput,
00385 ports->getPortType("mo"));
00386 BOOST_CHECK_EQUAL(RTT::corba::CInput,
00387 ports->getPortType("mi"));
00388
00389
00390 CORBA::String_var cstr = ports->getDataType("mo");
00391 BOOST_CHECK_EQUAL(string("double"),
00392 string(cstr.in()));
00393 }
00394
00395 BOOST_AUTO_TEST_CASE( testPortConnections )
00396 {
00397
00398 tp = corba::TaskContextProxy::Create( "peerPC" , false);
00399 if (!tp )
00400 tp = corba::TaskContextProxy::CreateFromFile( "peerPC.ior");
00401
00402 s = tp->server();
00403
00404 ts2 = corba::TaskContextServer::Create( tc, false );
00405 s2 = ts2->server();
00406
00407
00408 RTT::corba::CConnPolicy policy;
00409 policy.type = RTT::corba::CData;
00410 policy.init = false;
00411 policy.lock_policy = RTT::corba::CLockFree;
00412 policy.size = 0;
00413 policy.transport = ORO_CORBA_PROTOCOL_ID;
00414
00415 corba::CDataFlowInterface_var ports = s->ports();
00416 corba::CDataFlowInterface_var ports2 = s2->ports();
00417
00418
00419 BOOST_CHECK_THROW( ports->createConnection("mo", ports2, "does_not_exist", policy), CNoSuchPortException );
00420 BOOST_CHECK_THROW( ports->createConnection("does_not_exist", ports2, "mi", policy), CNoSuchPortException );
00421 BOOST_CHECK_THROW( ports->createConnection("does_not_exist", ports2, "does_not_exist", policy), CNoSuchPortException );
00422 BOOST_CHECK_THROW( ports->createConnection("mo", ports2, "mo", policy), CNoSuchPortException );
00423 BOOST_CHECK_THROW( ports->createConnection("mi", ports2, "mi", policy), CNoSuchPortException );
00424 BOOST_CHECK_THROW( ports->createConnection("mi", ports2, "mo", policy), CNoSuchPortException );
00425
00426
00427 BOOST_CHECK( tc->start() );
00428
00429
00430
00431
00432
00433
00434 policy.type = RTT::corba::CData;
00435 policy.pull = false;
00436 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00437 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00438 testPortDataConnection();
00439 ports->disconnectPort("mo");
00440 ports->disconnectPort("mi");
00441 testPortDisconnected();
00442
00443 return;
00444
00445 policy.type = RTT::corba::CData;
00446 policy.pull = true;
00447 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00448 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00449 testPortDataConnection();
00450 ports2->disconnectPort("mi");
00451 ports2->disconnectPort("mo");
00452 testPortDisconnected();
00453
00454 policy.type = RTT::corba::CBuffer;
00455 policy.pull = false;
00456 policy.size = 3;
00457 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00458 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00459 testPortBufferConnection();
00460 ports->disconnectPort("mo");
00461 ports->disconnectPort("mi");
00462 testPortDisconnected();
00463
00464 policy.type = RTT::corba::CBuffer;
00465 policy.pull = true;
00466 BOOST_CHECK( ports->createConnection("mo", ports2, "mi", policy) );
00467 BOOST_CHECK( ports2->createConnection("mo", ports, "mi", policy) );
00468 testPortBufferConnection();
00469 ports2->disconnectPort("mo");
00470 ports2->disconnectPort("mi");
00471 testPortDisconnected();
00472
00473 #if 0
00474
00475
00476 mo->createConnection(*mi);
00477
00478 ports->removeConnection("mo", ports2, "mi");
00479 ports->removeConnection("mi", ports2, "mo");
00480
00481 BOOST_CHECK(mo->connected());
00482 BOOST_CHECK(mi->connected());
00483 BOOST_CHECK(!mi->connected());
00484 #endif
00485 }
00486
00487 BOOST_AUTO_TEST_CASE( testPortProxying )
00488 {
00489
00490 tp = corba::TaskContextProxy::Create( "peerPP" , false);
00491 if (!tp )
00492 tp = corba::TaskContextProxy::CreateFromFile( "peerPP.ior");
00493
00494 base::PortInterface* untyped_port;
00495
00496 untyped_port = tp->ports()->getPort("mi");
00497 BOOST_CHECK(untyped_port);
00498 base::InputPortInterface* read_port = dynamic_cast<base::InputPortInterface*>(tp->ports()->getPort("mi"));
00499 BOOST_CHECK(read_port);
00500
00501 untyped_port = tp->ports()->getPort("mo");
00502 BOOST_CHECK(untyped_port);
00503 base::OutputPortInterface* write_port = dynamic_cast<base::OutputPortInterface*>(tp->ports()->getPort("mo"));
00504 BOOST_CHECK(write_port);
00505
00506
00507
00508 BOOST_CHECK(dynamic_cast<corba::RemoteInputPort*>(read_port));
00509 BOOST_CHECK(dynamic_cast<corba::RemoteOutputPort*>(write_port));
00510
00511 BOOST_CHECK(!read_port->connected());
00512 BOOST_CHECK(read_port->getTypeInfo() == mi->getTypeInfo());
00513 BOOST_CHECK(!write_port->connected());
00514 BOOST_CHECK(write_port->getTypeInfo() == mo->getTypeInfo());
00515
00516 mo->createConnection(*read_port);
00517 write_port->createConnection(*mi);
00518 BOOST_CHECK(read_port->connected());
00519 BOOST_CHECK(write_port->connected());
00520
00521
00522
00523 read_port->disconnect();
00524 write_port->disconnect();
00525 BOOST_CHECK(!read_port->connected());
00526 BOOST_CHECK(!write_port->connected());
00527
00528 mo->createConnection(*read_port);
00529 write_port->createConnection(*mi);
00530 BOOST_CHECK(read_port->connected());
00531 BOOST_CHECK(write_port->connected());
00532 write_port->disconnect();
00533 read_port->disconnect();
00534 BOOST_CHECK(!read_port->connected());
00535 BOOST_CHECK(!write_port->connected());
00536
00537
00538 auto_ptr<base::InputPortInterface> read_clone(dynamic_cast<base::InputPortInterface*>(read_port->clone()));
00539 BOOST_CHECK(mo->createConnection(*read_clone));
00540 BOOST_CHECK(read_clone->connected());
00541 BOOST_CHECK(!read_port->connected());
00542 mo->disconnect();
00543 }
00544
00545 BOOST_AUTO_TEST_CASE( testDataHalfs )
00546 {
00547 if(std::getenv("CI") != NULL) {
00548 BOOST_TEST_MESSAGE("Skipping testAffinity because it can fail on integration servers.");
00549 return;
00550 }
00551
00552 double result;
00553
00554 tp = corba::TaskContextProxy::Create( "peerDH" , false);
00555 if (!tp )
00556 tp = corba::TaskContextProxy::CreateFromFile( "peerDH.ior");
00557
00558 s = tp->server();
00559
00560
00561 RTT::corba::CConnPolicy policy;
00562 policy.type = RTT::corba::CData;
00563 policy.init = false;
00564 policy.lock_policy = RTT::corba::CLockFree;
00565 policy.size = 0;
00566 policy.transport = ORO_CORBA_PROTOCOL_ID;
00567
00568 corba::CDataFlowInterface_var ports = s->ports();
00569 BOOST_REQUIRE( ports.in() );
00570
00571
00572 policy.pull = false;
00573 mo->connectTo( tp->ports()->getPort("mi"), toRTT(policy) );
00574 CChannelElement_var cce = ports->buildChannelInput("mo", policy);
00575 CORBA::Any_var sample;
00576 BOOST_REQUIRE( cce.in() );
00577
00578
00579 mo->write( 3.33 );
00580 wait_for_equal( cce->read( sample.out(), true), CNewData, 5 );
00581 sample >>= result;
00582 BOOST_CHECK_EQUAL( result, 3.33);
00583
00584
00585 sample <<= 0.0;
00586 BOOST_CHECK_EQUAL( cce->read( sample.out(), true), COldData );
00587 sample >>= result;
00588 BOOST_CHECK_EQUAL( result, 3.33);
00589
00590 cce->disconnect();
00591 mo->disconnect();
00592
00593
00594 cce = ports->buildChannelOutput("mi", policy);
00595 ports->channelReady("mi", cce);
00596
00597 mi->connectTo( tp->ports()->getPort("mo"), toRTT(policy) );
00598 sample = new CORBA::Any();
00599 BOOST_REQUIRE( cce.in() );
00600
00601
00602 result = 0.0;
00603 sample <<= 4.44;
00604 cce->write( sample.in() );
00605 wait_for_equal( mi->read( result ), NewData, 5 );
00606 BOOST_CHECK_EQUAL( result, 4.44 );
00607
00608
00609 result = 0.0;
00610 BOOST_CHECK_EQUAL( mi->read( result ), OldData );
00611 BOOST_CHECK_EQUAL( result, 4.44);
00612 }
00613
00614 BOOST_AUTO_TEST_CASE( testBufferHalfs )
00615 {
00616 if(std::getenv("CI") != NULL) {
00617 BOOST_TEST_MESSAGE("Skipping testAffinity because it can fail on integration servers.");
00618 return;
00619 }
00620
00621 double result;
00622
00623
00624 tp = corba::TaskContextProxy::Create( "peerBH" , false);
00625 if (!tp )
00626 tp = corba::TaskContextProxy::CreateFromFile( "peerBH.ior");
00627
00628 s = tp->server();
00629
00630
00631 RTT::corba::CConnPolicy policy;
00632 policy.type = RTT::corba::CBuffer;
00633 policy.init = false;
00634 policy.lock_policy = RTT::corba::CLockFree;
00635 policy.size = 10;
00636 policy.transport = ORO_CORBA_PROTOCOL_ID;
00637
00638 corba::CDataFlowInterface_var ports = s->ports();
00639 BOOST_REQUIRE( ports.in() );
00640
00641
00642 policy.pull = false;
00643 mo->connectTo( tp->ports()->getPort("mi"), toRTT(policy) );
00644 CChannelElement_var cce = ports->buildChannelInput("mo", policy);
00645 CORBA::Any_var sample;
00646 BOOST_REQUIRE( cce.in() );
00647
00648
00649 mo->write( 6.33 );
00650 mo->write( 3.33 );
00651 wait_for_equal( cce->read( sample.out(), true), CNewData, 5 );
00652 sample >>= result;
00653 BOOST_CHECK_EQUAL( result, 6.33);
00654 wait_for_equal( cce->read( sample.out(), true ), CNewData, 10 );
00655 sample >>= result;
00656 BOOST_CHECK_EQUAL( result, 3.33);
00657
00658
00659 sample <<= 0.0;
00660 BOOST_CHECK_EQUAL( cce->read( sample.out(), true ), COldData );
00661 sample >>= result;
00662 BOOST_CHECK_EQUAL( result, 3.33);
00663
00664 cce->disconnect();
00665 mo->disconnect();
00666
00667
00668 mi->connectTo( tp->ports()->getPort("mo"), toRTT(policy) );
00669 cce = ports->buildChannelOutput("mi", policy);
00670 ports->channelReady("mi", cce);
00671 sample = new CORBA::Any();
00672 BOOST_REQUIRE( cce.in() );
00673
00674
00675 result = 0.0;
00676 sample <<= 6.44;
00677 cce->write( sample.in() );
00678 sample <<= 4.44;
00679 cce->write( sample.in() );
00680 wait_for_equal( mi->read( result ), NewData, 5 );
00681 BOOST_CHECK_EQUAL( result, 6.44 );
00682 wait_for_equal( mi->read( result ), NewData, 5 );
00683 BOOST_CHECK_EQUAL( result, 4.44 );
00684
00685
00686 result = 0.0;
00687 BOOST_CHECK_EQUAL( mi->read( result ), OldData );
00688 BOOST_CHECK_EQUAL( result, 4.44);
00689 }
00690
00691 BOOST_AUTO_TEST_SUITE_END()
00692