io.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 /*****************************************************************************
35 ** Includes
36 *****************************************************************************/
37 
38 #include "config.h"
39 
40 #include <ros/io.h>
41 #include <ros/assert.h> // don't need if we dont call the pipe functions.
42 #include <errno.h> // for EFAULT and co.
43 #include <iostream>
44 #include <sstream>
45 #ifdef WIN32
46 #else
47  #include <cstring> // strerror
48  #include <fcntl.h> // for non-blocking configuration
49 #endif
50 
51 #ifdef HAVE_EPOLL
52  #include <sys/epoll.h>
53 #endif
54 
55 /*****************************************************************************
56 ** Macros
57 *****************************************************************************/
58 
59 #define UNUSED(expr) do { (void)(expr); } while (0)
60 
61 /*****************************************************************************
62 ** Namespaces
63 *****************************************************************************/
64 
65 namespace ros {
66 
68  #ifdef WIN32
69  return WSAGetLastError();
70  #else
71  return errno;
72  #endif
73 }
74 const char* last_socket_error_string() {
75  #ifdef WIN32
76  // could fix this to use FORMAT_MESSAGE and print a real string later,
77  // but not high priority.
78  std::stringstream ostream;
79  ostream << "WSA Error: " << WSAGetLastError();
80  return ostream.str().c_str();
81  #else
82  return strerror(errno);
83  #endif
84 }
85 
87 #if defined(WIN32)
88  if ( WSAGetLastError() == WSAEWOULDBLOCK ) {
89  return true;
90  } else {
91  return false;
92  }
93 #else
94  if ( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ) { // posix permits either
95  return true;
96  } else {
97  return false;
98  }
99 #endif
100 }
101 
103  int epfd = -1;
104 #if defined(HAVE_EPOLL)
105  epfd = ::epoll_create1(0);
106  if (epfd < 0)
107  {
108  ROS_ERROR("Unable to create epoll watcher: %s", strerror(errno));
109  }
110 #endif
111  return epfd;
112 }
113 
114 void close_socket_watcher(int fd) {
115  if (fd >= 0)
116  ::close(fd);
117 }
118 
119 void add_socket_to_watcher(int epfd, int fd) {
120 #if defined(HAVE_EPOLL)
121  struct epoll_event ev;
122  bzero(&ev, sizeof(ev));
123 
124  ev.events = 0;
125  ev.data.fd = fd;
126 
127  if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev))
128  {
129  ROS_ERROR("Unable to add FD to epoll: %s", strerror(errno));
130  }
131 #else
132  UNUSED(epfd);
133  UNUSED(fd);
134 #endif
135 }
136 
137 void del_socket_from_watcher(int epfd, int fd) {
138 #if defined(HAVE_EPOLL)
139  if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL))
140  {
141  ROS_ERROR("Unable to remove FD to epoll: %s", strerror(errno));
142  }
143 #else
144  UNUSED(epfd);
145  UNUSED(fd);
146 #endif
147 }
148 
149 void set_events_on_socket(int epfd, int fd, int events) {
150 #if defined(HAVE_EPOLL)
151  struct epoll_event ev;
152  bzero(&ev, sizeof(ev));
153 
154  ev.events = events;
155  ev.data.fd = fd;
156  if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev))
157  {
158  ROS_ERROR("Unable to modify FD epoll: %s", strerror(errno));
159  }
160 #else
161  UNUSED(epfd);
162  UNUSED(fd);
163  UNUSED(events);
164 #endif
165 }
166 
167 
168 
169 /*****************************************************************************
170 ** Service Robotics/Libssh Functions
171 *****************************************************************************/
184 pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) {
185 #if defined(WIN32)
186  fd_set readfds, writefds, exceptfds;
187  struct timeval tv, *ptv;
188  socket_fd_t max_fd;
189  int rc;
190  nfds_t i;
192 
193  UNUSED(epfd);
194 
195  if (fds == NULL) {
196  errno = EFAULT;
197  return ofds;
198  }
199 
200  FD_ZERO (&readfds);
201  FD_ZERO (&writefds);
202  FD_ZERO (&exceptfds);
203 
204  /*********************
205  ** Compute fd sets
206  **********************/
207  // also find the largest descriptor.
208  for (rc = -1, max_fd = 0, i = 0; i < nfds; i++) {
209  if (fds[i].fd == INVALID_SOCKET) {
210  continue;
211  }
212  if (fds[i].events & (POLLIN | POLLRDNORM)) {
213  FD_SET (fds[i].fd, &readfds);
214  }
215  if (fds[i].events & (POLLOUT | POLLWRNORM | POLLWRBAND)) {
216  FD_SET (fds[i].fd, &writefds);
217  }
218  if (fds[i].events & (POLLPRI | POLLRDBAND)) {
219  FD_SET (fds[i].fd, &exceptfds);
220  }
221  if (fds[i].fd > max_fd &&
222  (fds[i].events & (POLLIN | POLLOUT | POLLPRI |
223  POLLRDNORM | POLLRDBAND |
224  POLLWRNORM | POLLWRBAND))) {
225  max_fd = fds[i].fd;
226  rc = 0;
227  }
228  }
229 
230  if (rc == -1) {
231  errno = EINVAL;
232  return ofds;
233  }
234  /*********************
235  ** Setting the timeout
236  **********************/
237  if (timeout < 0) {
238  ptv = NULL;
239  } else {
240  ptv = &tv;
241  if (timeout == 0) {
242  tv.tv_sec = 0;
243  tv.tv_usec = 0;
244  } else {
245  tv.tv_sec = timeout / 1000;
246  tv.tv_usec = (timeout % 1000) * 1000;
247  }
248  }
249 
250  rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv);
251  if (rc < 0) {
252  return ofds;
253  }
254  ofds.reset(new std::vector<socket_pollfd>);
255  if ( rc == 0 ) {
256  return ofds;
257  }
258 
259  for (rc = 0, i = 0; i < nfds; i++) {
260  if (fds[i].fd != INVALID_SOCKET) {
261  fds[i].revents = 0;
262 
263  if (FD_ISSET(fds[i].fd, &readfds)) {
264  int save_errno = errno;
265  char data[64] = {0};
266  int ret;
267 
268  /* support for POLLHUP */
269  // just check if there's incoming data, without removing it from the queue.
270  ret = recv(fds[i].fd, data, 64, MSG_PEEK);
271  #ifdef WIN32
272  if ((ret == -1) &&
273  (errno == WSAESHUTDOWN || errno == WSAECONNRESET ||
274  (errno == WSAECONNABORTED) || errno == WSAENETRESET))
275  #else
276  if ((ret == -1) &&
277  (errno == ESHUTDOWN || errno == ECONNRESET ||
278  (errno == ECONNABORTED) || errno == ENETRESET))
279  #endif
280  {
281  fds[i].revents |= POLLHUP;
282  } else {
283  fds[i].revents |= fds[i].events & (POLLIN | POLLRDNORM);
284  }
285  errno = save_errno;
286  }
287  if (FD_ISSET(fds[i].fd, &writefds)) {
288  fds[i].revents |= fds[i].events & (POLLOUT | POLLWRNORM | POLLWRBAND);
289  }
290 
291  if (FD_ISSET(fds[i].fd, &exceptfds)) {
292  fds[i].revents |= fds[i].events & (POLLPRI | POLLRDBAND);
293  }
294 
295  if (fds[i].revents & ~POLLHUP) {
296  rc++;
297  }
298  } else {
299  fds[i].revents = POLLNVAL;
300  }
301  ofds->push_back(fds[i]);
302  }
303  return ofds;
304 #elif defined (HAVE_EPOLL)
305  UNUSED(nfds);
306  UNUSED(fds);
307  struct epoll_event ev[nfds];
308  pollfd_vector_ptr ofds;
309 
310  int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout);
311 
312  if (fd_cnt < 0)
313  {
314  // EINTR means that we got interrupted by a signal, and is not an error
315  if(errno != EINTR) {
316  ROS_ERROR("Error in epoll_wait! %s", strerror(errno));
317  ofds.reset();
318  }
319  }
320  else
321  {
322  ofds.reset(new std::vector<socket_pollfd>);
323  for (int i = 0; i < fd_cnt; i++)
324  {
325  socket_pollfd pfd;
326  pfd.fd = ev[i].data.fd;
327  pfd.revents = ev[i].events;
328  ofds->push_back(pfd);
329  }
330  }
331  return ofds;
332 #else
333  UNUSED(epfd);
334  pollfd_vector_ptr ofds(new std::vector<socket_pollfd>);
335  // Clear the `revents` fields
336  for (nfds_t i = 0; i < nfds; i++)
337  {
338  fds[i].revents = 0;
339  }
340 
341  // use an existing poll implementation
342  int result = poll(fds, nfds, timeout);
343  if ( result < 0 )
344  {
345  // EINTR means that we got interrupted by a signal, and is not an error
346  if(errno != EINTR)
347  {
348  ROS_ERROR("Error in poll! %s", strerror(errno));
349  ofds.reset();
350  }
351  } else {
352  for (nfds_t i = 0; i < nfds; i++)
353  {
354  if (fds[i].revents)
355  {
356  ofds->push_back(fds[i]);
357  fds[i].revents = 0;
358  }
359  }
360  }
361  return ofds;
362 #endif // poll_sockets functions
363 }
364 /*****************************************************************************
365 ** Socket Utilities
366 *****************************************************************************/
372 #ifdef WIN32
373  u_long non_blocking = 1;
374  if(ioctlsocket( socket, FIONBIO, &non_blocking ) != 0 )
375  {
376  return WSAGetLastError();
377  }
378 #else
379  if(fcntl(socket, F_SETFL, O_NONBLOCK) == -1)
380  {
381  return errno;
382  }
383 #endif
384  return 0;
385 }
386 
392 int close_socket(socket_fd_t &socket) {
393 #ifdef WIN32
394  if(::closesocket(socket) == SOCKET_ERROR ) {
395  return -1;
396  } else {
397  return 0;
398  }
399 #else
400  if (::close(socket) < 0) {
401  return -1;
402  } else {
403  return 0;
404  }
405 #endif //WIN32
406 }
407 
408 /*****************************************************************************
409 ** Signal Pair
410 *****************************************************************************/
416 int create_signal_pair(signal_fd_t signal_pair[2]) {
417 #ifdef WIN32 // use a socket pair
418  signal_pair[0] = INVALID_SOCKET;
419  signal_pair[1] = INVALID_SOCKET;
420 
421  union {
422  struct sockaddr_in inaddr;
423  struct sockaddr addr;
424  } a;
425  socklen_t addrlen = sizeof(a.inaddr);
426 
427  /*********************
428  ** Listen Socket
429  **********************/
430  socket_fd_t listen_socket = INVALID_SOCKET;
431  listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
432  if (listen_socket == INVALID_SOCKET) {
433  return -1;
434  }
435 
436  // allow it to be bound to an address already in use - do we actually need this?
437  int reuse = 1;
438  if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, (socklen_t) sizeof(reuse)) == SOCKET_ERROR ) {
439  ::closesocket(listen_socket);
440  return -1;
441  }
442 
443  memset(&a, 0, sizeof(a));
444  a.inaddr.sin_family = AF_INET;
445  a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
446  // For TCP/IP, if the port is specified as zero, the service provider assigns
447  // a unique port to the application from the dynamic client port range.
448  a.inaddr.sin_port = 0;
449 
450  if (bind(listen_socket, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) {
451  ::closesocket(listen_socket);
452  return -1;
453  }
454  // we need this below because the system auto filled in some entries, e.g. port #
455  if (getsockname(listen_socket, &a.addr, &addrlen) == SOCKET_ERROR) {
456  ::closesocket(listen_socket);
457  return -1;
458  }
459  // max 1 connection permitted
460  if (listen(listen_socket, 1) == SOCKET_ERROR) {
461  ::closesocket(listen_socket);
462  return -1;
463  }
464 
465  /*********************
466  ** Connection
467  **********************/
468  // do we need io overlapping?
469  // DWORD flags = (make_overlapped ? WSA_FLAG_OVERLAPPED : 0);
470  DWORD overlapped_flag = 0;
471  signal_pair[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, overlapped_flag);
472  if (signal_pair[0] == INVALID_SOCKET) {
473  ::closesocket(listen_socket);
474  ::closesocket(signal_pair[0]);
475  return -1;
476  }
477  // reusing the information from above to connect to the listener
478  if (connect(signal_pair[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) {
479  ::closesocket(listen_socket);
480  ::closesocket(signal_pair[0]);
481  return -1;
482  }
483  /*********************
484  ** Accept
485  **********************/
486  signal_pair[1] = accept(listen_socket, NULL, NULL);
487  if (signal_pair[1] == INVALID_SOCKET) {
488  ::closesocket(listen_socket);
489  ::closesocket(signal_pair[0]);
490  ::closesocket(signal_pair[1]);
491  return -1;
492  }
493  /*********************
494  ** Nonblocking
495  **********************/
496  // should we do this or should we set io overlapping?
497  if ( (set_non_blocking(signal_pair[0]) != 0) || (set_non_blocking(signal_pair[1]) != 0) ) {
498  ::closesocket(listen_socket);
499  ::closesocket(signal_pair[0]);
500  ::closesocket(signal_pair[1]);
501  return -1;
502  }
503  /*********************
504  ** Cleanup
505  **********************/
506  ::closesocket(listen_socket); // the listener has done its job.
507  return 0;
508 #else // use a pipe pair
509  // initialize
510  signal_pair[0] = -1;
511  signal_pair[1] = -1;
512 
513  if(pipe(signal_pair) != 0) {
514  ROS_FATAL( "pipe() failed");
515  return -1;
516  }
517  if(fcntl(signal_pair[0], F_SETFL, O_NONBLOCK) == -1) {
518  ROS_FATAL( "fcntl() failed");
519  return -1;
520  }
521  if(fcntl(signal_pair[1], F_SETFL, O_NONBLOCK) == -1) {
522  ROS_FATAL( "fcntl() failed");
523  return -1;
524  }
525  return 0;
526 #endif // create_pipe
527 }
528 
529 } // namespace ros
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd)
Definition: io.cpp:137
ROSCPP_DECL int create_socket_watcher()
Definition: io.cpp:102
#define ROS_FATAL(...)
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
Definition: io.cpp:371
int signal_fd_t
Definition: io.h:137
int socket_fd_t
Definition: io.h:136
ROSCPP_DECL void close_socket_watcher(int fd)
Definition: io.cpp:114
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
ROSCPP_DECL bool last_socket_error_is_would_block()
Definition: io.cpp:86
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout)
A cross platform polling function for sockets.
Definition: io.cpp:184
struct pollfd socket_pollfd
Definition: io.h:138
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events)
Definition: io.cpp:149
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2])
Definition: io.cpp:416
ROSCPP_DECL int last_socket_error()
Definition: io.cpp:67
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
Definition: io.cpp:392
#define ROS_ERROR(...)
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd)
Definition: io.cpp:119
#define UNUSED(expr)
Definition: io.cpp:59


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27