$search
00001 /* 00002 * Copyright (c) 2008, Willow Garage, Inc. 00003 * All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright 00009 * notice, this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of Willow Garage, Inc. nor the names of its 00014 * contributors may be used to endorse or promote products derived from 00015 * this software without specific prior written permission. 00016 * 00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 * POSSIBILITY OF SUCH DAMAGE. 00028 */ 00029 00030 /* Author: Josh Faust */ 00031 00032 /* 00033 * Test version macros 00034 */ 00035 00036 #include <gtest/gtest.h> 00037 #include "ros/poll_set.h" 00038 #include <sys/socket.h> 00039 00040 #include <fcntl.h> 00041 00042 #include <boost/bind.hpp> 00043 #include <boost/thread.hpp> 00044 00045 using namespace ros; 00046 00047 class Poller : public testing::Test 00048 { 00049 public: 00050 Poller() 00051 { 00052 } 00053 00054 ~Poller() 00055 { 00056 ::close(sockets_[0]); 00057 ::close(sockets_[1]); 00058 } 00059 00060 void waitThenSignal() 00061 { 00062 usleep(100000); 00063 00064 poll_set_.signal(); 00065 } 00066 00067 protected: 00068 00069 virtual void SetUp() 00070 { 00071 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets_) != 0) 00072 { 00073 FAIL(); 00074 } 00075 if(fcntl(sockets_[0], F_SETFL, O_NONBLOCK) == -1) 00076 { 00077 FAIL(); 00078 } 00079 if(fcntl(sockets_[1], F_SETFL, O_NONBLOCK) == -1) 00080 { 00081 FAIL(); 00082 } 00083 } 00084 00085 PollSet poll_set_; 00086 00087 int sockets_[2]; 00088 00089 00090 }; 00091 00092 class SocketHelper 00093 { 00094 public: 00095 SocketHelper(int sock) 00096 : bytes_read_(0) 00097 , bytes_written_(0) 00098 , pollouts_received_(0) 00099 , socket_(sock) 00100 {} 00101 00102 void processEvents(int events) 00103 { 00104 if (events & POLLIN) 00105 { 00106 char b; 00107 while(read(socket_, &b, 1) > 0) 00108 { 00109 ++bytes_read_; 00110 }; 00111 } 00112 00113 if (events & POLLOUT) 00114 { 00115 ++pollouts_received_; 00116 00117 write(); 00118 } 00119 } 00120 00121 void write() 00122 { 00123 char b = 0; 00124 if (::write(socket_, &b, 1) > 0) 00125 { 00126 ++bytes_written_; 00127 } 00128 } 00129 00130 int bytes_read_; 00131 int bytes_written_; 00132 int pollouts_received_; 00133 int socket_; 00134 }; 00135 00136 TEST_F(Poller, read) 00137 { 00138 SocketHelper sh(sockets_[0]); 00139 ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1))); 00140 00141 char b = 0; 00142 00143 write(sockets_[1], &b, 1); 00144 poll_set_.update(1); 00145 00146 ASSERT_EQ(sh.bytes_read_, 0); 00147 00148 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN)); 00149 poll_set_.update(1); 00150 ASSERT_EQ(sh.bytes_read_, 1); 00151 00152 write(sockets_[1], &b, 1); 00153 poll_set_.update(1); 00154 ASSERT_EQ(sh.bytes_read_, 2); 00155 00156 ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLIN)); 00157 write(sockets_[1], &b, 1); 00158 poll_set_.update(1); 00159 ASSERT_EQ(sh.bytes_read_, 2); 00160 00161 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN)); 00162 poll_set_.update(1); 00163 ASSERT_EQ(sh.bytes_read_, 3); 00164 00165 ASSERT_TRUE(poll_set_.delSocket(sockets_[0])); 00166 poll_set_.update(1); 00167 ASSERT_EQ(sh.bytes_read_, 3); 00168 } 00169 00170 TEST_F(Poller, write) 00171 { 00172 SocketHelper sh(sockets_[0]); 00173 ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1))); 00174 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLOUT)); 00175 00176 poll_set_.update(1); 00177 00178 ASSERT_EQ(sh.pollouts_received_, 1); 00179 ASSERT_EQ(sh.bytes_written_, 1); 00180 00181 ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLOUT)); 00182 poll_set_.update(1); 00183 ASSERT_EQ(sh.pollouts_received_, 1); 00184 ASSERT_EQ(sh.bytes_written_, 1); 00185 } 00186 00187 TEST_F(Poller, readAndWrite) 00188 { 00189 SocketHelper sh1(sockets_[0]); 00190 SocketHelper sh2(sockets_[1]); 00191 ASSERT_TRUE(poll_set_.addSocket(sh1.socket_, boost::bind(&SocketHelper::processEvents, &sh1, _1))); 00192 ASSERT_TRUE(poll_set_.addSocket(sh2.socket_, boost::bind(&SocketHelper::processEvents, &sh2, _1))); 00193 00194 ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLIN)); 00195 ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLIN)); 00196 00197 sh1.write(); 00198 sh2.write(); 00199 00200 ASSERT_EQ(sh1.bytes_written_, 1); 00201 ASSERT_EQ(sh2.bytes_written_, 1); 00202 00203 poll_set_.update(1); 00204 00205 ASSERT_EQ(sh1.bytes_read_, 1); 00206 ASSERT_EQ(sh2.bytes_read_, 1); 00207 00208 ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLOUT)); 00209 ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLOUT)); 00210 00211 poll_set_.update(1); 00212 00213 ASSERT_EQ(sh1.bytes_written_, 2); 00214 ASSERT_EQ(sh2.bytes_written_, 2); 00215 00216 ASSERT_TRUE(poll_set_.delEvents(sh1.socket_, POLLOUT)); 00217 ASSERT_TRUE(poll_set_.delEvents(sh2.socket_, POLLOUT)); 00218 00219 poll_set_.update(1); 00220 00221 ASSERT_EQ(sh1.bytes_read_, 2); 00222 ASSERT_EQ(sh2.bytes_read_, 2); 00223 } 00224 00225 TEST_F(Poller, multiAddDel) 00226 { 00227 SocketHelper sh(sockets_[0]); 00228 ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1))); 00229 ASSERT_FALSE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1))); 00230 00231 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, 0)); 00232 ASSERT_FALSE(poll_set_.addEvents(sh.socket_ + 1, 0)); 00233 00234 ASSERT_TRUE(poll_set_.delEvents(sh.socket_, 0)); 00235 ASSERT_FALSE(poll_set_.delEvents(sh.socket_ + 1, 0)); 00236 00237 ASSERT_FALSE(poll_set_.delSocket(sh.socket_ + 1)); 00238 ASSERT_TRUE(poll_set_.delSocket(sh.socket_)); 00239 } 00240 00241 void addThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier) 00242 { 00243 barrier->wait(); 00244 00245 ps->addSocket(sh->socket_, boost::bind(&SocketHelper::processEvents, sh, _1)); 00246 ps->addEvents(sh->socket_, POLLIN); 00247 ps->addEvents(sh->socket_, POLLOUT); 00248 } 00249 00250 void delThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier) 00251 { 00252 barrier->wait(); 00253 00254 ps->delEvents(sh->socket_, POLLIN); 00255 ps->delEvents(sh->socket_, POLLOUT); 00256 ps->delSocket(sh->socket_); 00257 } 00258 00259 TEST_F(Poller, addDelMultiThread) 00260 { 00261 for (int i = 0; i < 100; ++i) 00262 { 00263 SocketHelper sh1(sockets_[0]); 00264 SocketHelper sh2(sockets_[1]); 00265 00266 const int thread_count = 100; 00267 00268 { 00269 boost::barrier barrier(thread_count + 1); 00270 00271 boost::thread_group tg; 00272 for (int i = 0; i < thread_count/2; ++i) 00273 { 00274 tg.create_thread(boost::bind(addThread, &poll_set_, &sh1, &barrier)); 00275 tg.create_thread(boost::bind(addThread, &poll_set_, &sh2, &barrier)); 00276 } 00277 00278 barrier.wait(); 00279 00280 tg.join_all(); 00281 00282 poll_set_.update(1); 00283 00284 ASSERT_TRUE(sh1.bytes_read_ == 0 || sh1.bytes_read_ == 1); 00285 ASSERT_TRUE(sh2.bytes_read_ == 0 || sh2.bytes_read_ == 1); 00286 ASSERT_EQ(sh1.bytes_written_, 1); 00287 ASSERT_EQ(sh2.bytes_written_, 1); 00288 00289 poll_set_.update(1); 00290 00291 ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2); 00292 ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2); 00293 ASSERT_EQ(sh1.bytes_written_, 2); 00294 ASSERT_EQ(sh2.bytes_written_, 2); 00295 } 00296 00297 { 00298 boost::barrier barrier(thread_count + 1); 00299 00300 boost::thread_group tg; 00301 for (int i = 0; i < thread_count/2; ++i) 00302 { 00303 tg.create_thread(boost::bind(delThread, &poll_set_, &sh1, &barrier)); 00304 tg.create_thread(boost::bind(delThread, &poll_set_, &sh2, &barrier)); 00305 } 00306 00307 barrier.wait(); 00308 00309 tg.join_all(); 00310 00311 poll_set_.update(1); 00312 00313 ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2); 00314 ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2); 00315 ASSERT_EQ(sh1.bytes_written_, 2); 00316 ASSERT_EQ(sh2.bytes_written_, 2); 00317 } 00318 } 00319 } 00320 00321 void addDelManyTimesThread(PollSet* ps, SocketHelper* sh1, SocketHelper* sh2, boost::barrier* barrier, int count, volatile bool* done) 00322 { 00323 *done = false; 00324 00325 barrier->wait(); 00326 00327 for (int i = 0; i < count; ++i) 00328 { 00329 ps->addSocket(sh1->socket_, boost::bind(&SocketHelper::processEvents, sh1, _1)); 00330 ps->addEvents(sh1->socket_, POLLIN); 00331 ps->addEvents(sh1->socket_, POLLOUT); 00332 00333 ps->addSocket(sh2->socket_, boost::bind(&SocketHelper::processEvents, sh2, _1)); 00334 ps->addEvents(sh2->socket_, POLLIN); 00335 ps->addEvents(sh2->socket_, POLLOUT); 00336 00337 boost::this_thread::sleep(boost::posix_time::microseconds(100)); 00338 00339 ps->delEvents(sh1->socket_, POLLIN); 00340 ps->delEvents(sh1->socket_, POLLOUT); 00341 ps->delSocket(sh1->socket_); 00342 00343 ps->delEvents(sh2->socket_, POLLIN); 00344 ps->delEvents(sh2->socket_, POLLOUT); 00345 ps->delSocket(sh2->socket_); 00346 } 00347 00348 *done = true; 00349 } 00350 00351 TEST_F(Poller, updateWhileAddDel) 00352 { 00353 SocketHelper sh1(sockets_[0]); 00354 SocketHelper sh2(sockets_[1]); 00355 00356 boost::barrier barrier(2); 00357 volatile bool done = false; 00358 const int count = 1000; 00359 00360 boost::thread t(boost::bind(addDelManyTimesThread, &poll_set_, &sh1, &sh2, &barrier, count, &done)); 00361 00362 barrier.wait(); 00363 00364 while (!done) 00365 { 00366 poll_set_.update(1); 00367 } 00368 00369 ASSERT_TRUE(sh1.bytes_read_ > 0); 00370 ASSERT_TRUE(sh1.bytes_written_ > 0); 00371 ASSERT_TRUE(sh2.bytes_read_ > 0); 00372 ASSERT_TRUE(sh2.bytes_written_ > 0); 00373 } 00374 00375 TEST_F(Poller, signal) 00376 { 00377 // first one clears out any calls to signal() caused by construction 00378 poll_set_.update(0); 00379 00380 boost::thread t(boost::bind(&Poller::waitThenSignal, this)); 00381 poll_set_.update(-1); 00382 } 00383 00384 00385 int main(int argc, char** argv) 00386 { 00387 testing::InitGoogleTest(&argc, argv); 00388 00389 signal(SIGPIPE, SIG_IGN); 00390 00391 return RUN_ALL_TESTS(); 00392 } 00393