00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
#include <gtest/gtest.h>
#include "ros/poll_set.h"
#include "ros/transport/transport_tcp.h"

#include <boost/bind.hpp>
#include <boost/thread.hpp>

#include <vector>
00042
00043 using namespace ros;
00044
00045 class Synchronous : public testing::Test
00046 {
00047 public:
00048 Synchronous()
00049 {
00050 }
00051
00052 ~Synchronous()
00053 {
00054 }
00055
00056
00057 protected:
00058
00059 virtual void SetUp()
00060 {
00061 transports_[0] = TransportTCPPtr(new TransportTCP(NULL, TransportTCP::SYNCHRONOUS));
00062 transports_[1] = TransportTCPPtr(new TransportTCP(NULL, TransportTCP::SYNCHRONOUS));
00063
00064 if (!transports_[0]->listen(0, 100, TransportTCP::AcceptCallback()))
00065 {
00066 FAIL();
00067 }
00068
00069 if (!transports_[1]->connect("localhost", transports_[0]->getServerPort()))
00070 {
00071 FAIL();
00072 }
00073
00074 transports_[2] = transports_[0]->accept();
00075 if (!transports_[2])
00076 {
00077 FAIL();
00078 }
00079 }
00080
00081 virtual void TearDown()
00082 {
00083 for (int i = 0; i < 3; ++i)
00084 {
00085 if (transports_[i])
00086 {
00087 transports_[i]->close();
00088 }
00089 }
00090 }
00091
00092 TransportTCPPtr transports_[3];
00093 };
00094
00095 TEST_F(Synchronous, writeThenRead)
00096 {
00097 std::string msg = "test";
00098 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00099 ASSERT_EQ(written, (int32_t)msg.length());
00100
00101 uint8_t buf[5];
00102 memset(buf, 0, sizeof(buf));
00103 int32_t read = transports_[2]->read(buf, msg.length());
00104 ASSERT_EQ(read, (int32_t)msg.length());
00105 ASSERT_STREQ((const char*)buf, msg.c_str());
00106 }
00107
00108 TEST_F(Synchronous, writeThenReadPartial)
00109 {
00110 std::string msg = "test";
00111 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00112 ASSERT_EQ(written, (int32_t)msg.length());
00113
00114 uint8_t buf[5];
00115 memset(buf, 0, sizeof(buf));
00116 int32_t read = transports_[2]->read(buf, 1);
00117 ASSERT_EQ(read, 1);
00118 ASSERT_STREQ((const char*)buf, msg.substr(0, 1).c_str());
00119 }
00120
00121 void readThread(TransportTCPPtr transport, uint8_t* buf, uint32_t size, volatile int32_t* read_out, volatile bool* done_read)
00122 {
00123 while (*read_out < (int32_t)size)
00124 {
00125 *read_out += transport->read(buf + *read_out, size - *read_out);
00126 }
00127 *done_read = true;
00128 }
00129
00130 TEST_F(Synchronous, readWhileWriting)
00131 {
00132 for (int i = 0; i < 10; ++i)
00133 {
00134 const uint32_t buf_size = 1024*1024;
00135 std::auto_ptr<uint8_t> read_buf(new uint8_t[buf_size]);
00136
00137 std::stringstream ss;
00138 for (int i = 0; i < 100000; ++i)
00139 {
00140 ss << i;
00141 }
00142
00143 std::string msg = ss.str();
00144
00145 ASSERT_TRUE(msg.size() < buf_size);
00146
00147 volatile int32_t read_out = 0;
00148 volatile bool done_read = false;
00149 boost::thread t(boost::bind(readThread, transports_[2], read_buf.get(), msg.size(), &read_out, &done_read));
00150
00151 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00152
00153 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00154 ASSERT_EQ(written, (int32_t)msg.length());
00155
00156 while (!done_read)
00157 {
00158 boost::this_thread::sleep(boost::posix_time::milliseconds(1));
00159 }
00160
00161 ASSERT_EQ(done_read, true);
00162 ASSERT_EQ(read_out, (int32_t)msg.length());
00163 ASSERT_STREQ((const char*)read_buf.get(), msg.c_str());
00164 }
00165 }
00166
00167 TEST_F(Synchronous, readAfterClose)
00168 {
00169 transports_[1]->close();
00170
00171 uint8_t buf[5];
00172 int32_t read = transports_[1]->read(buf, 1);
00173 ASSERT_EQ(read, -1);
00174 }
00175
00176 TEST_F(Synchronous, writeAfterClose)
00177 {
00178 transports_[1]->close();
00179
00180 std::string msg = "test";
00181 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length());
00182 ASSERT_EQ(written, -1);
00183 }
00184
00185 class Polled : public testing::Test
00186 {
00187 public:
00188 Polled()
00189 {
00190 }
00191
00192 ~Polled()
00193 {
00194 }
00195
00196
00197 protected:
00198
00199 void connectionReceived(const TransportTCPPtr& transport)
00200 {
00201 transports_[2] = transport;
00202 }
00203
00204 void pollThread()
00205 {
00206 while (continue_)
00207 {
00208 poll_set_.update(10);
00209 }
00210 }
00211
00212 void onReadable(const TransportPtr& transport, int index)
00213 {
00214 ASSERT_EQ(transport, transports_[index]);
00215
00216 uint8_t b = 0;
00217 while (transport->read(&b, 1) > 0)
00218 {
00219 ++bytes_read_[index];
00220 }
00221 }
00222
00223 void onWriteable(const TransportPtr& transport, int index)
00224 {
00225 ASSERT_EQ(transport, transports_[index]);
00226
00227 uint8_t b = 0;
00228 transport->write(&b, 1);
00229
00230 ++bytes_written_[index];
00231 }
00232
00233 void onDisconnect(const TransportPtr& transport, int index)
00234 {
00235 ASSERT_EQ(transport, transports_[index]);
00236
00237 disconnected_[index] = true;
00238 }
00239
00240 virtual void SetUp()
00241 {
00242 bytes_read_[0] = 0;
00243 bytes_read_[1] = 0;
00244 bytes_read_[2] = 0;
00245
00246 bytes_written_[0] = 0;
00247 bytes_written_[1] = 0;
00248 bytes_written_[2] = 0;
00249
00250 disconnected_[0] = false;
00251 disconnected_[1] = false;
00252 disconnected_[2] = false;
00253
00254 transports_[0] = TransportTCPPtr(new TransportTCP(&poll_set_));
00255 transports_[1] = TransportTCPPtr(new TransportTCP(&poll_set_));
00256
00257 if (!transports_[0]->listen(0, 100, boost::bind(&Polled::connectionReceived, this, _1)))
00258 {
00259 FAIL();
00260 }
00261
00262 if (!transports_[1]->connect("localhost", transports_[0]->getServerPort()))
00263 {
00264 FAIL();
00265 }
00266
00267 continue_ = true;
00268 poll_thread_ = boost::thread(boost::bind(&Polled::pollThread, this));
00269
00270 int count = 0;
00271 while (!transports_[2] && count < 100)
00272 {
00273 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00274 }
00275
00276 if (!transports_[2])
00277 {
00278 FAIL();
00279 }
00280
00281 transports_[1]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 1));
00282 transports_[2]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 2));
00283 transports_[1]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 1));
00284 transports_[2]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 2));
00285 transports_[1]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 1));
00286 transports_[2]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 2));
00287
00288 transports_[1]->enableRead();
00289 transports_[2]->enableRead();
00290 }
00291
00292 virtual void TearDown()
00293 {
00294 for (int i = 0; i < 3; ++i)
00295 {
00296 transports_[i]->close();
00297 }
00298
00299 continue_ = false;
00300 poll_thread_.join();
00301 }
00302
00303 TransportTCPPtr transports_[3];
00304 int bytes_read_[3];
00305 int bytes_written_[3];
00306 bool disconnected_[3];
00307
00308 PollSet poll_set_;
00309
00310 boost::thread poll_thread_;
00311 volatile bool continue_;
00312 };
00313
00314 TEST_F(Polled, readAndWrite)
00315 {
00316 transports_[1]->enableWrite();
00317 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00318 transports_[1]->disableWrite();
00319
00320 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00321 ASSERT_GT(bytes_read_[2], 0);
00322 ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
00323
00324 int old_read_val = bytes_read_[2];
00325
00326 transports_[2]->enableWrite();
00327 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00328 transports_[2]->disableWrite();
00329
00330 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00331 ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
00332 ASSERT_EQ(old_read_val, bytes_read_[2]);
00333
00334 transports_[1]->enableWrite();
00335 transports_[2]->enableWrite();
00336 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00337 transports_[1]->disableWrite();
00338 transports_[2]->disableWrite();
00339
00340 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00341 ASSERT_GT(bytes_read_[2], 0);
00342 ASSERT_EQ(bytes_read_[2], bytes_written_[1]);
00343 ASSERT_GT(bytes_read_[1], 0);
00344 ASSERT_EQ(bytes_read_[1], bytes_written_[2]);
00345 }
00346
00347 TEST_F(Polled, enableDisableWrite)
00348 {
00349 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00350 ASSERT_EQ(bytes_read_[1], 0);
00351 ASSERT_EQ(bytes_read_[2], 0);
00352 ASSERT_EQ(bytes_written_[1], 0);
00353 ASSERT_EQ(bytes_written_[2], 0);
00354
00355 transports_[1]->enableWrite();
00356 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00357 transports_[1]->disableWrite();
00358
00359 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00360 ASSERT_GT(bytes_read_[2], 0);
00361 ASSERT_GT(bytes_written_[1], 0);
00362 int old_read_val = bytes_read_[2];
00363 int old_written_val = bytes_written_[1];
00364 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00365 ASSERT_EQ(bytes_read_[2], old_read_val);
00366 ASSERT_EQ(bytes_written_[1], old_written_val);
00367 }
00368
00369 TEST_F(Polled, disconnectNoTraffic)
00370 {
00371 ASSERT_EQ(disconnected_[1], false);
00372 ASSERT_EQ(disconnected_[2], false);
00373
00374 transports_[1]->close();
00375 ASSERT_EQ(disconnected_[1], true);
00376
00377 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00378
00379 ASSERT_EQ(disconnected_[2], true);
00380 }
00381
00382 TEST_F(Polled, disconnectWriter)
00383 {
00384 ASSERT_EQ(disconnected_[1], false);
00385 ASSERT_EQ(disconnected_[2], false);
00386
00387 transports_[1]->enableWrite();
00388 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00389 ASSERT_GT(bytes_read_[2], 0);
00390
00391 transports_[1]->close();
00392 ASSERT_EQ(disconnected_[1], true);
00393
00394 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00395
00396 ASSERT_EQ(disconnected_[2], true);
00397 }
00398
00399 TEST_F(Polled, disconnectReader)
00400 {
00401 ASSERT_EQ(disconnected_[1], false);
00402 ASSERT_EQ(disconnected_[2], false);
00403
00404 transports_[2]->enableWrite();
00405 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00406 ASSERT_GT(bytes_read_[1], 0);
00407
00408 transports_[1]->close();
00409 ASSERT_EQ(disconnected_[1], true);
00410
00411 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
00412
00413 ASSERT_EQ(disconnected_[2], true);
00414 }
00415
00416 int main(int argc, char** argv)
00417 {
00418 testing::InitGoogleTest(&argc, argv);
00419
00420 signal(SIGPIPE, SIG_IGN);
00421
00422 return RUN_ALL_TESTS();
00423 }