test_poll_set.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 /* Author: Josh Faust */
00031 
00032 /*
 * Tests for ros::PollSet: socket registration, event masks, update and signal
00034  */
00035 
#include <gtest/gtest.h>
#include "ros/poll_set.h"

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <sys/socket.h>
#include <unistd.h>

#include <boost/bind.hpp>
#include <boost/thread.hpp>
00044 
00045 using namespace ros;
00046 
00047 class Poller : public testing::Test
00048 {
00049 public:
00050   Poller()
00051   {
00052   }
00053 
00054   ~Poller()
00055   {
00056     ::close(sockets_[0]);
00057     ::close(sockets_[1]);
00058   }
00059 
00060   void waitThenSignal()
00061   {
00062     usleep(100000);
00063 
00064     poll_set_.signal();
00065   }
00066 
00067 protected:
00068 
00069   virtual void SetUp()
00070   {
00071     if(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets_) != 0)
00072     {
00073       FAIL();
00074     }
00075     if(fcntl(sockets_[0], F_SETFL, O_NONBLOCK) == -1)
00076     {
00077       FAIL();
00078     }
00079     if(fcntl(sockets_[1], F_SETFL, O_NONBLOCK) == -1)
00080     {
00081       FAIL();
00082     }
00083   }
00084 
00085   PollSet poll_set_;
00086 
00087   int sockets_[2];
00088 
00089 
00090 };
00091 
/**
 * Small helper that reacts to poll events on one socket and counts what it did.
 *
 * The counters are public so that tests can inspect them directly:
 *  - bytes_read_        bytes successfully read in response to POLLIN
 *  - bytes_written_     bytes successfully written by write()
 *  - pollouts_received_ number of processEvents() calls that carried POLLOUT
 *  - socket_            the descriptor this helper operates on (not owned;
 *                       the helper never closes it)
 */
class SocketHelper
{
public:
  explicit SocketHelper(int sock)
  : bytes_read_(0)
  , bytes_written_(0)
  , pollouts_received_(0)
  , socket_(sock)
  {}

  /**
   * Handles a poll event mask.
   *
   * On POLLIN, reads one byte at a time until read() stops returning data,
   * counting every byte.  On POLLOUT, records the event and writes one byte.
   *
   * \param events bitmask of poll(2) event flags
   */
  void processEvents(int events)
  {
    if (events & POLLIN)
    {
      char b;
      for (;;)
      {
        const ssize_t n = ::read(socket_, &b, 1);
        if (n > 0)
        {
          ++bytes_read_;
          continue;
        }
        // An interrupted call says nothing about whether data is pending,
        // so retry instead of treating it as "drained".
        if (n < 0 && errno == EINTR)
        {
          continue;
        }
        break;
      }
    }

    if (events & POLLOUT)
    {
      ++pollouts_received_;

      write();
    }
  }

  /**
   * Writes a single zero byte to the socket and counts it if the write
   * succeeded.  A write interrupted by a signal is retried.
   */
  void write()
  {
    const char b = 0;
    ssize_t n;
    do
    {
      n = ::write(socket_, &b, 1);
    } while (n < 0 && errno == EINTR);

    if (n > 0)
    {
      ++bytes_written_;
    }
  }

  int bytes_read_;
  int bytes_written_;
  int pollouts_received_;
  int socket_;
};
00135 
00136 TEST_F(Poller, read)
00137 {
00138   SocketHelper sh(sockets_[0]);
00139   ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00140 
00141   char b = 0;
00142 
00143   write(sockets_[1], &b, 1);
00144   poll_set_.update(1);
00145 
00146   ASSERT_EQ(sh.bytes_read_, 0);
00147 
00148   ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
00149   poll_set_.update(1);
00150   ASSERT_EQ(sh.bytes_read_, 1);
00151 
00152   write(sockets_[1], &b, 1);
00153   poll_set_.update(1);
00154   ASSERT_EQ(sh.bytes_read_, 2);
00155 
00156   ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLIN));
00157   write(sockets_[1], &b, 1);
00158   poll_set_.update(1);
00159   ASSERT_EQ(sh.bytes_read_, 2);
00160 
00161   ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
00162   poll_set_.update(1);
00163   ASSERT_EQ(sh.bytes_read_, 3);
00164 
00165   ASSERT_TRUE(poll_set_.delSocket(sockets_[0]));
00166   poll_set_.update(1);
00167   ASSERT_EQ(sh.bytes_read_, 3);
00168 }
00169 
00170 TEST_F(Poller, write)
00171 {
00172   SocketHelper sh(sockets_[0]);
00173   ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00174   ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLOUT));
00175 
00176   poll_set_.update(1);
00177 
00178   ASSERT_EQ(sh.pollouts_received_, 1);
00179   ASSERT_EQ(sh.bytes_written_, 1);
00180 
00181   ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLOUT));
00182   poll_set_.update(1);
00183   ASSERT_EQ(sh.pollouts_received_, 1);
00184   ASSERT_EQ(sh.bytes_written_, 1);
00185 }
00186 
00187 TEST_F(Poller, readAndWrite)
00188 {
00189   SocketHelper sh1(sockets_[0]);
00190   SocketHelper sh2(sockets_[1]);
00191   ASSERT_TRUE(poll_set_.addSocket(sh1.socket_, boost::bind(&SocketHelper::processEvents, &sh1, _1)));
00192   ASSERT_TRUE(poll_set_.addSocket(sh2.socket_, boost::bind(&SocketHelper::processEvents, &sh2, _1)));
00193 
00194   ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLIN));
00195   ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLIN));
00196 
00197   sh1.write();
00198   sh2.write();
00199 
00200   ASSERT_EQ(sh1.bytes_written_, 1);
00201   ASSERT_EQ(sh2.bytes_written_, 1);
00202 
00203   poll_set_.update(1);
00204 
00205   ASSERT_EQ(sh1.bytes_read_, 1);
00206   ASSERT_EQ(sh2.bytes_read_, 1);
00207 
00208   ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLOUT));
00209   ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLOUT));
00210 
00211   poll_set_.update(1);
00212 
00213   ASSERT_EQ(sh1.bytes_written_, 2);
00214   ASSERT_EQ(sh2.bytes_written_, 2);
00215 
00216   ASSERT_TRUE(poll_set_.delEvents(sh1.socket_, POLLOUT));
00217   ASSERT_TRUE(poll_set_.delEvents(sh2.socket_, POLLOUT));
00218 
00219   poll_set_.update(1);
00220 
00221   ASSERT_EQ(sh1.bytes_read_, 2);
00222   ASSERT_EQ(sh2.bytes_read_, 2);
00223 }
00224 
00225 TEST_F(Poller, multiAddDel)
00226 {
00227   SocketHelper sh(sockets_[0]);
00228   ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00229   ASSERT_FALSE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
00230 
00231   ASSERT_TRUE(poll_set_.addEvents(sh.socket_, 0));
00232   ASSERT_FALSE(poll_set_.addEvents(sh.socket_ + 1, 0));
00233 
00234   ASSERT_TRUE(poll_set_.delEvents(sh.socket_, 0));
00235   ASSERT_FALSE(poll_set_.delEvents(sh.socket_ + 1, 0));
00236 
00237   ASSERT_FALSE(poll_set_.delSocket(sh.socket_ + 1));
00238   ASSERT_TRUE(poll_set_.delSocket(sh.socket_));
00239 }
00240 
00241 void addThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
00242 {
00243   barrier->wait();
00244 
00245   ps->addSocket(sh->socket_, boost::bind(&SocketHelper::processEvents, sh, _1));
00246   ps->addEvents(sh->socket_, POLLIN);
00247   ps->addEvents(sh->socket_, POLLOUT);
00248 }
00249 
00250 void delThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
00251 {
00252   barrier->wait();
00253 
00254   ps->delEvents(sh->socket_, POLLIN);
00255   ps->delEvents(sh->socket_, POLLOUT);
00256   ps->delSocket(sh->socket_);
00257 }
00258 
00259 TEST_F(Poller, addDelMultiThread)
00260 {
00261   for (int i = 0; i < 100; ++i)
00262   {
00263     SocketHelper sh1(sockets_[0]);
00264     SocketHelper sh2(sockets_[1]);
00265 
00266     const int thread_count = 100;
00267 
00268     {
00269       boost::barrier barrier(thread_count + 1);
00270 
00271       boost::thread_group tg;
00272       for (int i = 0; i < thread_count/2; ++i)
00273       {
00274         tg.create_thread(boost::bind(addThread, &poll_set_, &sh1, &barrier));
00275         tg.create_thread(boost::bind(addThread, &poll_set_, &sh2, &barrier));
00276       }
00277 
00278       barrier.wait();
00279 
00280       tg.join_all();
00281 
00282       poll_set_.update(1);
00283 
00284       ASSERT_TRUE(sh1.bytes_read_ == 0 || sh1.bytes_read_ == 1);
00285       ASSERT_TRUE(sh2.bytes_read_ == 0 || sh2.bytes_read_ == 1);
00286       ASSERT_EQ(sh1.bytes_written_, 1);
00287       ASSERT_EQ(sh2.bytes_written_, 1);
00288 
00289       poll_set_.update(1);
00290 
00291       ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
00292       ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
00293       ASSERT_EQ(sh1.bytes_written_, 2);
00294       ASSERT_EQ(sh2.bytes_written_, 2);
00295     }
00296 
00297     {
00298       boost::barrier barrier(thread_count + 1);
00299 
00300       boost::thread_group tg;
00301       for (int i = 0; i < thread_count/2; ++i)
00302       {
00303         tg.create_thread(boost::bind(delThread, &poll_set_, &sh1, &barrier));
00304         tg.create_thread(boost::bind(delThread, &poll_set_, &sh2, &barrier));
00305       }
00306 
00307       barrier.wait();
00308 
00309       tg.join_all();
00310 
00311       poll_set_.update(1);
00312 
00313       ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
00314       ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
00315       ASSERT_EQ(sh1.bytes_written_, 2);
00316       ASSERT_EQ(sh2.bytes_written_, 2);
00317     }
00318   }
00319 }
00320 
00321 void addDelManyTimesThread(PollSet* ps, SocketHelper* sh1, SocketHelper* sh2, boost::barrier* barrier, int count, volatile bool* done)
00322 {
00323   *done = false;
00324 
00325   barrier->wait();
00326 
00327   for (int i = 0; i < count; ++i)
00328   {
00329     ps->addSocket(sh1->socket_, boost::bind(&SocketHelper::processEvents, sh1, _1));
00330     ps->addEvents(sh1->socket_, POLLIN);
00331     ps->addEvents(sh1->socket_, POLLOUT);
00332 
00333     ps->addSocket(sh2->socket_, boost::bind(&SocketHelper::processEvents, sh2, _1));
00334     ps->addEvents(sh2->socket_, POLLIN);
00335     ps->addEvents(sh2->socket_, POLLOUT);
00336 
00337     boost::this_thread::sleep(boost::posix_time::microseconds(100));
00338 
00339     ps->delEvents(sh1->socket_, POLLIN);
00340     ps->delEvents(sh1->socket_, POLLOUT);
00341     ps->delSocket(sh1->socket_);
00342 
00343     ps->delEvents(sh2->socket_, POLLIN);
00344     ps->delEvents(sh2->socket_, POLLOUT);
00345     ps->delSocket(sh2->socket_);
00346   }
00347 
00348   *done = true;
00349 }
00350 
00351 TEST_F(Poller, updateWhileAddDel)
00352 {
00353   SocketHelper sh1(sockets_[0]);
00354   SocketHelper sh2(sockets_[1]);
00355 
00356   boost::barrier barrier(2);
00357   volatile bool done = false;
00358   const int count = 1000;
00359 
00360   boost::thread t(boost::bind(addDelManyTimesThread, &poll_set_, &sh1, &sh2, &barrier, count, &done));
00361 
00362   barrier.wait();
00363 
00364   while (!done)
00365   {
00366     poll_set_.update(1);
00367   }
00368 
00369   ASSERT_TRUE(sh1.bytes_read_ > 0);
00370   ASSERT_TRUE(sh1.bytes_written_ > 0);
00371   ASSERT_TRUE(sh2.bytes_read_ > 0);
00372   ASSERT_TRUE(sh2.bytes_written_ > 0);
00373 }
00374 
00375 TEST_F(Poller, signal)
00376 {
00377   // first one clears out any calls to signal() caused by construction
00378   poll_set_.update(0);
00379 
00380   boost::thread t(boost::bind(&Poller::waitThenSignal, this));
00381   poll_set_.update(-1);
00382 }
00383 
00384 
00385 int main(int argc, char** argv)
00386 {
00387   testing::InitGoogleTest(&argc, argv);
00388 
00389   signal(SIGPIPE, SIG_IGN);
00390 
00391   return RUN_ALL_TESTS();
00392 }
00393 


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Mon Oct 6 2014 11:47:21