test_transport_tcp.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 /* Author: Josh Faust */
00031 
00032 /*
00033  * Tests for ros::TransportTCP in synchronous (blocking) and PollSet-driven modes
00034  */
00035 
#include <gtest/gtest.h>
#include "ros/poll_set.h"
#include "ros/transport/transport_tcp.h"

#include <boost/bind.hpp>
#include <boost/thread.hpp>

#include <vector>
00042 
00043 using namespace ros;
00044 
00045 class Synchronous : public testing::Test
00046 {
00047 public:
00048   Synchronous()
00049   {
00050   }
00051 
00052   ~Synchronous()
00053   {
00054   }
00055 
00056 
00057 protected:
00058 
00059   virtual void SetUp()
00060   {
00061     transports_[0] = boost::make_shared<TransportTCP>(static_cast<ros::PollSet*>(NULL), TransportTCP::SYNCHRONOUS);
00062     transports_[1] = boost::make_shared<TransportTCP>(static_cast<ros::PollSet*>(NULL), TransportTCP::SYNCHRONOUS);
00063 
00064     if (!transports_[0]->listen(0, 100, TransportTCP::AcceptCallback()))
00065     {
00066       FAIL();
00067     }
00068 
00069     if (!transports_[1]->connect("localhost", transports_[0]->getServerPort()))
00070     {
00071       FAIL();
00072     }
00073 
00074     transports_[2] = transports_[0]->accept();
00075     if (!transports_[2])
00076     {
00077       FAIL();
00078     }
00079   }
00080 
00081   virtual void TearDown()
00082   {
00083     for (int i = 0; i < 3; ++i)
00084     {
00085       if (transports_[i])
00086       {
00087         transports_[i]->close();
00088       }
00089     }
00090   }
00091 
00092   TransportTCPPtr transports_[3];
00093 };
00094 
00095 TEST_F(Synchronous, writeThenRead)
00096 {
00097   std::string msg = "test";
00098   int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00099   ASSERT_EQ(written, (int32_t)msg.length());
00100 
00101   uint8_t buf[5];
00102   memset(buf, 0, sizeof(buf));
00103   int32_t read = transports_[2]->read(buf, msg.length());
00104   ASSERT_EQ(read, (int32_t)msg.length());
00105   ASSERT_STREQ((const char*)buf, msg.c_str());
00106 }
00107 
00108 TEST_F(Synchronous, writeThenReadPartial)
00109 {
00110   std::string msg = "test";
00111   int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00112   ASSERT_EQ(written, (int32_t)msg.length());
00113 
00114   uint8_t buf[5];
00115   memset(buf, 0, sizeof(buf));
00116   int32_t read = transports_[2]->read(buf, 1);
00117   ASSERT_EQ(read, 1);
00118   ASSERT_STREQ((const char*)buf, msg.substr(0, 1).c_str());
00119 }
00120 
00121 void readThread(TransportTCPPtr transport, uint8_t* buf, uint32_t size, volatile int32_t* read_out, volatile bool* done_read)
00122 {
00123   while (*read_out < (int32_t)size)
00124   {
00125     *read_out += transport->read(buf + *read_out, size - *read_out);
00126   }
00127   *done_read = true;
00128 }
00129 
00130 TEST_F(Synchronous, readWhileWriting)
00131 {
00132   for (int i = 0; i < 10; ++i)
00133   {
00134     const uint32_t buf_size = 1024*1024;
00135     std::auto_ptr<uint8_t> read_buf(new uint8_t[buf_size]);
00136 
00137     std::stringstream ss;
00138     for (int i = 0; i < 100000; ++i)
00139     {
00140       ss << i;
00141     }
00142 
00143     std::string msg = ss.str();
00144 
00145     ASSERT_TRUE(msg.size() < buf_size);
00146 
00147     volatile int32_t read_out = 0;
00148     volatile bool done_read = false;
00149     boost::thread t(boost::bind(readThread, transports_[2], read_buf.get(), msg.size(), &read_out, &done_read));
00150 
00151     boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00152 
00153     int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00154     ASSERT_EQ(written, (int32_t)msg.length());
00155 
00156     while (!done_read)
00157     {
00158       boost::this_thread::sleep(boost::posix_time::milliseconds(1));
00159     }
00160 
00161     ASSERT_EQ(done_read, true);
00162     ASSERT_EQ(read_out, (int32_t)msg.length());
00163     ASSERT_STREQ((const char*)read_buf.get(), msg.c_str());
00164   }
00165 }
00166 
00167 TEST_F(Synchronous, readAfterClose)
00168 {
00169   transports_[1]->close();
00170 
00171   uint8_t buf[5];
00172   int32_t read = transports_[1]->read(buf, 1);
00173   ASSERT_EQ(read, -1);
00174 }
00175 
00176 TEST_F(Synchronous, writeAfterClose)
00177 {
00178   transports_[1]->close();
00179 
00180   std::string msg = "test";
00181   int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00182   ASSERT_EQ(written, -1);
00183 }
00184 
00185 class Polled : public testing::Test
00186 {
00187 public:
00188   Polled()
00189   {
00190   }
00191 
00192   ~Polled()
00193   {
00194   }
00195 
00196 
00197 protected:
00198 
00199   void connectionReceived(const TransportTCPPtr& transport)
00200   {
00201     transports_[2] = transport;
00202   }
00203 
00204   void pollThread()
00205   {
00206     while (continue_)
00207     {
00208       poll_set_.update(10);
00209     }
00210   }
00211 
00212   void onReadable(const TransportPtr& transport, int index)
00213   {
00214     ASSERT_EQ(transport, transports_[index]);
00215 
00216     uint8_t b = 0;
00217     while (transport->read(&b, 1) > 0)
00218     {
00219       ++bytes_read_[index];
00220     }
00221   }
00222 
00223   void onWriteable(const TransportPtr& transport, int index)
00224   {
00225     ASSERT_EQ(transport, transports_[index]);
00226 
00227     uint8_t b = 0;
00228     transport->write(&b, 1);
00229 
00230     ++bytes_written_[index];
00231   }
00232 
00233   void onDisconnect(const TransportPtr& transport, int index)
00234   {
00235     ASSERT_EQ(transport, transports_[index]);
00236 
00237     disconnected_[index] = true;
00238   }
00239 
00240   virtual void SetUp()
00241   {
00242     bytes_read_[0] = 0;
00243     bytes_read_[1] = 0;
00244     bytes_read_[2] = 0;
00245 
00246     bytes_written_[0] = 0;
00247     bytes_written_[1] = 0;
00248     bytes_written_[2] = 0;
00249 
00250     disconnected_[0] = false;
00251     disconnected_[1] = false;
00252     disconnected_[2] = false;
00253 
00254     transports_[0] = boost::make_shared<TransportTCP>(&poll_set_);
00255     transports_[1] = boost::make_shared<TransportTCP>(&poll_set_);
00256 
00257     if (!transports_[0]->listen(0, 100, boost::bind(&Polled::connectionReceived, this, _1)))
00258     {
00259       FAIL();
00260     }
00261 
00262     if (!transports_[1]->connect("localhost", transports_[0]->getServerPort()))
00263     {
00264       FAIL();
00265     }
00266 
00267     continue_ = true;
00268     poll_thread_ = boost::thread(boost::bind(&Polled::pollThread, this));
00269 
00270     int count = 0;
00271     while (!transports_[2] && count < 100)
00272     {
00273       boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00274     }
00275 
00276     if (!transports_[2])
00277     {
00278       FAIL();
00279     }
00280 
00281     transports_[1]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 1));
00282     transports_[2]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 2));
00283     transports_[1]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 1));
00284     transports_[2]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 2));
00285     transports_[1]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 1));
00286     transports_[2]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 2));
00287 
00288     transports_[1]->enableRead();
00289     transports_[2]->enableRead();
00290   }
00291 
00292   virtual void TearDown()
00293   {
00294     for (int i = 0; i < 3; ++i)
00295     {
00296       transports_[i]->close();
00297     }
00298 
00299     continue_ = false;
00300     poll_thread_.join();
00301   }
00302 
00303   TransportTCPPtr transports_[3];
00304   int bytes_read_[3];
00305   int bytes_written_[3];
00306   bool disconnected_[3];
00307 
00308   PollSet poll_set_;
00309 
00310   boost::thread poll_thread_;
00311   volatile bool continue_;
00312 };
00313 
00314 TEST_F(Polled, readAndWrite)
00315 {
00316   transports_[1]->enableWrite();
00317   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00318   transports_[1]->disableWrite();
00319 
00320   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00321   ASSERT_GT(bytes_read_[2], 0);
00322   ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
00323 
00324   int old_read_val = bytes_read_[2];
00325 
00326   transports_[2]->enableWrite();
00327   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00328   transports_[2]->disableWrite();
00329 
00330   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00331   ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
00332   ASSERT_EQ(old_read_val, bytes_read_[2]);
00333 
00334   transports_[1]->enableWrite();
00335   transports_[2]->enableWrite();
00336   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00337   transports_[1]->disableWrite();
00338   transports_[2]->disableWrite();
00339 
00340   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00341   ASSERT_GT(bytes_read_[2], 0);
00342   ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
00343   ASSERT_GT(bytes_read_[1], 0);
00344   ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
00345 }
00346 
00347 TEST_F(Polled, enableDisableWrite)
00348 {
00349   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00350   ASSERT_EQ(bytes_read_[1], 0);
00351   ASSERT_EQ(bytes_read_[2], 0);
00352   ASSERT_EQ(bytes_written_[1], 0);
00353   ASSERT_EQ(bytes_written_[2], 0);
00354 
00355   transports_[1]->enableWrite();
00356   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00357   transports_[1]->disableWrite();
00358 
00359   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00360   ASSERT_GT(bytes_read_[2], 0);
00361   ASSERT_GT(bytes_written_[1], 0);
00362   int old_read_val = bytes_read_[2];
00363   int old_written_val = bytes_written_[1];
00364   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00365   ASSERT_EQ(bytes_read_[2], old_read_val);
00366   ASSERT_EQ(bytes_written_[1], old_written_val);
00367 }
00368 
00369 TEST_F(Polled, disconnectNoTraffic)
00370 {
00371   ASSERT_EQ(disconnected_[1], false);
00372   ASSERT_EQ(disconnected_[2], false);
00373 
00374   transports_[1]->close();
00375   ASSERT_EQ(disconnected_[1], true);
00376 
00377   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00378 
00379   ASSERT_EQ(disconnected_[2], true);
00380 }
00381 
00382 TEST_F(Polled, disconnectWriter)
00383 {
00384   ASSERT_EQ(disconnected_[1], false);
00385   ASSERT_EQ(disconnected_[2], false);
00386 
00387   transports_[1]->enableWrite();
00388   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00389   ASSERT_GT(bytes_read_[2], 0);
00390 
00391   transports_[1]->close();
00392   ASSERT_EQ(disconnected_[1], true);
00393 
00394   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00395 
00396   ASSERT_EQ(disconnected_[2], true);
00397 }
00398 
00399 TEST_F(Polled, disconnectReader)
00400 {
00401   ASSERT_EQ(disconnected_[1], false);
00402   ASSERT_EQ(disconnected_[2], false);
00403 
00404   transports_[2]->enableWrite();
00405   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00406   ASSERT_GT(bytes_read_[1], 0);
00407 
00408   transports_[1]->close();
00409   ASSERT_EQ(disconnected_[1], true);
00410 
00411   boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00412 
00413   ASSERT_EQ(disconnected_[2], true);
00414 }
00415 
00416 int main(int argc, char** argv)
00417 {
00418   testing::InitGoogleTest(&argc, argv);
00419 
00420   signal(SIGPIPE, SIG_IGN);
00421 
00422   return RUN_ALL_TESTS();
00423 }


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:45:23