test_poll_set.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 /* Author: Josh Faust */
31 
32 /*
33  * Test version macros
34  */
35 
36 #include <gtest/gtest.h>
37 #include "ros/poll_set.h"
38 #include <sys/socket.h>
39 
40 #include <fcntl.h>
41 
42 #include <boost/bind.hpp>
43 #include <boost/thread.hpp>
44 
45 using namespace ros;
46 
47 class Poller : public testing::Test
48 {
49 public:
51  {
52  }
53 
55  {
56  ::close(sockets_[0]);
57  ::close(sockets_[1]);
58  }
59 
61  {
62  usleep(100000);
63 
64  poll_set_.signal();
65  }
66 
67 protected:
68 
69  virtual void SetUp()
70  {
71  if(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets_) != 0)
72  {
73  FAIL();
74  }
75  if(fcntl(sockets_[0], F_SETFL, O_NONBLOCK) == -1)
76  {
77  FAIL();
78  }
79  if(fcntl(sockets_[1], F_SETFL, O_NONBLOCK) == -1)
80  {
81  FAIL();
82  }
83  }
84 
86 
87  int sockets_[2];
88 
89 
90 };
91 
93 {
94 public:
95  SocketHelper(int sock)
96  : bytes_read_(0)
97  , bytes_written_(0)
98  , pollouts_received_(0)
99  , socket_(sock)
100  {}
101 
102  void processEvents(int events)
103  {
104  if (events & POLLIN)
105  {
106  char b;
107  while(read(socket_, &b, 1) > 0)
108  {
109  ++bytes_read_;
110  };
111  }
112 
113  if (events & POLLOUT)
114  {
115  ++pollouts_received_;
116 
117  write();
118  }
119  }
120 
121  void write()
122  {
123  char b = 0;
124  if (::write(socket_, &b, 1) > 0)
125  {
126  ++bytes_written_;
127  }
128  }
129 
133  int socket_;
134 };
135 
137 {
138  SocketHelper sh(sockets_[0]);
139  ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
140 
141  char b = 0;
142 
143  write(sockets_[1], &b, 1);
144  poll_set_.update(1);
145 
146  ASSERT_EQ(sh.bytes_read_, 0);
147 
148  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
149  poll_set_.update(1);
150  ASSERT_EQ(sh.bytes_read_, 1);
151 
152  write(sockets_[1], &b, 1);
153  poll_set_.update(1);
154  ASSERT_EQ(sh.bytes_read_, 2);
155 
156  ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLIN));
157  write(sockets_[1], &b, 1);
158  poll_set_.update(1);
159  ASSERT_EQ(sh.bytes_read_, 2);
160 
161  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
162  poll_set_.update(1);
163  ASSERT_EQ(sh.bytes_read_, 3);
164 
165  ASSERT_TRUE(poll_set_.delSocket(sockets_[0]));
166  poll_set_.update(1);
167  ASSERT_EQ(sh.bytes_read_, 3);
168 }
169 
170 TEST_F(Poller, write)
171 {
172  SocketHelper sh(sockets_[0]);
173  ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
174  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLOUT));
175 
176  poll_set_.update(1);
177 
178  ASSERT_EQ(sh.pollouts_received_, 1);
179  ASSERT_EQ(sh.bytes_written_, 1);
180 
181  ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLOUT));
182  poll_set_.update(1);
183  ASSERT_EQ(sh.pollouts_received_, 1);
184  ASSERT_EQ(sh.bytes_written_, 1);
185 }
186 
187 TEST_F(Poller, readAndWrite)
188 {
189  SocketHelper sh1(sockets_[0]);
190  SocketHelper sh2(sockets_[1]);
191  ASSERT_TRUE(poll_set_.addSocket(sh1.socket_, boost::bind(&SocketHelper::processEvents, &sh1, _1)));
192  ASSERT_TRUE(poll_set_.addSocket(sh2.socket_, boost::bind(&SocketHelper::processEvents, &sh2, _1)));
193 
194  ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLIN));
195  ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLIN));
196 
197  sh1.write();
198  sh2.write();
199 
200  ASSERT_EQ(sh1.bytes_written_, 1);
201  ASSERT_EQ(sh2.bytes_written_, 1);
202 
203  poll_set_.update(1);
204 
205  ASSERT_EQ(sh1.bytes_read_, 1);
206  ASSERT_EQ(sh2.bytes_read_, 1);
207 
208  ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLOUT));
209  ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLOUT));
210 
211  poll_set_.update(1);
212 
213  ASSERT_EQ(sh1.bytes_written_, 2);
214  ASSERT_EQ(sh2.bytes_written_, 2);
215 
216  ASSERT_TRUE(poll_set_.delEvents(sh1.socket_, POLLOUT));
217  ASSERT_TRUE(poll_set_.delEvents(sh2.socket_, POLLOUT));
218 
219  poll_set_.update(1);
220 
221  ASSERT_EQ(sh1.bytes_read_, 2);
222  ASSERT_EQ(sh2.bytes_read_, 2);
223 }
224 
225 TEST_F(Poller, multiAddDel)
226 {
227  SocketHelper sh(sockets_[0]);
228  ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
229  ASSERT_FALSE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, _1)));
230 
231  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, 0));
232  ASSERT_FALSE(poll_set_.addEvents(sh.socket_ + 1, 0));
233 
234  ASSERT_TRUE(poll_set_.delEvents(sh.socket_, 0));
235  ASSERT_FALSE(poll_set_.delEvents(sh.socket_ + 1, 0));
236 
237  ASSERT_FALSE(poll_set_.delSocket(sh.socket_ + 1));
238  ASSERT_TRUE(poll_set_.delSocket(sh.socket_));
239 }
240 
241 void addThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
242 {
243  barrier->wait();
244 
245  ps->addSocket(sh->socket_, boost::bind(&SocketHelper::processEvents, sh, _1));
246  ps->addEvents(sh->socket_, POLLIN);
247  ps->addEvents(sh->socket_, POLLOUT);
248 }
249 
250 void delThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
251 {
252  barrier->wait();
253 
254  ps->delEvents(sh->socket_, POLLIN);
255  ps->delEvents(sh->socket_, POLLOUT);
256  ps->delSocket(sh->socket_);
257 }
258 
268 TEST_F(Poller, DISABLED_addDelMultiThread)
269 {
270  for (int i = 0; i < 100; ++i)
271  {
272  SocketHelper sh1(sockets_[0]);
273  SocketHelper sh2(sockets_[1]);
274 
275  const int thread_count = 100;
276 
277  {
278  boost::barrier barrier(thread_count + 1);
279 
280  boost::thread_group tg;
281  for (int i = 0; i < thread_count/2; ++i)
282  {
283  tg.create_thread(boost::bind(addThread, &poll_set_, &sh1, &barrier));
284  tg.create_thread(boost::bind(addThread, &poll_set_, &sh2, &barrier));
285  }
286 
287  barrier.wait();
288 
289  tg.join_all();
290 
291  poll_set_.update(1);
292 
293  ASSERT_TRUE(sh1.bytes_read_ == 0 || sh1.bytes_read_ == 1);
294  ASSERT_TRUE(sh2.bytes_read_ == 0 || sh2.bytes_read_ == 1);
295  ASSERT_EQ(sh1.bytes_written_, 1);
296  ASSERT_EQ(sh2.bytes_written_, 1);
297 
298  poll_set_.update(1);
299 
300  ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
301  ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
302  ASSERT_EQ(sh1.bytes_written_, 2);
303  ASSERT_EQ(sh2.bytes_written_, 2);
304  }
305 
306  {
307  boost::barrier barrier(thread_count + 1);
308 
309  boost::thread_group tg;
310  for (int i = 0; i < thread_count/2; ++i)
311  {
312  tg.create_thread(boost::bind(delThread, &poll_set_, &sh1, &barrier));
313  tg.create_thread(boost::bind(delThread, &poll_set_, &sh2, &barrier));
314  }
315 
316  barrier.wait();
317 
318  tg.join_all();
319 
320  poll_set_.update(1);
321 
322  ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
323  ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
324  ASSERT_EQ(sh1.bytes_written_, 2);
325  ASSERT_EQ(sh2.bytes_written_, 2);
326  }
327  }
328 }
329 
330 void addDelManyTimesThread(PollSet* ps, SocketHelper* sh1, SocketHelper* sh2, boost::barrier* barrier, int count, volatile bool* done)
331 {
332  *done = false;
333 
334  barrier->wait();
335 
336  for (int i = 0; i < count; ++i)
337  {
338  ps->addSocket(sh1->socket_, boost::bind(&SocketHelper::processEvents, sh1, _1));
339  ps->addEvents(sh1->socket_, POLLIN);
340  ps->addEvents(sh1->socket_, POLLOUT);
341 
342  ps->addSocket(sh2->socket_, boost::bind(&SocketHelper::processEvents, sh2, _1));
343  ps->addEvents(sh2->socket_, POLLIN);
344  ps->addEvents(sh2->socket_, POLLOUT);
345 
346  boost::this_thread::sleep(boost::posix_time::microseconds(100));
347 
348  ps->delEvents(sh1->socket_, POLLIN);
349  ps->delEvents(sh1->socket_, POLLOUT);
350  ps->delSocket(sh1->socket_);
351 
352  ps->delEvents(sh2->socket_, POLLIN);
353  ps->delEvents(sh2->socket_, POLLOUT);
354  ps->delSocket(sh2->socket_);
355  }
356 
357  *done = true;
358 }
359 
360 TEST_F(Poller, updateWhileAddDel)
361 {
362  SocketHelper sh1(sockets_[0]);
363  SocketHelper sh2(sockets_[1]);
364 
365  boost::barrier barrier(2);
366  volatile bool done = false;
367  const int count = 1000;
368 
369  boost::thread t(boost::bind(addDelManyTimesThread, &poll_set_, &sh1, &sh2, &barrier, count, &done));
370 
371  barrier.wait();
372 
373  while (!done)
374  {
375  poll_set_.update(1);
376  }
377 
378  ASSERT_TRUE(sh1.bytes_read_ > 0);
379  ASSERT_TRUE(sh1.bytes_written_ > 0);
380  ASSERT_TRUE(sh2.bytes_read_ > 0);
381  ASSERT_TRUE(sh2.bytes_written_ > 0);
382 }
383 
384 TEST_F(Poller, signal)
385 {
386  // first one clears out any calls to signal() caused by construction
387  poll_set_.update(0);
388 
389  boost::thread t(boost::bind(&Poller::waitThenSignal, this));
390  poll_set_.update(-1);
391 
392  // wait for poll_set_.signal_mutex_ to be unlocked after invoking signal()
393  usleep(50000);
394 }
395 
396 
397 int main(int argc, char** argv)
398 {
399  testing::InitGoogleTest(&argc, argv);
400 
401  signal(SIGPIPE, SIG_IGN);
402 
403  return RUN_ALL_TESTS();
404 }
405 
bool addEvents(int sock, int events)
void addThread(PollSet *ps, SocketHelper *sh, boost::barrier *barrier)
TEST_F(Poller, read)
void processEvents(int events)
void waitThenSignal()
bool delEvents(int sock, int events)
bool delSocket(int sock)
void delThread(PollSet *ps, SocketHelper *sh, boost::barrier *barrier)
PollSet poll_set_
void addDelManyTimesThread(PollSet *ps, SocketHelper *sh1, SocketHelper *sh2, boost::barrier *barrier, int count, volatile bool *done)
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
ros::WallTime t
SocketHelper(int sock)
virtual void SetUp()
int main(int argc, char **argv)


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:46