$search
00001 /* 00002 * Copyright (c) 2008, Willow Garage, Inc. 00003 * All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright 00009 * notice, this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of Willow Garage, Inc. nor the names of its 00014 * contributors may be used to endorse or promote products derived from 00015 * this software without specific prior written permission. 00016 * 00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 * POSSIBILITY OF SUCH DAMAGE. 00028 */ 00029 00030 /* Author: Josh Faust */ 00031 00032 /* 00033 * Test version macros 00034 */ 00035 00036 #include <gtest/gtest.h> 00037 #include "ros/poll_set.h" 00038 #include "ros/transport/transport_tcp.h" 00039 00040 #include <boost/bind.hpp> 00041 #include <boost/thread.hpp> 00042 00043 using namespace ros; 00044 00045 class Synchronous : public testing::Test 00046 { 00047 public: 00048 Synchronous() 00049 { 00050 } 00051 00052 ~Synchronous() 00053 { 00054 } 00055 00056 00057 protected: 00058 00059 virtual void SetUp() 00060 { 00061 transports_[0] = TransportTCPPtr(new TransportTCP(NULL, TransportTCP::SYNCHRONOUS)); 00062 transports_[1] = TransportTCPPtr(new TransportTCP(NULL, TransportTCP::SYNCHRONOUS)); 00063 00064 if (!transports_[0]->listen(0, 100, TransportTCP::AcceptCallback())) 00065 { 00066 FAIL(); 00067 } 00068 00069 if (!transports_[1]->connect("localhost", transports_[0]->getServerPort())) 00070 { 00071 FAIL(); 00072 } 00073 00074 transports_[2] = transports_[0]->accept(); 00075 if (!transports_[2]) 00076 { 00077 FAIL(); 00078 } 00079 } 00080 00081 virtual void TearDown() 00082 { 00083 for (int i = 0; i < 3; ++i) 00084 { 00085 if (transports_[i]) 00086 { 00087 transports_[i]->close(); 00088 } 00089 } 00090 } 00091 00092 TransportTCPPtr transports_[3]; 00093 }; 00094 00095 TEST_F(Synchronous, writeThenRead) 00096 { 00097 std::string msg = "test"; 00098 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length()); 00099 ASSERT_EQ(written, (int32_t)msg.length()); 00100 00101 uint8_t buf[5]; 00102 memset(buf, 0, sizeof(buf)); 00103 int32_t read = transports_[2]->read(buf, msg.length()); 00104 ASSERT_EQ(read, (int32_t)msg.length()); 00105 ASSERT_STREQ((const char*)buf, msg.c_str()); 00106 } 00107 00108 TEST_F(Synchronous, writeThenReadPartial) 00109 { 00110 std::string msg = "test"; 00111 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length()); 00112 ASSERT_EQ(written, (int32_t)msg.length()); 00113 00114 uint8_t buf[5]; 00115 memset(buf, 0, sizeof(buf)); 00116 int32_t read = transports_[2]->read(buf, 1); 00117 ASSERT_EQ(read, 1); 00118 ASSERT_STREQ((const char*)buf, msg.substr(0, 1).c_str()); 00119 } 00120 00121 void readThread(TransportTCPPtr transport, uint8_t* buf, uint32_t size, volatile int32_t* read_out, volatile bool* done_read) 00122 { 00123 while (*read_out < (int32_t)size) 00124 { 00125 *read_out += transport->read(buf + *read_out, size - *read_out); 00126 } 00127 *done_read = true; 00128 } 00129 00130 TEST_F(Synchronous, readWhileWriting) 00131 { 00132 for (int i = 0; i < 10; ++i) 00133 { 00134 const uint32_t buf_size = 1024*1024; 00135 std::auto_ptr<uint8_t> read_buf(new uint8_t[buf_size]); 00136 00137 std::stringstream ss; 00138 for (int i = 0; i < 100000; ++i) 00139 { 00140 ss << i; 00141 } 00142 00143 std::string msg = ss.str(); 00144 00145 ASSERT_TRUE(msg.size() < buf_size); 00146 00147 volatile int32_t read_out = 0; 00148 volatile bool done_read = false; 00149 boost::thread t(boost::bind(readThread, transports_[2], read_buf.get(), msg.size(), &read_out, &done_read)); 00150 00151 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00152 00153 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length()); 00154 ASSERT_EQ(written, (int32_t)msg.length()); 00155 00156 while (!done_read) 00157 { 00158 boost::this_thread::sleep(boost::posix_time::milliseconds(1)); 00159 } 00160 00161 ASSERT_EQ(done_read, true); 00162 ASSERT_EQ(read_out, (int32_t)msg.length()); 00163 ASSERT_STREQ((const char*)read_buf.get(), msg.c_str()); 00164 } 00165 } 00166 00167 TEST_F(Synchronous, readAfterClose) 00168 { 00169 transports_[1]->close(); 00170 00171 uint8_t buf[5]; 00172 int32_t read = transports_[1]->read(buf, 1); 00173 ASSERT_EQ(read, -1); 00174 } 00175 00176 TEST_F(Synchronous, writeAfterClose) 00177 { 00178 transports_[1]->close(); 00179 00180 std::string msg = "test"; 00181 int32_t written = transports_[1]->write((uint8_t*)msg.c_str(), msg.length()); 00182 ASSERT_EQ(written, -1); 00183 } 00184 00185 class Polled : public testing::Test 00186 { 00187 public: 00188 Polled() 00189 { 00190 } 00191 00192 ~Polled() 00193 { 00194 } 00195 00196 00197 protected: 00198 00199 void connectionReceived(const TransportTCPPtr& transport) 00200 { 00201 transports_[2] = transport; 00202 } 00203 00204 void pollThread() 00205 { 00206 while (continue_) 00207 { 00208 poll_set_.update(10); 00209 } 00210 } 00211 00212 void onReadable(const TransportPtr& transport, int index) 00213 { 00214 ASSERT_EQ(transport, transports_[index]); 00215 00216 uint8_t b = 0; 00217 while (transport->read(&b, 1) > 0) 00218 { 00219 ++bytes_read_[index]; 00220 } 00221 } 00222 00223 void onWriteable(const TransportPtr& transport, int index) 00224 { 00225 ASSERT_EQ(transport, transports_[index]); 00226 00227 uint8_t b = 0; 00228 transport->write(&b, 1); 00229 00230 ++bytes_written_[index]; 00231 } 00232 00233 void onDisconnect(const TransportPtr& transport, int index) 00234 { 00235 ASSERT_EQ(transport, transports_[index]); 00236 00237 disconnected_[index] = true; 00238 } 00239 00240 virtual void SetUp() 00241 { 00242 bytes_read_[0] = 0; 00243 bytes_read_[1] = 0; 00244 bytes_read_[2] = 0; 00245 00246 bytes_written_[0] = 0; 00247 bytes_written_[1] = 0; 00248 bytes_written_[2] = 0; 00249 00250 disconnected_[0] = false; 00251 disconnected_[1] = false; 00252 disconnected_[2] = false; 00253 00254 transports_[0] = TransportTCPPtr(new TransportTCP(&poll_set_)); 00255 transports_[1] = TransportTCPPtr(new TransportTCP(&poll_set_)); 00256 00257 if (!transports_[0]->listen(0, 100, boost::bind(&Polled::connectionReceived, this, _1))) 00258 { 00259 FAIL(); 00260 } 00261 00262 if (!transports_[1]->connect("localhost", transports_[0]->getServerPort())) 00263 { 00264 FAIL(); 00265 } 00266 00267 continue_ = true; 00268 poll_thread_ = boost::thread(boost::bind(&Polled::pollThread, this)); 00269 00270 int count = 0; 00271 while (!transports_[2] && count < 100) 00272 { 00273 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00274 } 00275 00276 if (!transports_[2]) 00277 { 00278 FAIL(); 00279 } 00280 00281 transports_[1]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 1)); 00282 transports_[2]->setReadCallback(boost::bind(&Polled::onReadable, this, _1, 2)); 00283 transports_[1]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 1)); 00284 transports_[2]->setWriteCallback(boost::bind(&Polled::onWriteable, this, _1, 2)); 00285 transports_[1]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 1)); 00286 transports_[2]->setDisconnectCallback(boost::bind(&Polled::onDisconnect, this, _1, 2)); 00287 00288 transports_[1]->enableRead(); 00289 transports_[2]->enableRead(); 00290 } 00291 00292 virtual void TearDown() 00293 { 00294 for (int i = 0; i < 3; ++i) 00295 { 00296 transports_[i]->close(); 00297 } 00298 00299 continue_ = false; 00300 poll_thread_.join(); 00301 } 00302 00303 TransportTCPPtr transports_[3]; 00304 int bytes_read_[3]; 00305 int bytes_written_[3]; 00306 bool disconnected_[3]; 00307 00308 PollSet poll_set_; 00309 00310 boost::thread poll_thread_; 00311 volatile bool continue_; 00312 }; 00313 00314 TEST_F(Polled, readAndWrite) 00315 { 00316 transports_[1]->enableWrite(); 00317 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00318 transports_[1]->disableWrite(); 00319 00320 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00321 ASSERT_GT(bytes_read_[2], 0); 00322 ASSERT_EQ(bytes_read_[2], bytes_written_[1]); 00323 00324 int old_read_val = bytes_read_[2]; 00325 00326 transports_[2]->enableWrite(); 00327 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00328 transports_[2]->disableWrite(); 00329 00330 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00331 ASSERT_EQ(bytes_read_[1], bytes_written_[2]); 00332 ASSERT_EQ(old_read_val, bytes_read_[2]); 00333 00334 transports_[1]->enableWrite(); 00335 transports_[2]->enableWrite(); 00336 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00337 transports_[1]->disableWrite(); 00338 transports_[2]->disableWrite(); 00339 00340 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00341 ASSERT_GT(bytes_read_[2], 0); 00342 ASSERT_EQ(bytes_read_[2], bytes_written_[1]); 00343 ASSERT_GT(bytes_read_[1], 0); 00344 ASSERT_EQ(bytes_read_[1], bytes_written_[2]); 00345 } 00346 00347 TEST_F(Polled, enableDisableWrite) 00348 { 00349 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00350 ASSERT_EQ(bytes_read_[1], 0); 00351 ASSERT_EQ(bytes_read_[2], 0); 00352 ASSERT_EQ(bytes_written_[1], 0); 00353 ASSERT_EQ(bytes_written_[2], 0); 00354 00355 transports_[1]->enableWrite(); 00356 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00357 transports_[1]->disableWrite(); 00358 00359 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00360 ASSERT_GT(bytes_read_[2], 0); 00361 ASSERT_GT(bytes_written_[1], 0); 00362 int old_read_val = bytes_read_[2]; 00363 int old_written_val = bytes_written_[1]; 00364 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00365 ASSERT_EQ(bytes_read_[2], old_read_val); 00366 ASSERT_EQ(bytes_written_[1], old_written_val); 00367 } 00368 00369 TEST_F(Polled, disconnectNoTraffic) 00370 { 00371 ASSERT_EQ(disconnected_[1], false); 00372 ASSERT_EQ(disconnected_[2], false); 00373 00374 transports_[1]->close(); 00375 ASSERT_EQ(disconnected_[1], true); 00376 00377 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00378 00379 ASSERT_EQ(disconnected_[2], true); 00380 } 00381 00382 TEST_F(Polled, disconnectWriter) 00383 { 00384 ASSERT_EQ(disconnected_[1], false); 00385 ASSERT_EQ(disconnected_[2], false); 00386 00387 transports_[1]->enableWrite(); 00388 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00389 ASSERT_GT(bytes_read_[2], 0); 00390 00391 transports_[1]->close(); 00392 ASSERT_EQ(disconnected_[1], true); 00393 00394 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00395 00396 ASSERT_EQ(disconnected_[2], true); 00397 } 00398 00399 TEST_F(Polled, disconnectReader) 00400 { 00401 ASSERT_EQ(disconnected_[1], false); 00402 ASSERT_EQ(disconnected_[2], false); 00403 00404 transports_[2]->enableWrite(); 00405 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00406 ASSERT_GT(bytes_read_[1], 0); 00407 00408 transports_[1]->close(); 00409 ASSERT_EQ(disconnected_[1], true); 00410 00411 boost::this_thread::sleep(boost::posix_time::milliseconds(50)); 00412 00413 ASSERT_EQ(disconnected_[2], true); 00414 } 00415 00416 int main(int argc, char** argv) 00417 { 00418 testing::InitGoogleTest(&argc, argv); 00419 00420 signal(SIGPIPE, SIG_IGN); 00421 00422 return RUN_ALL_TESTS(); 00423 }