corba_mqueue_ipc_test.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: The SourceWorks  Tue Sep 7 00:54:57 CEST 2010  corba_mqueue_ipc_test.cpp
00003 
00004                         corba_mqueue_ipc_test.cpp -  description
00005                            -------------------
00006     begin                : Tue September 07 2010
00007     copyright            : (C) 2010 The SourceWorks
00008     email                : peter@thesourceworks.com
00009 
00010  ***************************************************************************
00011  *                                                                         *
00012  *   This program is free software; you can redistribute it and/or modify  *
00013  *   it under the terms of the GNU General Public License as published by  *
00014  *   the Free Software Foundation; either version 2 of the License, or     *
00015  *   (at your option) any later version.                                   *
00016  *                                                                         *
00017  ***************************************************************************/
00018 
00019 #include "unit.hpp"
00020 
00021 #include "corba_mqueue_test.hpp"
00022 
00023 #include <iostream>
00024 
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <rtt/transports/mqueue/MQLib.hpp>
00028 #include <rtt/transports/corba/CorbaConnPolicy.hpp>
00029 #include <transports/corba/corba.h>
00030 #include <rtt/InputPort.hpp>
00031 #include <rtt/OutputPort.hpp>
00032 #include <rtt/TaskContext.hpp>
00033 #include <transports/corba/TaskContextServer.hpp>
00034 #include <transports/corba/TaskContextProxy.hpp>
00035 #include <string>
00036 #include <cstdlib>
00037 
00038 using namespace RTT;
00039 using namespace RTT::detail;
00040 
00041 class CorbaMQueueIPCTest
00042 {
00043 public:
00044     CorbaMQueueIPCTest() { this->setUp(); }
00045     ~CorbaMQueueIPCTest() { this->tearDown(); }
00046 
00047     TaskContext* tc;
00048     corba::TaskContextProxy* tp, *tp2;
00049     corba::TaskContextServer* ts, *ts2;
00050 
00051     base::PortInterface* signalled_port;
00052     void new_data_listener(base::PortInterface* port);
00053 
00054     // Ports
00055     InputPort<double>*  mr1;
00056     OutputPort<double>* mw1;
00057 
00058     void setUp();
00059     void tearDown();
00060 
00061     // helper test functions
00062     void testPortDataConnection();
00063     void testPortBufferConnection();
00064     void testPortDisconnected();
00065 };
00066 
00067 using namespace std;
00068 using corba::TaskContextProxy;
00069 
00070 void
00071 CorbaMQueueIPCTest::setUp()
00072 {
00073     // setup DataPorts: we write into mw1, the server roundtrips it to mr1
00074     mr1 = new InputPort<double>("mr");
00075     mw1 = new OutputPort<double>("mw");
00076 
00077     tc =  new TaskContext( "root" );
00078     tc->ports()->addEventPort( *mr1, boost::bind(&CorbaMQueueIPCTest::new_data_listener, this, _1) );
00079     tc->ports()->addPort( *mw1 );
00080     tc->start();
00081 
00082     ts2 = ts = 0;
00083     tp2 = tp = 0;
00084 }
00085 
00086 
00087 void
00088 CorbaMQueueIPCTest::tearDown()
00089 {
00090     delete tp;
00091     delete ts;
00092     delete tp2;
00093     delete tc;
00094 
00095     delete mr1;
00096     delete mw1;
00097 }
00098 
00099 void CorbaMQueueIPCTest::new_data_listener(base::PortInterface* port)
00100 {
00101     signalled_port = port;
00102 }
00103 
00104 
00105 #define ASSERT_PORT_SIGNALLING(code, read_port) \
00106     signalled_port = 0; \
00107     code; \
00108     usleep(100000); \
00109     BOOST_CHECK( read_port == signalled_port );
00110 
00111 bool wait_for_helper;
00112 #define wait_for( cond, times ) \
00113     wait = 0; \
00114     while( (wait_for_helper = !(cond)) && wait++ != times ) \
00115       usleep(100000); \
00116     if (wait_for_helper) BOOST_CHECK( cond );
00117 
00118 #define wait_for_equal( a, b, times ) \
00119     wait = 0; \
00120     while( (wait_for_helper = ((a) != (b))) && wait++ != times ) \
00121       usleep(100000); \
00122     if (wait_for_helper) BOOST_CHECK_EQUAL( a, b );
00123 
00124 void CorbaMQueueIPCTest::testPortDataConnection()
00125 {
00126     // This test assumes that there is a data connection mw1 => server => mr1
00127     // Check if connection succeeded both ways:
00128     BOOST_CHECK( mw1->connected() );
00129     BOOST_CHECK( mr1->connected() );
00130 
00131     double value = 0;
00132 
00133     // Check if no-data works
00134     BOOST_CHECK( !mr1->read(value) );
00135 
00136     // Check if writing works (including signalling)
00137     ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1)
00138     BOOST_CHECK( mr1->read(value) );
00139     BOOST_CHECK_EQUAL( 1.0, value );
00140     ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00141     BOOST_CHECK( mr1->read(value) );
00142     BOOST_CHECK_EQUAL( 2.0, value );
00143 }
00144 
00145 void CorbaMQueueIPCTest::testPortBufferConnection()
00146 {
00147     // This test assumes that there is a buffer connection mw1 => server => mr1 of size 3
00148     // Check if connection succeeded both ways:
00149     BOOST_CHECK( mw1->connected() );
00150     BOOST_CHECK( mr1->connected() );
00151 
00152     double value = 0;
00153 
00154     // Check if no-data works
00155     BOOST_CHECK( !mr1->read(value) );
00156 
00157     // Check if writing works
00158     ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1);
00159     ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00160     ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr1);
00161     // There are two connections between mw1 and mr1. We first flush one of the
00162     // two, and then the second one (normal multi-writer single-reader
00163     // behaviour)
00164     BOOST_CHECK( mr1->read(value) );
00165     BOOST_CHECK_EQUAL( 1.0, value );
00166     BOOST_CHECK( mr1->read(value) );
00167     BOOST_CHECK_EQUAL( 2.0, value );
00168     BOOST_CHECK( mr1->read(value) );
00169     BOOST_CHECK_EQUAL( 3.0, value );
00170     BOOST_CHECK( mr1->read(value) );
00171     BOOST_CHECK_EQUAL( 1.0, value );
00172     BOOST_CHECK( mr1->read(value) );
00173     BOOST_CHECK_EQUAL( 2.0, value );
00174     BOOST_CHECK( mr1->read(value) );
00175     BOOST_CHECK_EQUAL( 3.0, value );
00176     BOOST_CHECK_EQUAL( mr1->read(value), OldData );
00177 }
00178 
00179 void CorbaMQueueIPCTest::testPortDisconnected()
00180 {
00181     BOOST_CHECK( !mw1->connected() );
00182     BOOST_CHECK( !mr1->connected() );
00183 }
00184 
00185 
00186 // Registers the fixture into the 'registry'
00187 BOOST_FIXTURE_TEST_SUITE(  CorbaMQueueIPCTestSuite,  CorbaMQueueIPCTest )
00188 
00189 BOOST_AUTO_TEST_CASE( testPortConnections )
00190 {
00191     // This test tests the different port-to-port connections.
00192     ts = corba::TaskContextServer::Create( tc, false ); //no-naming
00193     //tp = corba::TaskContextProxy::Create("other");
00194     //if (!tp )
00195     tp = corba::TaskContextProxy::CreateFromFile( "root.ior");
00196     BOOST_CHECK( tp );
00197 
00198     // Create a default CORBA policy specification
00199     RTT::corba::CConnPolicy policy = toCORBA( RTT::ConnPolicy() );
00200     policy.type = RTT::corba::CData;
00201     policy.init = false;
00202     policy.lock_policy = RTT::corba::CLockFree;
00203     policy.size = 0;
00204     policy.data_size = 0;
00205 
00206     corba::CDataFlowInterface_var ports  = ts->server()->ports();
00207     corba::CDataFlowInterface_var ports2 = tp->server()->ports();
00208 
00209     // WARNING: in the following, there is four configuration tested. There is
00210     // also three different ways to disconnect. We need to test those three
00211     // "disconnection methods", so beware when you change something ...
00212 
00213     policy.type = RTT::corba::CData;
00214     policy.pull = false;
00215     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00216     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00217     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00218     usleep(100000); // gives dispatcher time to catch up.
00219     testPortDataConnection();
00220     ports->disconnectPort("mw");
00221     ports2->disconnectPort("mw");
00222     testPortDisconnected();
00223 
00224     policy.type = RTT::corba::CData;
00225     policy.pull = true;
00226     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00227     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00228     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00229     usleep(100000);
00230     testPortDataConnection();
00231     ports2->disconnectPort("mr");
00232     ports->disconnectPort("mr");
00233     testPortDisconnected();
00234 
00235 #if 1
00236 
00237     policy.type = RTT::corba::CBuffer;
00238     policy.pull = false;
00239     policy.size = 3;
00240     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00241     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00242     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00243     testPortBufferConnection();
00244     ports->disconnectPort("mw");
00245     ports2->disconnectPort("mw");
00246     testPortDisconnected();
00247 
00248     policy.type = RTT::corba::CBuffer;
00249     policy.pull = true;
00250     policy.size = 3;
00251     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00252     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00253     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00254     testPortBufferConnection();
00255     ports->disconnectPort("mw");
00256     ports2->disconnectPort("mw");
00257     testPortDisconnected();
00258 #endif
00259 }
00260 
00261 BOOST_AUTO_TEST_SUITE_END()
00262 


rtt
Author(s): RTT Developers
autogenerated on Wed Aug 26 2015 16:15:46