io.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 /*****************************************************************************
35 ** Includes
36 *****************************************************************************/
37 
38 #include "config.h"
39 
40 #include <ros/io.h>
41 #include <ros/assert.h> // don't need if we dont call the pipe functions.
42 #include <errno.h> // for EFAULT and co.
43 #include <iostream>
44 #include <sstream>
45 #ifdef WIN32
46 #else
47  #include <cstring> // strerror
48  #include <fcntl.h> // for non-blocking configuration
49 #endif
50 
51 #ifdef HAVE_EPOLL
52  #include <sys/epoll.h>
53 #endif
54 
55 /*****************************************************************************
56 ** Macros
57 *****************************************************************************/
58 
59 #define UNUSED(expr) do { (void)(expr); } while (0)
60 
61 /*****************************************************************************
62 ** Namespaces
63 *****************************************************************************/
64 
65 namespace ros {
66 
68  #ifdef WIN32
69  return WSAGetLastError();
70  #else
71  return errno;
72  #endif
73 }
74 const char* last_socket_error_string() {
75  #ifdef WIN32
76  // could fix this to use FORMAT_MESSAGE and print a real string later,
77  // but not high priority.
78  std::stringstream ostream;
79  ostream << "WSA Error: " << WSAGetLastError();
80  return ostream.str().c_str();
81  #else
82  return strerror(errno);
83  #endif
84 }
85 
87 #if defined(WIN32)
88  if ( WSAGetLastError() == WSAEWOULDBLOCK ) {
89  return true;
90  } else {
91  return false;
92  }
93 #else
94  if ( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ) { // posix permits either
95  return true;
96  } else {
97  return false;
98  }
99 #endif
100 }
101 
103  int epfd = -1;
104 #if defined(HAVE_EPOLL)
105  epfd = ::epoll_create1(0);
106  if (epfd < 0)
107  {
108  ROS_ERROR("Unable to create epoll watcher: %s", strerror(errno));
109  }
110 #endif
111  return epfd;
112 }
113 
114 void close_socket_watcher(int fd) {
115  if (fd >= 0)
116  ::close(fd);
117 }
118 
119 void add_socket_to_watcher(int epfd, int fd) {
120 #if defined(HAVE_EPOLL)
121  struct epoll_event ev;
122  ev.events = 0;
123  ev.data.fd = fd;
124 
125  if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev))
126  {
127  ROS_ERROR("Unable to add FD to epoll: %s", strerror(errno));
128  }
129 #else
130  UNUSED(epfd);
131  UNUSED(fd);
132 #endif
133 }
134 
135 void del_socket_from_watcher(int epfd, int fd) {
136 #if defined(HAVE_EPOLL)
137  if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL))
138  {
139  ROS_ERROR("Unable to remove FD to epoll: %s", strerror(errno));
140  }
141 #else
142  UNUSED(epfd);
143  UNUSED(fd);
144 #endif
145 }
146 
147 void set_events_on_socket(int epfd, int fd, int events) {
148 #if defined(HAVE_EPOLL)
149  struct epoll_event ev;
150  ev.events = events;
151  ev.data.fd = fd;
152  if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev))
153  {
154  ROS_ERROR("Unable to modify FD epoll: %s", strerror(errno));
155  }
156 #else
157  UNUSED(epfd);
158  UNUSED(fd);
159  UNUSED(events);
160 #endif
161 }
162 
163 
164 
165 /*****************************************************************************
166 ** Service Robotics/Libssh Functions
167 *****************************************************************************/
180 pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) {
181 #if defined(WIN32)
182  fd_set readfds, writefds, exceptfds;
183  struct timeval tv, *ptv;
184  socket_fd_t max_fd;
185  int rc;
186  nfds_t i;
188 
189  UNUSED(epfd);
190 
191  if (fds == NULL) {
192  errno = EFAULT;
193  return ofds;
194  }
195 
196  FD_ZERO (&readfds);
197  FD_ZERO (&writefds);
198  FD_ZERO (&exceptfds);
199 
200  /*********************
201  ** Compute fd sets
202  **********************/
203  // also find the largest descriptor.
204  for (rc = -1, max_fd = 0, i = 0; i < nfds; i++) {
205  if (fds[i].fd == INVALID_SOCKET) {
206  continue;
207  }
208  if (fds[i].events & (POLLIN | POLLRDNORM)) {
209  FD_SET (fds[i].fd, &readfds);
210  }
211  if (fds[i].events & (POLLOUT | POLLWRNORM | POLLWRBAND)) {
212  FD_SET (fds[i].fd, &writefds);
213  }
214  if (fds[i].events & (POLLPRI | POLLRDBAND)) {
215  FD_SET (fds[i].fd, &exceptfds);
216  }
217  if (fds[i].fd > max_fd &&
218  (fds[i].events & (POLLIN | POLLOUT | POLLPRI |
219  POLLRDNORM | POLLRDBAND |
220  POLLWRNORM | POLLWRBAND))) {
221  max_fd = fds[i].fd;
222  rc = 0;
223  }
224  }
225 
226  if (rc == -1) {
227  errno = EINVAL;
228  return ofds;
229  }
230  /*********************
231  ** Setting the timeout
232  **********************/
233  if (timeout < 0) {
234  ptv = NULL;
235  } else {
236  ptv = &tv;
237  if (timeout == 0) {
238  tv.tv_sec = 0;
239  tv.tv_usec = 0;
240  } else {
241  tv.tv_sec = timeout / 1000;
242  tv.tv_usec = (timeout % 1000) * 1000;
243  }
244  }
245 
246  rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv);
247  if (rc < 0) {
248  return ofds;
249  }
250  ofds.reset(new std::vector<socket_pollfd>);
251  if ( rc == 0 ) {
252  return ofds;
253  }
254 
255  for (rc = 0, i = 0; i < nfds; i++) {
256  if (fds[i].fd != INVALID_SOCKET) {
257  fds[i].revents = 0;
258 
259  if (FD_ISSET(fds[i].fd, &readfds)) {
260  int save_errno = errno;
261  char data[64] = {0};
262  int ret;
263 
264  /* support for POLLHUP */
265  // just check if there's incoming data, without removing it from the queue.
266  ret = recv(fds[i].fd, data, 64, MSG_PEEK);
267  #ifdef WIN32
268  if ((ret == -1) &&
269  (errno == WSAESHUTDOWN || errno == WSAECONNRESET ||
270  (errno == WSAECONNABORTED) || errno == WSAENETRESET))
271  #else
272  if ((ret == -1) &&
273  (errno == ESHUTDOWN || errno == ECONNRESET ||
274  (errno == ECONNABORTED) || errno == ENETRESET))
275  #endif
276  {
277  fds[i].revents |= POLLHUP;
278  } else {
279  fds[i].revents |= fds[i].events & (POLLIN | POLLRDNORM);
280  }
281  errno = save_errno;
282  }
283  if (FD_ISSET(fds[i].fd, &writefds)) {
284  fds[i].revents |= fds[i].events & (POLLOUT | POLLWRNORM | POLLWRBAND);
285  }
286 
287  if (FD_ISSET(fds[i].fd, &exceptfds)) {
288  fds[i].revents |= fds[i].events & (POLLPRI | POLLRDBAND);
289  }
290 
291  if (fds[i].revents & ~POLLHUP) {
292  rc++;
293  }
294  } else {
295  fds[i].revents = POLLNVAL;
296  }
297  ofds->push_back(fds[i]);
298  }
299  return ofds;
300 #elif defined (HAVE_EPOLL)
301  UNUSED(nfds);
302  UNUSED(fds);
303  struct epoll_event ev[nfds];
304  pollfd_vector_ptr ofds;
305 
306  int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout);
307 
308  if (fd_cnt < 0)
309  {
310  // EINTR means that we got interrupted by a signal, and is not an error
311  if(errno != EINTR) {
312  ROS_ERROR("Error in epoll_wait! %s", strerror(errno));
313  ofds.reset();
314  }
315  }
316  else
317  {
318  ofds.reset(new std::vector<socket_pollfd>);
319  for (int i = 0; i < fd_cnt; i++)
320  {
321  socket_pollfd pfd;
322  pfd.fd = ev[i].data.fd;
323  pfd.revents = ev[i].events;
324  ofds->push_back(pfd);
325  }
326  }
327  return ofds;
328 #else
329  UNUSED(epfd);
330  pollfd_vector_ptr ofds(new std::vector<socket_pollfd>);
331  // Clear the `revents` fields
332  for (nfds_t i = 0; i < nfds; i++)
333  {
334  fds[i].revents = 0;
335  }
336 
337  // use an existing poll implementation
338  int result = poll(fds, nfds, timeout);
339  if ( result < 0 )
340  {
341  // EINTR means that we got interrupted by a signal, and is not an error
342  if(errno != EINTR)
343  {
344  ROS_ERROR("Error in poll! %s", strerror(errno));
345  ofds.reset();
346  }
347  } else {
348  for (nfds_t i = 0; i < nfds; i++)
349  {
350  if (fds[i].revents)
351  {
352  ofds->push_back(fds[i]);
353  fds[i].revents = 0;
354  }
355  }
356  }
357  return ofds;
358 #endif // poll_sockets functions
359 }
360 /*****************************************************************************
361 ** Socket Utilities
362 *****************************************************************************/
368 #ifdef WIN32
369  u_long non_blocking = 1;
370  if(ioctlsocket( socket, FIONBIO, &non_blocking ) != 0 )
371  {
372  return WSAGetLastError();
373  }
374 #else
375  if(fcntl(socket, F_SETFL, O_NONBLOCK) == -1)
376  {
377  return errno;
378  }
379 #endif
380  return 0;
381 }
382 
388 int close_socket(socket_fd_t &socket) {
389 #ifdef WIN32
390  if(::closesocket(socket) == SOCKET_ERROR ) {
391  return -1;
392  } else {
393  return 0;
394  }
395 #else
396  if (::close(socket) < 0) {
397  return -1;
398  } else {
399  return 0;
400  }
401 #endif //WIN32
402 }
403 
404 /*****************************************************************************
405 ** Signal Pair
406 *****************************************************************************/
412 int create_signal_pair(signal_fd_t signal_pair[2]) {
413 #ifdef WIN32 // use a socket pair
414  signal_pair[0] = INVALID_SOCKET;
415  signal_pair[1] = INVALID_SOCKET;
416 
417  union {
418  struct sockaddr_in inaddr;
419  struct sockaddr addr;
420  } a;
421  socklen_t addrlen = sizeof(a.inaddr);
422 
423  /*********************
424  ** Listen Socket
425  **********************/
426  socket_fd_t listen_socket = INVALID_SOCKET;
427  listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
428  if (listen_socket == INVALID_SOCKET) {
429  return -1;
430  }
431 
432  // allow it to be bound to an address already in use - do we actually need this?
433  int reuse = 1;
434  if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, (socklen_t) sizeof(reuse)) == SOCKET_ERROR ) {
435  ::closesocket(listen_socket);
436  return -1;
437  }
438 
439  memset(&a, 0, sizeof(a));
440  a.inaddr.sin_family = AF_INET;
441  a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
442  // For TCP/IP, if the port is specified as zero, the service provider assigns
443  // a unique port to the application from the dynamic client port range.
444  a.inaddr.sin_port = 0;
445 
446  if (bind(listen_socket, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) {
447  ::closesocket(listen_socket);
448  return -1;
449  }
450  // we need this below because the system auto filled in some entries, e.g. port #
451  if (getsockname(listen_socket, &a.addr, &addrlen) == SOCKET_ERROR) {
452  ::closesocket(listen_socket);
453  return -1;
454  }
455  // max 1 connection permitted
456  if (listen(listen_socket, 1) == SOCKET_ERROR) {
457  ::closesocket(listen_socket);
458  return -1;
459  }
460 
461  /*********************
462  ** Connection
463  **********************/
464  // do we need io overlapping?
465  // DWORD flags = (make_overlapped ? WSA_FLAG_OVERLAPPED : 0);
466  DWORD overlapped_flag = 0;
467  signal_pair[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, overlapped_flag);
468  if (signal_pair[0] == INVALID_SOCKET) {
469  ::closesocket(listen_socket);
470  ::closesocket(signal_pair[0]);
471  return -1;
472  }
473  // reusing the information from above to connect to the listener
474  if (connect(signal_pair[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) {
475  ::closesocket(listen_socket);
476  ::closesocket(signal_pair[0]);
477  return -1;
478  }
479  /*********************
480  ** Accept
481  **********************/
482  signal_pair[1] = accept(listen_socket, NULL, NULL);
483  if (signal_pair[1] == INVALID_SOCKET) {
484  ::closesocket(listen_socket);
485  ::closesocket(signal_pair[0]);
486  ::closesocket(signal_pair[1]);
487  return -1;
488  }
489  /*********************
490  ** Nonblocking
491  **********************/
492  // should we do this or should we set io overlapping?
493  if ( (set_non_blocking(signal_pair[0]) != 0) || (set_non_blocking(signal_pair[1]) != 0) ) {
494  ::closesocket(listen_socket);
495  ::closesocket(signal_pair[0]);
496  ::closesocket(signal_pair[1]);
497  return -1;
498  }
499  /*********************
500  ** Cleanup
501  **********************/
502  ::closesocket(listen_socket); // the listener has done its job.
503  return 0;
504 #else // use a pipe pair
505  // initialize
506  signal_pair[0] = -1;
507  signal_pair[1] = -1;
508 
509  if(pipe(signal_pair) != 0) {
510  ROS_FATAL( "pipe() failed");
511  return -1;
512  }
513  if(fcntl(signal_pair[0], F_SETFL, O_NONBLOCK) == -1) {
514  ROS_FATAL( "fcntl() failed");
515  return -1;
516  }
517  if(fcntl(signal_pair[1], F_SETFL, O_NONBLOCK) == -1) {
518  ROS_FATAL( "fcntl() failed");
519  return -1;
520  }
521  return 0;
522 #endif // create_pipe
523 }
524 
525 } // namespace ros
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd)
Definition: io.cpp:135
ROSCPP_DECL int create_socket_watcher()
Definition: io.cpp:102
#define ROS_FATAL(...)
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
Definition: io.cpp:367
int signal_fd_t
Definition: io.h:137
int socket_fd_t
Definition: io.h:136
ROSCPP_DECL void close_socket_watcher(int fd)
Definition: io.cpp:114
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
ROSCPP_DECL bool last_socket_error_is_would_block()
Definition: io.cpp:86
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout)
A cross platform polling function for sockets.
Definition: io.cpp:180
struct pollfd socket_pollfd
Definition: io.h:138
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events)
Definition: io.cpp:147
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2])
Definition: io.cpp:412
ROSCPP_DECL int last_socket_error()
Definition: io.cpp:67
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
Definition: io.cpp:388
#define ROS_ERROR(...)
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd)
Definition: io.cpp:119
#define UNUSED(expr)
Definition: io.cpp:59


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Mar 21 2018 07:13:27