00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <gtest/gtest.h>
00037 #include "ros/poll_set.h"
00038 #include <sys/socket.h>
00039
00040 #include <fcntl.h>
00041
00042 #include <boost/bind.hpp>
00043 #include <boost/thread.hpp>
00044
00045 using namespace ros;
00046
00047 class Poller : public testing::Test
00048 {
00049 public:
00050 Poller()
00051 {
00052 }
00053
00054 ~Poller()
00055 {
00056 ::close(sockets_[0]);
00057 ::close(sockets_[1]);
00058 }
00059
00060 void waitThenSignal()
00061 {
00062 usleep(100000);
00063
00064 poll_set_.signal();
00065 }
00066
00067 protected:
00068
00069 virtual void SetUp()
00070 {
00071 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets_) != 0)
00072 {
00073 FAIL();
00074 }
00075 if(fcntl(sockets_[0], F_SETFL, O_NONBLOCK) == -1)
00076 {
00077 FAIL();
00078 }
00079 if(fcntl(sockets_[1], F_SETFL, O_NONBLOCK) == -1)
00080 {
00081 FAIL();
00082 }
00083 }
00084
00085 PollSet poll_set_;
00086
00087 int sockets_[2];
00088
00089
00090 };
00091
00092 class SocketHelper
00093 {
00094 public:
00095 SocketHelper(int sock)
00096 : bytes_read_(0)
00097 , bytes_written_(0)
00098 , pollouts_received_(0)
00099 , socket_(sock)
00100 {}
00101
00102 void processEvents(int events)
00103 {
00104 if (events & POLLIN)
00105 {
00106 char b;
00107 while(read(socket_, &b, 1) > 0)
00108 {
00109 ++bytes_read_;
00110 };
00111 }
00112
00113 if (events & POLLOUT)
00114 {
00115 ++pollouts_received_;
00116
00117 write();
00118 }
00119 }
00120
00121 void write()
00122 {
00123 char b = 0;
00124 if (::write(socket_, &b, 1) > 0)
00125 {
00126 ++bytes_written_;
00127 }
00128 }
00129
00130 int bytes_read_;
00131 int bytes_written_;
00132 int pollouts_received_;
00133 int socket_;
00134 };
00135
00136 TEST_F(Poller, read)
00137 {
00138 SocketHelper sh(sockets_[0]);
00139 ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00140
00141 char b = 0;
00142
00143 write(sockets_[1], &b, 1);
00144 poll_set_.update(1);
00145
00146 ASSERT_EQ(sh.bytes_read_, 0);
00147
00148 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
00149 poll_set_.update(1);
00150 ASSERT_EQ(sh.bytes_read_, 1);
00151
00152 write(sockets_[1], &b, 1);
00153 poll_set_.update(1);
00154 ASSERT_EQ(sh.bytes_read_, 2);
00155
00156 ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLIN));
00157 write(sockets_[1], &b, 1);
00158 poll_set_.update(1);
00159 ASSERT_EQ(sh.bytes_read_, 2);
00160
00161 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
00162 poll_set_.update(1);
00163 ASSERT_EQ(sh.bytes_read_, 3);
00164
00165 ASSERT_TRUE(poll_set_.delSocket(sockets_[0]));
00166 poll_set_.update(1);
00167 ASSERT_EQ(sh.bytes_read_, 3);
00168 }
00169
00170 TEST_F(Poller, write)
00171 {
00172 SocketHelper sh(sockets_[0]);
00173 ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00174 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLOUT));
00175
00176 poll_set_.update(1);
00177
00178 ASSERT_EQ(sh.pollouts_received_, 1);
00179 ASSERT_EQ(sh.bytes_written_, 1);
00180
00181 ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLOUT));
00182 poll_set_.update(1);
00183 ASSERT_EQ(sh.pollouts_received_, 1);
00184 ASSERT_EQ(sh.bytes_written_, 1);
00185 }
00186
00187 TEST_F(Poller, readAndWrite)
00188 {
00189 SocketHelper sh1(sockets_[0]);
00190 SocketHelper sh2(sockets_[1]);
00191 ASSERT_TRUE(poll_set_.addSocket(sh1.socket_, boost::bind(&SocketHelper::processEvents, &sh1, _1)));
00192 ASSERT_TRUE(poll_set_.addSocket(sh2.socket_, boost::bind(&SocketHelper::processEvents, &sh2, _1)));
00193
00194 ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLIN));
00195 ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLIN));
00196
00197 sh1.write();
00198 sh2.write();
00199
00200 ASSERT_EQ(sh1.bytes_written_, 1);
00201 ASSERT_EQ(sh2.bytes_written_, 1);
00202
00203 poll_set_.update(1);
00204
00205 ASSERT_EQ(sh1.bytes_read_, 1);
00206 ASSERT_EQ(sh2.bytes_read_, 1);
00207
00208 ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLOUT));
00209 ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLOUT));
00210
00211 poll_set_.update(1);
00212
00213 ASSERT_EQ(sh1.bytes_written_, 2);
00214 ASSERT_EQ(sh2.bytes_written_, 2);
00215
00216 ASSERT_TRUE(poll_set_.delEvents(sh1.socket_, POLLOUT));
00217 ASSERT_TRUE(poll_set_.delEvents(sh2.socket_, POLLOUT));
00218
00219 poll_set_.update(1);
00220
00221 ASSERT_EQ(sh1.bytes_read_, 2);
00222 ASSERT_EQ(sh2.bytes_read_, 2);
00223 }
00224
00225 TEST_F(Poller, multiAddDel)
00226 {
00227 SocketHelper sh(sockets_[0]);
00228 ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00229 ASSERT_FALSE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00230
00231 ASSERT_TRUE(poll_set_.addEvents(sh.socket_, 0));
00232 ASSERT_FALSE(poll_set_.addEvents(sh.socket_ + 1, 0));
00233
00234 ASSERT_TRUE(poll_set_.delEvents(sh.socket_, 0));
00235 ASSERT_FALSE(poll_set_.delEvents(sh.socket_ + 1, 0));
00236
00237 ASSERT_FALSE(poll_set_.delSocket(sh.socket_ + 1));
00238 ASSERT_TRUE(poll_set_.delSocket(sh.socket_));
00239 }
00240
00241 void addThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
00242 {
00243 barrier->wait();
00244
00245 ps->addSocket(sh->socket_, boost::bind(&SocketHelper::processEvents, sh, _1));
00246 ps->addEvents(sh->socket_, POLLIN);
00247 ps->addEvents(sh->socket_, POLLOUT);
00248 }
00249
00250 void delThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
00251 {
00252 barrier->wait();
00253
00254 ps->delEvents(sh->socket_, POLLIN);
00255 ps->delEvents(sh->socket_, POLLOUT);
00256 ps->delSocket(sh->socket_);
00257 }
00258
00259 TEST_F(Poller, addDelMultiThread)
00260 {
00261 for (int i = 0; i < 100; ++i)
00262 {
00263 SocketHelper sh1(sockets_[0]);
00264 SocketHelper sh2(sockets_[1]);
00265
00266 const int thread_count = 100;
00267
00268 {
00269 boost::barrier barrier(thread_count + 1);
00270
00271 boost::thread_group tg;
00272 for (int i = 0; i < thread_count/2; ++i)
00273 {
00274 tg.create_thread(boost::bind(addThread, &poll_set_, &sh1, &barrier));
00275 tg.create_thread(boost::bind(addThread, &poll_set_, &sh2, &barrier));
00276 }
00277
00278 barrier.wait();
00279
00280 tg.join_all();
00281
00282 poll_set_.update(1);
00283
00284 ASSERT_TRUE(sh1.bytes_read_ == 0 || sh1.bytes_read_ == 1);
00285 ASSERT_TRUE(sh2.bytes_read_ == 0 || sh2.bytes_read_ == 1);
00286 ASSERT_EQ(sh1.bytes_written_, 1);
00287 ASSERT_EQ(sh2.bytes_written_, 1);
00288
00289 poll_set_.update(1);
00290
00291 ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
00292 ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
00293 ASSERT_EQ(sh1.bytes_written_, 2);
00294 ASSERT_EQ(sh2.bytes_written_, 2);
00295 }
00296
00297 {
00298 boost::barrier barrier(thread_count + 1);
00299
00300 boost::thread_group tg;
00301 for (int i = 0; i < thread_count/2; ++i)
00302 {
00303 tg.create_thread(boost::bind(delThread, &poll_set_, &sh1, &barrier));
00304 tg.create_thread(boost::bind(delThread, &poll_set_, &sh2, &barrier));
00305 }
00306
00307 barrier.wait();
00308
00309 tg.join_all();
00310
00311 poll_set_.update(1);
00312
00313 ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
00314 ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
00315 ASSERT_EQ(sh1.bytes_written_, 2);
00316 ASSERT_EQ(sh2.bytes_written_, 2);
00317 }
00318 }
00319 }
00320
00321 void addDelManyTimesThread(PollSet* ps, SocketHelper* sh1, SocketHelper* sh2, boost::barrier* barrier, int count, volatile bool* done)
00322 {
00323 *done = false;
00324
00325 barrier->wait();
00326
00327 for (int i = 0; i < count; ++i)
00328 {
00329 ps->addSocket(sh1->socket_, boost::bind(&SocketHelper::processEvents, sh1, _1));
00330 ps->addEvents(sh1->socket_, POLLIN);
00331 ps->addEvents(sh1->socket_, POLLOUT);
00332
00333 ps->addSocket(sh2->socket_, boost::bind(&SocketHelper::processEvents, sh2, _1));
00334 ps->addEvents(sh2->socket_, POLLIN);
00335 ps->addEvents(sh2->socket_, POLLOUT);
00336
00337 boost::this_thread::sleep(boost::posix_time::microseconds(100));
00338
00339 ps->delEvents(sh1->socket_, POLLIN);
00340 ps->delEvents(sh1->socket_, POLLOUT);
00341 ps->delSocket(sh1->socket_);
00342
00343 ps->delEvents(sh2->socket_, POLLIN);
00344 ps->delEvents(sh2->socket_, POLLOUT);
00345 ps->delSocket(sh2->socket_);
00346 }
00347
00348 *done = true;
00349 }
00350
00351 TEST_F(Poller, updateWhileAddDel)
00352 {
00353 SocketHelper sh1(sockets_[0]);
00354 SocketHelper sh2(sockets_[1]);
00355
00356 boost::barrier barrier(2);
00357 volatile bool done = false;
00358 const int count = 1000;
00359
00360 boost::thread t(boost::bind(addDelManyTimesThread, &poll_set_, &sh1, &sh2, &barrier, count, &done));
00361
00362 barrier.wait();
00363
00364 while (!done)
00365 {
00366 poll_set_.update(1);
00367 }
00368
00369 ASSERT_TRUE(sh1.bytes_read_ > 0);
00370 ASSERT_TRUE(sh1.bytes_written_ > 0);
00371 ASSERT_TRUE(sh2.bytes_read_ > 0);
00372 ASSERT_TRUE(sh2.bytes_written_ > 0);
00373 }
00374
00375 TEST_F(Poller, signal)
00376 {
00377
00378 poll_set_.update(0);
00379
00380 boost::thread t(boost::bind(&Poller::waitThenSignal, this));
00381 poll_set_.update(-1);
00382 }
00383
00384
00385 int main(int argc, char** argv)
00386 {
00387 testing::InitGoogleTest(&argc, argv);
00388
00389 signal(SIGPIPE, SIG_IGN);
00390
00391 return RUN_ALL_TESTS();
00392 }
00393