52 #include <sys/epoll.h> 59 #define UNUSED(expr) do { (void)(expr); } while (0) 69 return WSAGetLastError();
78 std::stringstream ostream;
79 ostream <<
"WSA Error: " << WSAGetLastError();
80 return ostream.str().c_str();
82 return strerror(errno);
88 if ( WSAGetLastError() == WSAEWOULDBLOCK ) {
94 if ( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ) {
104 #if defined(HAVE_EPOLL) 105 epfd = ::epoll_create1(0);
108 ROS_ERROR(
"Unable to create epoll watcher: %s", strerror(errno));
120 #if defined(HAVE_EPOLL) 121 struct epoll_event ev;
122 bzero(&ev,
sizeof(ev));
127 if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev))
129 ROS_ERROR(
"Unable to add FD to epoll: %s", strerror(errno));
138 #if defined(HAVE_EPOLL) 139 if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL))
141 ROS_ERROR(
"Unable to remove FD to epoll: %s", strerror(errno));
150 #if defined(HAVE_EPOLL) 151 struct epoll_event ev;
152 bzero(&ev,
sizeof(ev));
156 if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev))
158 ROS_ERROR(
"Unable to modify FD epoll: %s", strerror(errno));
186 fd_set readfds, writefds, exceptfds;
187 struct timeval tv, *ptv;
202 FD_ZERO (&exceptfds);
208 for (rc = -1, max_fd = 0, i = 0; i < nfds; i++) {
209 if (fds[i].fd == INVALID_SOCKET) {
212 if (fds[i].events & (POLLIN | POLLRDNORM)) {
213 FD_SET (fds[i].fd, &readfds);
215 if (fds[i].events & (POLLOUT | POLLWRNORM | POLLWRBAND)) {
216 FD_SET (fds[i].fd, &writefds);
218 if (fds[i].events & (POLLPRI | POLLRDBAND)) {
219 FD_SET (fds[i].fd, &exceptfds);
221 if (fds[i].fd > max_fd &&
222 (fds[i].events & (POLLIN | POLLOUT | POLLPRI |
223 POLLRDNORM | POLLRDBAND |
224 POLLWRNORM | POLLWRBAND))) {
245 tv.tv_sec = timeout / 1000;
246 tv.tv_usec = (timeout % 1000) * 1000;
250 rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv);
254 ofds.reset(
new std::vector<socket_pollfd>);
259 for (rc = 0, i = 0; i < nfds; i++) {
260 if (fds[i].fd != INVALID_SOCKET) {
263 if (FD_ISSET(fds[i].fd, &readfds)) {
264 int save_errno = errno;
270 ret = recv(fds[i].fd, data, 64, MSG_PEEK);
273 (errno == WSAESHUTDOWN || errno == WSAECONNRESET ||
274 (errno == WSAECONNABORTED) || errno == WSAENETRESET))
277 (errno == ESHUTDOWN || errno == ECONNRESET ||
278 (errno == ECONNABORTED) || errno == ENETRESET))
281 fds[i].revents |= POLLHUP;
283 fds[i].revents |= fds[i].events & (POLLIN | POLLRDNORM);
287 if (FD_ISSET(fds[i].fd, &writefds)) {
288 fds[i].revents |= fds[i].events & (POLLOUT | POLLWRNORM | POLLWRBAND);
291 if (FD_ISSET(fds[i].fd, &exceptfds)) {
292 fds[i].revents |= fds[i].events & (POLLPRI | POLLRDBAND);
295 if (fds[i].revents & ~POLLHUP) {
299 fds[i].revents = POLLNVAL;
301 ofds->push_back(fds[i]);
304 #elif defined (HAVE_EPOLL) 307 struct epoll_event ev[nfds];
310 int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout);
316 ROS_ERROR(
"Error in epoll_wait! %s", strerror(errno));
322 ofds.reset(
new std::vector<socket_pollfd>);
323 for (
int i = 0; i < fd_cnt; i++)
326 pfd.fd = ev[i].data.fd;
327 pfd.revents = ev[i].events;
328 ofds->push_back(pfd);
336 for (nfds_t i = 0; i < nfds; i++)
342 int result = poll(fds, nfds, timeout);
348 ROS_ERROR(
"Error in poll! %s", strerror(errno));
352 for (nfds_t i = 0; i < nfds; i++)
356 ofds->push_back(fds[i]);
362 #endif // poll_sockets functions 373 u_long non_blocking = 1;
374 if(ioctlsocket( socket, FIONBIO, &non_blocking ) != 0 )
376 return WSAGetLastError();
379 if(fcntl(socket, F_SETFL, O_NONBLOCK) == -1)
394 if(::closesocket(socket) == SOCKET_ERROR ) {
400 if (::close(socket) < 0) {
417 #ifdef WIN32 // use a socket pair 418 signal_pair[0] = INVALID_SOCKET;
419 signal_pair[1] = INVALID_SOCKET;
422 struct sockaddr_in inaddr;
423 struct sockaddr addr;
425 socklen_t addrlen =
sizeof(a.inaddr);
431 listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
432 if (listen_socket == INVALID_SOCKET) {
438 if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (
char*) &reuse, (socklen_t)
sizeof(reuse)) == SOCKET_ERROR ) {
439 ::closesocket(listen_socket);
443 memset(&a, 0,
sizeof(a));
444 a.inaddr.sin_family = AF_INET;
445 a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
448 a.inaddr.sin_port = 0;
450 if (bind(listen_socket, &a.addr,
sizeof(a.inaddr)) == SOCKET_ERROR) {
451 ::closesocket(listen_socket);
455 if (getsockname(listen_socket, &a.addr, &addrlen) == SOCKET_ERROR) {
456 ::closesocket(listen_socket);
460 if (listen(listen_socket, 1) == SOCKET_ERROR) {
461 ::closesocket(listen_socket);
470 DWORD overlapped_flag = 0;
471 signal_pair[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, overlapped_flag);
472 if (signal_pair[0] == INVALID_SOCKET) {
473 ::closesocket(listen_socket);
474 ::closesocket(signal_pair[0]);
478 if (connect(signal_pair[0], &a.addr,
sizeof(a.inaddr)) == SOCKET_ERROR) {
479 ::closesocket(listen_socket);
480 ::closesocket(signal_pair[0]);
486 signal_pair[1] = accept(listen_socket, NULL, NULL);
487 if (signal_pair[1] == INVALID_SOCKET) {
488 ::closesocket(listen_socket);
489 ::closesocket(signal_pair[0]);
490 ::closesocket(signal_pair[1]);
498 ::closesocket(listen_socket);
499 ::closesocket(signal_pair[0]);
500 ::closesocket(signal_pair[1]);
506 ::closesocket(listen_socket);
508 #else // use a pipe pair 513 if(pipe(signal_pair) != 0) {
517 if(fcntl(signal_pair[0], F_SETFL, O_NONBLOCK) == -1) {
521 if(fcntl(signal_pair[1], F_SETFL, O_NONBLOCK) == -1) {
526 #endif // create_pipe ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd)
ROSCPP_DECL int create_socket_watcher()
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
ROSCPP_DECL void close_socket_watcher(int fd)
ROSCPP_DECL const char * last_socket_error_string()
ROSCPP_DECL bool last_socket_error_is_would_block()
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout)
A cross-platform polling function for sockets.
struct pollfd socket_pollfd
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events)
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2])
ROSCPP_DECL int last_socket_error()
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd)