test_poll_set.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 /* Author: Josh Faust */
31 
/*
 * Tests for ros::PollSet
 */
35 
36 #include <gtest/gtest.h>
37 #include "ros/poll_set.h"
38 #ifndef _WIN32
39 # include <sys/socket.h>
40 #endif
41 
42 #include <fcntl.h>
43 
44 #include <boost/bind.hpp>
45 #include <boost/thread.hpp>
46 
47 using namespace ros;
48 
/**
 * Switch a socket to non-blocking mode.
 *
 * @param socket descriptor to modify (passed by reference, but never written to)
 * @return 0 on success; otherwise errno (POSIX) or WSAGetLastError() (Windows)
 */
int set_nonblocking(int &socket)
{
#ifndef _WIN32
  // Fetch the current status flags first: F_SETFL replaces the whole flag set,
  // so O_NONBLOCK has to be OR-ed into what is already there.
  const int flags = fcntl(socket, F_GETFL, 0);
  if (flags == -1)
  {
    return errno;
  }
  if (fcntl(socket, F_SETFL, flags | O_NONBLOCK) == -1)
  {
    return errno;
  }
#else
  u_long non_blocking = 1;
  if (ioctlsocket(socket, FIONBIO, &non_blocking) != 0)
  {
    return WSAGetLastError();
  }
#endif
  return 0;
}
65 
/**
 * Create a pair of connected stream sockets.
 *
 * On POSIX this is a direct call to socketpair(). On Windows the pair is built
 * by hand: a temporary listener is bound to an ephemeral loopback port, one
 * socket connects to it and the accepted connection becomes the other end.
 *
 * @param socket_pair receives the two connected descriptors on success
 * @return 0 on success; non-zero on failure (the socketpair() result on POSIX,
 *         the WSAGetLastError() code on Windows)
 */
int create_socket_pair(int socket_pair[2])
{
#ifndef _WIN32
  return socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair);
#else
  socket_pair[0] = INVALID_SOCKET;
  socket_pair[1] = INVALID_SOCKET;

  /*********************
  ** Listen Socket
  **********************/
  socket_fd_t listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  if (listen_socket == INVALID_SOCKET)
  {
    return WSAGetLastError();
  }

  // allow it to be bound to an address already in use - do we actually need this?
  int reuse = 1;
  if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&reuse), static_cast<socklen_t>(sizeof(reuse))) == SOCKET_ERROR)
  {
    // Read the error code before closesocket(): a successful call may reset it.
    const int error = WSAGetLastError();
    ::closesocket(listen_socket);
    return error;
  }

  union
  {
    struct sockaddr_in inaddr;
    struct sockaddr addr;
  } a;

  memset(&a, 0, sizeof(a));
  a.inaddr.sin_family = AF_INET;
  a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  // For TCP/IP, if the port is specified as zero, the service provider assigns
  // a unique port to the application from the dynamic client port range.
  a.inaddr.sin_port = 0;

  if (bind(listen_socket, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
  {
    const int error = WSAGetLastError();
    ::closesocket(listen_socket);
    return error;
  }

  // we need this below because the system auto filled in some entries, e.g. port #
  socklen_t addrlen = static_cast<socklen_t>(sizeof(a.inaddr));
  if (getsockname(listen_socket, &a.addr, &addrlen) == SOCKET_ERROR)
  {
    const int error = WSAGetLastError();
    ::closesocket(listen_socket);
    return error;
  }
  // max 1 connection permitted
  if (listen(listen_socket, 1) == SOCKET_ERROR)
  {
    const int error = WSAGetLastError();
    ::closesocket(listen_socket);
    return error;
  }

  /*********************
  ** Connection
  **********************/
  DWORD overlapped_flag = 0;
  socket_pair[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, overlapped_flag);
  if (socket_pair[0] == INVALID_SOCKET)
  {
    // nothing to close for socket_pair[0]: it was never created
    const int error = WSAGetLastError();
    ::closesocket(listen_socket);
    return error;
  }

  // reusing the information from above to connect to the listener
  if (connect(socket_pair[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
  {
    const int error = WSAGetLastError();
    ::closesocket(listen_socket);
    ::closesocket(socket_pair[0]);
    socket_pair[0] = INVALID_SOCKET; // do not hand a closed handle back to the caller
    return error;
  }

  /*********************
  ** Accept
  **********************/
  socket_pair[1] = accept(listen_socket, NULL, NULL);
  if (socket_pair[1] == INVALID_SOCKET)
  {
    const int error = WSAGetLastError();
    ::closesocket(listen_socket);
    ::closesocket(socket_pair[0]);
    socket_pair[0] = INVALID_SOCKET; // do not hand a closed handle back to the caller
    return error;
  }

  /*********************
  ** Cleanup
  **********************/
  ::closesocket(listen_socket); // the listener has done its job.
  return 0;
#endif
}
163 
164 class Poller : public testing::Test
165 {
166 public:
168  {
169 #ifdef _WIN32
170  WSADATA wsaData;
171  WSAStartup(MAKEWORD(2, 0), &wsaData);
172 #endif
173  }
174 
176  {
177  ::close(sockets_[0]);
178  ::close(sockets_[1]);
179 
180 #ifdef _WIN32
181  WSACleanup();
182 #endif
183  }
184 
186  {
187  boost::this_thread::sleep(boost::posix_time::microseconds(100000));
188 
189  poll_set_.signal();
190  }
191 
192 protected:
193 
194  virtual void SetUp()
195  {
196  if (create_socket_pair(sockets_) != 0)
197  {
198  FAIL();
199  }
200  if(set_nonblocking(sockets_[0]) != 0)
201  {
202  FAIL();
203  }
204  if(set_nonblocking(sockets_[1]) != 0)
205  {
206  FAIL();
207  }
208  }
209 
211 
212  int sockets_[2];
213 
214 
215 };
216 
/**
 * Minimal PollSet callback target: wraps one socket and counts what happens to it.
 */
class SocketHelper
{
public:
  SocketHelper(int sock)
  : bytes_read_(0)
  , bytes_written_(0)
  , pollouts_received_(0)
  , socket_(sock)
  {}

  /**
   * React to a set of poll events.
   *
   * POLLIN: read single bytes until read() stops returning data, counting each one.
   * POLLOUT: count the event and write one byte.
   */
  void processEvents(int events)
  {
    if (events & POLLIN)
    {
      char b;
      while (::read(socket_, &b, 1) > 0)
      {
        ++bytes_read_;
      }
    }

    if (events & POLLOUT)
    {
      ++pollouts_received_;

      write();
    }
  }

  /// Write a single zero byte to the socket; counted only if the write succeeds.
  void write()
  {
    char b = 0;
    if (::write(socket_, &b, 1) > 0)
    {
      ++bytes_written_;
    }
  }

  int bytes_read_;         ///< bytes successfully read in processEvents()
  int bytes_written_;      ///< bytes successfully written by write()
  int pollouts_received_;  ///< number of POLLOUT events seen by processEvents()
  int socket_;             ///< descriptor this helper operates on
};
260 
262 {
263  SocketHelper sh(sockets_[0]);
264  ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, boost::placeholders::_1)));
265 
266  char b = 0;
267 
268  write(sockets_[1], &b, 1);
269  poll_set_.update(1);
270 
271  ASSERT_EQ(sh.bytes_read_, 0);
272 
273  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
274  poll_set_.update(1);
275  ASSERT_EQ(sh.bytes_read_, 1);
276 
277  write(sockets_[1], &b, 1);
278  poll_set_.update(1);
279  ASSERT_EQ(sh.bytes_read_, 2);
280 
281  ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLIN));
282  write(sockets_[1], &b, 1);
283  poll_set_.update(1);
284  ASSERT_EQ(sh.bytes_read_, 2);
285 
286  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLIN));
287  poll_set_.update(1);
288  ASSERT_EQ(sh.bytes_read_, 3);
289 
290  ASSERT_TRUE(poll_set_.delSocket(sockets_[0]));
291  poll_set_.update(1);
292  ASSERT_EQ(sh.bytes_read_, 3);
293 }
294 
295 TEST_F(Poller, write)
296 {
297  SocketHelper sh(sockets_[0]);
298  ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, boost::placeholders::_1)));
299  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, POLLOUT));
300 
301  poll_set_.update(1);
302 
303  ASSERT_EQ(sh.pollouts_received_, 1);
304  ASSERT_EQ(sh.bytes_written_, 1);
305 
306  ASSERT_TRUE(poll_set_.delEvents(sh.socket_, POLLOUT));
307  poll_set_.update(1);
308  ASSERT_EQ(sh.pollouts_received_, 1);
309  ASSERT_EQ(sh.bytes_written_, 1);
310 }
311 
312 TEST_F(Poller, readAndWrite)
313 {
314  SocketHelper sh1(sockets_[0]);
315  SocketHelper sh2(sockets_[1]);
316  ASSERT_TRUE(poll_set_.addSocket(sh1.socket_, boost::bind(&SocketHelper::processEvents, &sh1, boost::placeholders::_1)));
317  ASSERT_TRUE(poll_set_.addSocket(sh2.socket_, boost::bind(&SocketHelper::processEvents, &sh2, boost::placeholders::_1)));
318 
319  ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLIN));
320  ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLIN));
321 
322  sh1.write();
323  sh2.write();
324 
325  ASSERT_EQ(sh1.bytes_written_, 1);
326  ASSERT_EQ(sh2.bytes_written_, 1);
327 
328  poll_set_.update(1);
329 
330  ASSERT_EQ(sh1.bytes_read_, 1);
331  ASSERT_EQ(sh2.bytes_read_, 1);
332 
333  ASSERT_TRUE(poll_set_.addEvents(sh1.socket_, POLLOUT));
334  ASSERT_TRUE(poll_set_.addEvents(sh2.socket_, POLLOUT));
335 
336  poll_set_.update(1);
337 
338  ASSERT_EQ(sh1.bytes_written_, 2);
339  ASSERT_EQ(sh2.bytes_written_, 2);
340 
341  ASSERT_TRUE(poll_set_.delEvents(sh1.socket_, POLLOUT));
342  ASSERT_TRUE(poll_set_.delEvents(sh2.socket_, POLLOUT));
343 
344  poll_set_.update(1);
345 
346  ASSERT_EQ(sh1.bytes_read_, 2);
347  ASSERT_EQ(sh2.bytes_read_, 2);
348 }
349 
350 TEST_F(Poller, multiAddDel)
351 {
352  SocketHelper sh(sockets_[0]);
353  ASSERT_TRUE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, boost::placeholders::_1)));
354  ASSERT_FALSE(poll_set_.addSocket(sh.socket_, boost::bind(&SocketHelper::processEvents, &sh, boost::placeholders::_1)));
355 
356  ASSERT_TRUE(poll_set_.addEvents(sh.socket_, 0));
357  ASSERT_FALSE(poll_set_.addEvents(sh.socket_ + 1, 0));
358 
359  ASSERT_TRUE(poll_set_.delEvents(sh.socket_, 0));
360  ASSERT_FALSE(poll_set_.delEvents(sh.socket_ + 1, 0));
361 
362  ASSERT_FALSE(poll_set_.delSocket(sh.socket_ + 1));
363  ASSERT_TRUE(poll_set_.delSocket(sh.socket_));
364 }
365 
366 void addThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
367 {
368  barrier->wait();
369 
370  ps->addSocket(sh->socket_, boost::bind(&SocketHelper::processEvents, sh, boost::placeholders::_1));
371  ps->addEvents(sh->socket_, POLLIN);
372  ps->addEvents(sh->socket_, POLLOUT);
373 }
374 
375 void delThread(PollSet* ps, SocketHelper* sh, boost::barrier* barrier)
376 {
377  barrier->wait();
378 
379  ps->delEvents(sh->socket_, POLLIN);
380  ps->delEvents(sh->socket_, POLLOUT);
381  ps->delSocket(sh->socket_);
382 }
383 
393 TEST_F(Poller, DISABLED_addDelMultiThread)
394 {
395  for (int i = 0; i < 100; ++i)
396  {
397  SocketHelper sh1(sockets_[0]);
398  SocketHelper sh2(sockets_[1]);
399 
400  const int thread_count = 100;
401 
402  {
403  boost::barrier barrier(thread_count + 1);
404 
405  boost::thread_group tg;
406  for (int i = 0; i < thread_count/2; ++i)
407  {
408  tg.create_thread(boost::bind(addThread, &poll_set_, &sh1, &barrier));
409  tg.create_thread(boost::bind(addThread, &poll_set_, &sh2, &barrier));
410  }
411 
412  barrier.wait();
413 
414  tg.join_all();
415 
416  poll_set_.update(1);
417 
418  ASSERT_TRUE(sh1.bytes_read_ == 0 || sh1.bytes_read_ == 1);
419  ASSERT_TRUE(sh2.bytes_read_ == 0 || sh2.bytes_read_ == 1);
420  ASSERT_EQ(sh1.bytes_written_, 1);
421  ASSERT_EQ(sh2.bytes_written_, 1);
422 
423  poll_set_.update(1);
424 
425  ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
426  ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
427  ASSERT_EQ(sh1.bytes_written_, 2);
428  ASSERT_EQ(sh2.bytes_written_, 2);
429  }
430 
431  {
432  boost::barrier barrier(thread_count + 1);
433 
434  boost::thread_group tg;
435  for (int i = 0; i < thread_count/2; ++i)
436  {
437  tg.create_thread(boost::bind(delThread, &poll_set_, &sh1, &barrier));
438  tg.create_thread(boost::bind(delThread, &poll_set_, &sh2, &barrier));
439  }
440 
441  barrier.wait();
442 
443  tg.join_all();
444 
445  poll_set_.update(1);
446 
447  ASSERT_TRUE(sh1.bytes_read_ == 1 || sh1.bytes_read_ == 2);
448  ASSERT_TRUE(sh2.bytes_read_ == 1 || sh2.bytes_read_ == 2);
449  ASSERT_EQ(sh1.bytes_written_, 2);
450  ASSERT_EQ(sh2.bytes_written_, 2);
451  }
452  }
453 }
454 
455 void addDelManyTimesThread(PollSet* ps, SocketHelper* sh1, SocketHelper* sh2, boost::barrier* barrier, int count, volatile bool* done)
456 {
457  *done = false;
458 
459  barrier->wait();
460 
461  for (int i = 0; i < count; ++i)
462  {
463  ps->addSocket(sh1->socket_, boost::bind(&SocketHelper::processEvents, sh1, boost::placeholders::_1));
464  ps->addEvents(sh1->socket_, POLLIN);
465  ps->addEvents(sh1->socket_, POLLOUT);
466 
467  ps->addSocket(sh2->socket_, boost::bind(&SocketHelper::processEvents, sh2, boost::placeholders::_1));
468  ps->addEvents(sh2->socket_, POLLIN);
469  ps->addEvents(sh2->socket_, POLLOUT);
470 
471  boost::this_thread::sleep(boost::posix_time::microseconds(100));
472 
473  ps->delEvents(sh1->socket_, POLLIN);
474  ps->delEvents(sh1->socket_, POLLOUT);
475  ps->delSocket(sh1->socket_);
476 
477  ps->delEvents(sh2->socket_, POLLIN);
478  ps->delEvents(sh2->socket_, POLLOUT);
479  ps->delSocket(sh2->socket_);
480  }
481 
482  *done = true;
483 }
484 
485 TEST_F(Poller, updateWhileAddDel)
486 {
487  SocketHelper sh1(sockets_[0]);
488  SocketHelper sh2(sockets_[1]);
489 
490  boost::barrier barrier(2);
491  volatile bool done = false;
492  const int count = 1000;
493 
494  boost::thread t(boost::bind(addDelManyTimesThread, &poll_set_, &sh1, &sh2, &barrier, count, &done));
495 
496  barrier.wait();
497 
498  while (!done)
499  {
500  poll_set_.update(1);
501  }
502 
503  ASSERT_TRUE(sh1.bytes_read_ > 0);
504  ASSERT_TRUE(sh1.bytes_written_ > 0);
505  ASSERT_TRUE(sh2.bytes_read_ > 0);
506  ASSERT_TRUE(sh2.bytes_written_ > 0);
507 }
508 
509 TEST_F(Poller, signal)
510 {
511  // first one clears out any calls to signal() caused by construction
512  poll_set_.update(0);
513 
514  boost::thread t(boost::bind(&Poller::waitThenSignal, this));
515  poll_set_.update(-1);
516 
517  // wait for poll_set_.signal_mutex_ to be unlocked after invoking signal()
518  boost::this_thread::sleep(boost::posix_time::microseconds(50000));
519 }
520 
521 
522 int main(int argc, char** argv)
523 {
524  testing::InitGoogleTest(&argc, argv);
525 
526 #ifndef _WIN32
527  signal(SIGPIPE, SIG_IGN);
528 #endif
529 
530  return RUN_ALL_TESTS();
531 }
532 
t
ros::WallTime t
Definition: pointcloud_serdes.cpp:41
ros::PollSet::delSocket
bool delSocket(int sock)
Poller::SetUp
virtual void SetUp()
Definition: test_poll_set.cpp:194
ros
set_nonblocking
int set_nonblocking(int &socket)
Definition: test_poll_set.cpp:49
ros::PollSet::addEvents
bool addEvents(int sock, int events)
Poller::waitThenSignal
void waitThenSignal()
Definition: test_poll_set.cpp:185
ros::PollSet::addSocket
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
SocketHelper::write
void write()
Definition: test_poll_set.cpp:246
SocketHelper
Definition: test_poll_set.cpp:217
addThread
void addThread(PollSet *ps, SocketHelper *sh, boost::barrier *barrier)
Definition: test_poll_set.cpp:366
SocketHelper::bytes_written_
int bytes_written_
Definition: test_poll_set.cpp:256
ros::PollSet::delEvents
bool delEvents(int sock, int events)
SocketHelper::processEvents
void processEvents(int events)
Definition: test_poll_set.cpp:227
SocketHelper::SocketHelper
SocketHelper(int sock)
Definition: test_poll_set.cpp:220
SocketHelper::socket_
int socket_
Definition: test_poll_set.cpp:258
socket_fd_t
int socket_fd_t
Poller::~Poller
~Poller()
Definition: test_poll_set.cpp:175
create_socket_pair
int create_socket_pair(int socket_pair[2])
Definition: test_poll_set.cpp:66
Poller::Poller
Poller()
Definition: test_poll_set.cpp:167
Poller::poll_set_
PollSet poll_set_
Definition: test_poll_set.cpp:210
SocketHelper::pollouts_received_
int pollouts_received_
Definition: test_poll_set.cpp:257
delThread
void delThread(PollSet *ps, SocketHelper *sh, boost::barrier *barrier)
Definition: test_poll_set.cpp:375
poll_set.h
main
int main(int argc, char **argv)
Definition: test_poll_set.cpp:522
Poller
Definition: test_poll_set.cpp:164
SocketHelper::bytes_read_
int bytes_read_
Definition: test_poll_set.cpp:255
ros::PollSet
addDelManyTimesThread
void addDelManyTimesThread(PollSet *ps, SocketHelper *sh1, SocketHelper *sh2, boost::barrier *barrier, int count, volatile bool *done)
Definition: test_poll_set.cpp:455
TEST_F
TEST_F(Poller, read)
Definition: test_poll_set.cpp:261


test_roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:02:02