corba_mqueue_ipc_test.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002   tag: The SourceWorks  Tue Sep 7 00:54:57 CEST 2010  corba_mqueue_ipc_test.cpp
00003 
00004                         corba_mqueue_ipc_test.cpp -  description
00005                            -------------------
00006     begin                : Tue September 07 2010
00007     copyright            : (C) 2010 The SourceWorks
00008     email                : peter@thesourceworks.com
00009 
00010  ***************************************************************************
00011  *                                                                         *
00012  *   This program is free software; you can redistribute it and/or modify  *
00013  *   it under the terms of the GNU General Public License as published by  *
00014  *   the Free Software Foundation; either version 2 of the License, or     *
00015  *   (at your option) any later version.                                   *
00016  *                                                                         *
00017  ***************************************************************************/
00018 
00019 #include "unit.hpp"
00020 
00021 #include "corba_mqueue_test.hpp"
00022 
00023 #include <iostream>
00024 
00025 #include <transports/corba/DataFlowI.h>
00026 #include <rtt/transports/corba/RemotePorts.hpp>
00027 #include <rtt/transports/mqueue/MQLib.hpp>
00028 #include <rtt/transports/corba/CorbaConnPolicy.hpp>
00029 #include <transports/corba/corba.h>
00030 #include <rtt/InputPort.hpp>
00031 #include <rtt/OutputPort.hpp>
00032 #include <rtt/TaskContext.hpp>
00033 #include <transports/corba/TaskContextServer.hpp>
00034 #include <transports/corba/TaskContextProxy.hpp>
00035 #include <string>
00036 #include <cstdlib>
00037 
00038 using namespace RTT;
00039 using namespace RTT::detail;
00040 
00041 class CorbaMQueueIPCTest
00042 {
00043 public:
00044     CorbaMQueueIPCTest() { this->setUp(); }
00045     ~CorbaMQueueIPCTest() { this->tearDown(); }
00046 
00047     TaskContext* tc;
00048     corba::TaskContextProxy* tp, *tp2;
00049     corba::TaskContextServer* ts, *ts2;
00050 
00051     base::PortInterface* signalled_port;
00052     void new_data_listener(base::PortInterface* port);
00053 
00054     // Ports
00055     InputPort<double>*  mr1;
00056     OutputPort<double>* mw1;
00057 
00058     void setUp();
00059     void tearDown();
00060 
00061     // helper test functions
00062     void testPortDataConnection();
00063     void testPortBufferConnection();
00064     void testPortDisconnected();
00065 };
00066 
00067 using namespace std;
00068 using corba::TaskContextProxy;
00069 
00070 void
00071 CorbaMQueueIPCTest::setUp()
00072 {
00073     // setup DataPorts: we write into mw1, the server roundtrips it to mr1
00074     mr1 = new InputPort<double>("mr");
00075     mw1 = new OutputPort<double>("mw");
00076 
00077     tc =  new TaskContext( "root" );
00078     tc->ports()->addEventPort( *mr1, boost::bind(&CorbaMQueueIPCTest::new_data_listener, this, _1) );
00079     tc->ports()->addPort( *mw1 );
00080     tc->start();
00081 
00082     ts2 = ts = 0;
00083     tp2 = tp = 0;
00084 }
00085 
00086 
00087 void
00088 CorbaMQueueIPCTest::tearDown()
00089 {
00090     delete tp;
00091     delete ts;
00092     delete tp2;
00093     delete tc;
00094 
00095     delete mr1;
00096     delete mw1;
00097 }
00098 
00099 void CorbaMQueueIPCTest::new_data_listener(base::PortInterface* port)
00100 {
00101     signalled_port = port;
00102 }
00103 
00104 
00105 #define ASSERT_PORT_SIGNALLING(code, read_port) \
00106     signalled_port = 0; \
00107     code; \
00108     usleep(100000); \
00109     BOOST_CHECK( read_port == signalled_port );
00110 
00111 bool wait_for_helper;
00112 #define wait_for( cond, times ) \
00113     wait = 0; \
00114     while( (wait_for_helper = !(cond)) && wait++ != times ) \
00115       usleep(100000); \
00116     if (wait_for_helper) BOOST_CHECK( cond );
00117 
00118 #define wait_for_equal( a, b, times ) \
00119     wait = 0; \
00120     while( (wait_for_helper = ((a) != (b))) && wait++ != times ) \
00121       usleep(100000); \
00122     if (wait_for_helper) BOOST_CHECK_EQUAL( a, b );
00123 
00124 void CorbaMQueueIPCTest::testPortDataConnection()
00125 {
00126     // This test assumes that there is a data connection mw1 => server => mr1
00127     // Check if connection succeeded both ways:
00128     BOOST_CHECK( mw1->connected() );
00129     BOOST_CHECK( mr1->connected() );
00130 
00131     double value = 0;
00132 
00133     // Check if no-data works
00134     BOOST_CHECK( !mr1->read(value) );
00135 
00136     // Check if writing works (including signalling)
00137     ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1)
00138     BOOST_CHECK( mr1->read(value) );
00139     BOOST_CHECK_EQUAL( 1.0, value );
00140     ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00141     BOOST_CHECK( mr1->read(value) );
00142     BOOST_CHECK_EQUAL( 2.0, value );
00143 }
00144 
00145 void CorbaMQueueIPCTest::testPortBufferConnection()
00146 {
00147     // This test assumes that there is a buffer connection mw1 => server => mr1 of size 3
00148     // Check if connection succeeded both ways:
00149     BOOST_CHECK( mw1->connected() );
00150     BOOST_CHECK( mr1->connected() );
00151 
00152     double value = 0;
00153 
00154     // Check if no-data works
00155     BOOST_CHECK( !mr1->read(value) );
00156 
00157     // Check if writing works
00158     ASSERT_PORT_SIGNALLING(mw1->write(1.0), mr1);
00159     ASSERT_PORT_SIGNALLING(mw1->write(2.0), mr1);
00160     ASSERT_PORT_SIGNALLING(mw1->write(3.0), mr1);
00161     // it will be emptied too fast by mqueue.
00162     //ASSERT_PORT_SIGNALLING(mw1->write(4.0), 0);
00163     BOOST_CHECK( mr1->read(value) );
00164     BOOST_CHECK_EQUAL( 1.0, value );
00165     BOOST_CHECK( mr1->read(value) );
00166     BOOST_CHECK_EQUAL( 2.0, value );
00167     BOOST_CHECK( mr1->read(value) );
00168     BOOST_CHECK_EQUAL( 3.0, value );
00169     BOOST_CHECK_EQUAL( mr1->read(value), OldData );
00170 }
00171 
00172 void CorbaMQueueIPCTest::testPortDisconnected()
00173 {
00174     BOOST_CHECK( !mw1->connected() );
00175     BOOST_CHECK( !mr1->connected() );
00176 }
00177 
00178 
00179 // Registers the fixture into the 'registry'
00180 BOOST_FIXTURE_TEST_SUITE(  CorbaMQueueIPCTestSuite,  CorbaMQueueIPCTest )
00181 
00182 BOOST_AUTO_TEST_CASE( testPortConnections )
00183 {
00184     // This test tests the different port-to-port connections.
00185     ts = corba::TaskContextServer::Create( tc, /* use_naming = */ false );
00186     tp = corba::TaskContextProxy::Create( "other", /* is_ior = */ false );
00187     if (!tp)
00188         tp = corba::TaskContextProxy::CreateFromFile( "other.ior");
00189 
00190     // Create a default CORBA policy specification
00191     RTT::corba::CConnPolicy policy = toCORBA( RTT::ConnPolicy() );
00192     policy.type = RTT::corba::CData;
00193     policy.init = false;
00194     policy.lock_policy = RTT::corba::CLockFree;
00195     policy.size = 0;
00196     policy.data_size = 0;
00197 
00198     corba::CDataFlowInterface_var ports  = ts->server()->ports();
00199     corba::CDataFlowInterface_var ports2 = tp->server()->ports();
00200 
00201     // WARNING: in the following, there is four configuration tested. There is
00202     // also three different ways to disconnect. We need to test those three
00203     // "disconnection methods", so beware when you change something ...
00204 
00205     policy.type = RTT::corba::CData;
00206     policy.pull = false;
00207     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00208     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00209     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00210     usleep(100000); // gives dispatcher time to catch up.
00211     testPortDataConnection();
00212     ports->disconnectPort("mw");
00213     ports2->disconnectPort("mw");
00214     testPortDisconnected();
00215 
00216     policy.type = RTT::corba::CData;
00217     policy.pull = true;
00218     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00219     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00220     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00221     usleep(100000);
00222     testPortDataConnection();
00223     ports2->disconnectPort("mr");
00224     ports->disconnectPort("mr");
00225     testPortDisconnected();
00226 
00227 #if 1
00228 
00229     policy.type = RTT::corba::CBuffer;
00230     policy.pull = false;
00231     policy.size = 3;
00232     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00233     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00234     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00235     testPortBufferConnection();
00236     ports->disconnectPort("mw");
00237     ports2->disconnectPort("mw");
00238     testPortDisconnected();
00239 
00240     policy.type = RTT::corba::CBuffer;
00241     policy.pull = true;
00242     policy.size = 3;
00243     policy.transport = ORO_MQUEUE_PROTOCOL_ID;
00244     BOOST_CHECK( ports->createConnection("mw", ports2, "mr", policy) );
00245     BOOST_CHECK( ports2->createConnection("mw", ports, "mr", policy) );
00246     testPortBufferConnection();
00247     ports->disconnectPort("mw");
00248     ports2->disconnectPort("mw");
00249     testPortDisconnected();
00250 #endif
00251 }
00252 
00253 BOOST_AUTO_TEST_SUITE_END()
00254 


rtt
Author(s): RTT Developers
autogenerated on Fri Sep 9 2016 04:01:51