transport_tcp.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/io.h"
37 #include "ros/poll_set.h"
38 #include "ros/header.h"
39 #include "ros/file_log.h"
40 #include <ros/assert.h>
41 #include <sstream>
42 #include <boost/bind.hpp>
43 #include <fcntl.h>
44 #include <errno.h>
45 #ifndef _WIN32
46  #include <sys/socket.h> // explicit include required for FreeBSD
47 #endif
48 namespace ros
49 {
50 
52 bool TransportTCP::s_use_ipv6_ = false;
53 
54 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
55 : sock_(ROS_INVALID_SOCKET)
56 , closed_(false)
57 , expecting_read_(false)
58 , expecting_write_(false)
59 , is_server_(false)
60 , server_port_(-1)
61 , local_port_(-1)
62 , poll_set_(poll_set)
63 , flags_(flags)
64 {
65 
66 }
67 
69 {
70  ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
71 }
72 
74 {
75  sock_ = sock;
76  return initializeSocket();
77 }
78 
80 {
81  if (!(flags_ & SYNCHRONOUS))
82  {
83  int result = set_non_blocking(sock_);
84  if ( result != 0 ) {
85  ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
86  close();
87  return false;
88  }
89  }
90 
91  return true;
92 }
93 
95 {
97 
98  if (!setNonBlocking())
99  {
100  return false;
101  }
102 
103  setKeepAlive(s_use_keepalive_, 60, 10, 9);
104 
105  // connect() will set cached_remote_host_ because it already has the host/port available
106  if (cached_remote_host_.empty())
107  {
108  if (is_server_)
109  {
110  cached_remote_host_ = "TCPServer Socket";
111  }
112  else
113  {
114  std::stringstream ss;
115  ss << getClientURI() << " on socket " << sock_;
116  cached_remote_host_ = ss.str();
117  }
118  }
119 
120  if (local_port_ < 0)
121  {
122  la_len_ = s_use_ipv6_ ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
123  getsockname(sock_, (sockaddr *)&local_address_, &la_len_);
124  switch (local_address_.ss_family)
125  {
126  case AF_INET:
127  local_port_ = ntohs(((sockaddr_in *)&local_address_)->sin_port);
128  break;
129  case AF_INET6:
130  local_port_ = ntohs(((sockaddr_in6 *)&local_address_)->sin6_port);
131  break;
132  }
133  }
134 
135 #ifdef ROSCPP_USE_TCP_NODELAY
136  setNoDelay(true);
137 #endif
138 
140  if (poll_set_)
141  {
142  ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
143  poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
144 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX!
145  // This is needed to detect dead connections. #1704
146  poll_set_->addEvents(sock_, POLLRDHUP);
147 #endif
148  }
149 
150  if (!(flags_ & SYNCHRONOUS))
151  {
152  //enableRead();
153  }
154 
155  return true;
156 }
157 
159 {
160  std::string nodelay;
161  if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
162  {
163  ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
164  setNoDelay(true);
165  }
166 }
167 
168 void TransportTCP::setNoDelay(bool nodelay)
169 {
170  int flag = nodelay ? 1 : 0;
171  int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
172  if (result < 0)
173  {
174  ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
175  }
176 }
177 
178 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
179 {
180  if (use)
181  {
182  int val = 1;
183  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
184  {
185  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
186  }
187 
188 /* cygwin SOL_TCP does not seem to support TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT */
189 #if defined(SOL_TCP) && defined(TCP_KEEPIDLE)
190  val = idle;
191  if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
192  {
193  ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
194  }
195 #else
196  (void)idle;
197 #endif
198 
199 #if defined(SOL_TCP) && defined(TCP_KEEPINTVL)
200  val = interval;
201  if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
202  {
203  ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
204  }
205 #else
206  (void)interval;
207 #endif
208 
209 #if defined(SOL_TCP) && defined(TCP_KEEPCNT)
210  val = count;
211  if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
212  {
213  ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
214  }
215 #else
216  (void)count;
217 #endif
218  }
219  else
220  {
221  int val = 0;
222  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
223  {
224  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
225  }
226  }
227 }
228 
229 bool TransportTCP::connect(const std::string& host, int port)
230 {
231  if (!isHostAllowed(host))
232  return false; // adios amigo
233 
234  sock_ = socket(s_use_ipv6_ ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
235  connected_host_ = host;
236  connected_port_ = port;
237 
238  if (sock_ == ROS_INVALID_SOCKET)
239  {
240  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
241  return false;
242  }
243 
244  setNonBlocking();
245 
246  sockaddr_storage sas;
247  socklen_t sas_len;
248 
249  in_addr ina;
250  in6_addr in6a;
251  if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
252  {
253  sockaddr_in *address = (sockaddr_in*) &sas;
254  sas_len = sizeof(sockaddr_in);
255 
256  la_len_ = sizeof(sockaddr_in);
257  address->sin_family = AF_INET;
258  address->sin_port = htons(port);
259  address->sin_addr.s_addr = ina.s_addr;
260  }
261  else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
262  {
263  sockaddr_in6 *address = (sockaddr_in6*) &sas;
264  sas_len = sizeof(sockaddr_in6);
265  la_len_ = sizeof(sockaddr_in6);
266  address->sin6_family = AF_INET6;
267  address->sin6_port = htons(port);
268  memcpy(address->sin6_addr.s6_addr, in6a.s6_addr, sizeof(in6a.s6_addr));
269  }
270  else
271  {
272  struct addrinfo* addr;
273  struct addrinfo hints;
274  memset(&hints, 0, sizeof(hints));
275  hints.ai_family = AF_UNSPEC;
276 
277  if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
278  {
279  close();
280  ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
281  return false;
282  }
283 
284  bool found = false;
285  struct addrinfo* it = addr;
286  char namebuf[128] = {};
287  for (; it; it = it->ai_next)
288  {
289  if (!s_use_ipv6_ && it->ai_family == AF_INET)
290  {
291  sockaddr_in *address = (sockaddr_in*) &sas;
292  sas_len = sizeof(*address);
293 
294  memcpy(address, it->ai_addr, it->ai_addrlen);
295  address->sin_family = it->ai_family;
296  address->sin_port = htons(port);
297 
298  strncpy(namebuf, inet_ntoa(address->sin_addr), sizeof(namebuf)-1);
299  found = true;
300  break;
301  }
302  if (s_use_ipv6_ && it->ai_family == AF_INET6)
303  {
304  sockaddr_in6 *address = (sockaddr_in6*) &sas;
305  sas_len = sizeof(*address);
306 
307  memcpy(address, it->ai_addr, it->ai_addrlen);
308  address->sin6_family = it->ai_family;
309  address->sin6_port = htons((u_short) port);
310 
311  // TODO IPV6: does inet_ntop need other includes for Windows?
312  inet_ntop(AF_INET6, (void*)&(address->sin6_addr), namebuf, sizeof(namebuf));
313  found = true;
314  break;
315  }
316  }
317 
318  freeaddrinfo(addr);
319 
320  if (!found)
321  {
322  ROS_ERROR("Couldn't resolve an address for [%s]\n", host.c_str());
323  return false;
324  }
325 
326  ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf, sock_);
327  }
328 
329  int ret = ::connect(sock_, (sockaddr*) &sas, sas_len);
330  // windows might need some time to sleep (input from service robotics hack) add this if testing proves it is necessary.
331  // ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
332  if (((flags_ & SYNCHRONOUS) && ret != 0) || // synchronous, connect() should return 0
333  (!(flags_ & SYNCHRONOUS) && ret != 0 && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN))
334  // asynchronous, connect() may return 0 or -1. When return -1, WSAGetLastError()=WSAEWOULDBLOCK/errno=EINPROGRESS
335  {
336  ROSCPP_CONN_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
337  close();
338 
339  return false;
340  }
341 
342  // from daniel stonier:
343 #ifdef WIN32
344  // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
345  // recv() needs to check if its connected or not when its asynchronous?
346  Sleep(100);
347 #endif
348 
349 
350  std::stringstream ss;
351  ss << host << ":" << port << " on socket " << sock_;
352  cached_remote_host_ = ss.str();
353 
354  if (!initializeSocket())
355  {
356  return false;
357  }
358 
359  if (flags_ & SYNCHRONOUS)
360  {
361  ROSCPP_CONN_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
362  }
363  else
364  {
365  ROSCPP_CONN_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
366  }
367 
368  return true;
369 }
370 
371 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
372 {
373  is_server_ = true;
374  accept_cb_ = accept_cb;
375 
376  if (s_use_ipv6_)
377  {
378  sock_ = socket(AF_INET6, SOCK_STREAM, 0);
379  sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
380  address->sin6_family = AF_INET6;
381  address->sin6_addr = isOnlyLocalhostAllowed() ?
382  in6addr_loopback :
383  in6addr_any;
384  address->sin6_port = htons(port);
385  sa_len_ = sizeof(sockaddr_in6);
386  }
387  else
388  {
389  sock_ = socket(AF_INET, SOCK_STREAM, 0);
390  sockaddr_in *address = (sockaddr_in *)&server_address_;
391  address->sin_family = AF_INET;
392  address->sin_addr.s_addr = isOnlyLocalhostAllowed() ?
393  htonl(INADDR_LOOPBACK) :
394  INADDR_ANY;
395  address->sin_port = htons(port);
396  sa_len_ = sizeof(sockaddr_in);
397  }
398 
399  if (sock_ == ROS_INVALID_SOCKET)
400  {
401  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
402  return false;
403  }
404 
405 
406  if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
407  {
408  ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
409  return false;
410  }
411 
412  ::listen(sock_, backlog);
413  getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);
414 
415  switch (server_address_.ss_family)
416  {
417  case AF_INET:
418  server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
419  break;
420  case AF_INET6:
421  server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
422  break;
423  }
424 
425  if (!initializeSocket())
426  {
427  return false;
428  }
429 
430  if (!(flags_ & SYNCHRONOUS))
431  {
432  enableRead();
433  }
434 
435  return true;
436 }
437 
439 {
440  Callback disconnect_cb;
441 
442  if (!closed_)
443  {
444  {
445  boost::recursive_mutex::scoped_lock lock(close_mutex_);
446 
447  if (!closed_)
448  {
449  closed_ = true;
450 
452 
453  if (poll_set_)
454  {
456  }
457 
459  if ( close_socket(sock_) != 0 )
460  {
461  ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
462  } else
463  {
464  ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
465  }
467 
468  disconnect_cb = disconnect_cb_;
469 
471  read_cb_ = Callback();
472  write_cb_ = Callback();
474  }
475  }
476  }
477 
478  if (disconnect_cb)
479  {
480  disconnect_cb(shared_from_this());
481  }
482 }
483 
484 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
485 {
486  {
487  boost::recursive_mutex::scoped_lock lock(close_mutex_);
488 
489  if (closed_)
490  {
491  ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
492  return -1;
493  }
494  }
495 
496  ROS_ASSERT(size > 0);
497 
498  // never read more than INT_MAX since this is the maximum we can report back with the current return type
499  uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
500  int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
501  if (num_bytes < 0)
502  {
503  if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK
504  {
505  ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
506  close();
507  }
508  else
509  {
510  num_bytes = 0;
511  }
512  }
513  else if (num_bytes == 0)
514  {
515  ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
516  close();
517  return -1;
518  }
519 
520  return num_bytes;
521 }
522 
523 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
524 {
525  {
526  boost::recursive_mutex::scoped_lock lock(close_mutex_);
527 
528  if (closed_)
529  {
530  ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
531  return -1;
532  }
533  }
534 
535  ROS_ASSERT(size > 0);
536 
537  // never write more than INT_MAX since this is the maximum we can report back with the current return type
538  uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
539  int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
540  if (num_bytes < 0)
541  {
543  {
544  ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
545  close();
546  }
547  else
548  {
549  num_bytes = 0;
550  }
551  }
552 
553  return num_bytes;
554 }
555 
557 {
559 
560  {
561  boost::recursive_mutex::scoped_lock lock(close_mutex_);
562 
563  if (closed_)
564  {
565  return;
566  }
567  }
568 
569  if (!expecting_read_)
570  {
571  poll_set_->addEvents(sock_, POLLIN);
572  expecting_read_ = true;
573  }
574 }
575 
577 {
579 
580  {
581  boost::recursive_mutex::scoped_lock lock(close_mutex_);
582 
583  if (closed_)
584  {
585  return;
586  }
587  }
588 
589  if (expecting_read_)
590  {
591  poll_set_->delEvents(sock_, POLLIN);
592  expecting_read_ = false;
593  }
594 }
595 
597 {
599 
600  {
601  boost::recursive_mutex::scoped_lock lock(close_mutex_);
602 
603  if (closed_)
604  {
605  return;
606  }
607  }
608 
609  if (!expecting_write_)
610  {
611  poll_set_->addEvents(sock_, POLLOUT);
612  expecting_write_ = true;
613  }
614 }
615 
617 {
619 
620  {
621  boost::recursive_mutex::scoped_lock lock(close_mutex_);
622 
623  if (closed_)
624  {
625  return;
626  }
627  }
628 
629  if (expecting_write_)
630  {
631  poll_set_->delEvents(sock_, POLLOUT);
632  expecting_write_ = false;
633  }
634 }
635 
637 {
639 
640  sockaddr client_address;
641  socklen_t len = sizeof(client_address);
642  int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
643  if (new_sock >= 0)
644  {
645  ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
646 
647  TransportTCPPtr transport(boost::make_shared<TransportTCP>(poll_set_, flags_));
648  if (!transport->setSocket(new_sock))
649  {
650  ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
651  }
652 
653  return transport;
654  }
655  else
656  {
657  ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
658  }
659 
660  return TransportTCPPtr();
661 }
662 
664 {
665  {
666  boost::recursive_mutex::scoped_lock lock(close_mutex_);
667  if (closed_)
668  {
669  return;
670  }
671 
672  // Handle read events before err/hup/nval, since there may be data left on the wire
673  if ((events & POLLIN) && expecting_read_)
674  {
675  if (is_server_)
676  {
677  // Should not block here, because poll() said that it's ready
678  // for reading
679  TransportTCPPtr transport = accept();
680  if (transport)
681  {
683  accept_cb_(transport);
684  }
685  }
686  else
687  {
688  if (read_cb_)
689  {
690  read_cb_(shared_from_this());
691  }
692  }
693  }
694 
695  if ((events & POLLOUT) && expecting_write_)
696  {
697  if (write_cb_)
698  {
699  write_cb_(shared_from_this());
700  }
701  }
702  }
703 
704  if((events & POLLERR) ||
705  (events & POLLHUP) ||
706 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX!
707  (events & POLLRDHUP) ||
708 #endif
709  (events & POLLNVAL))
710  {
711  uint32_t error = -1;
712  socklen_t len = sizeof(error);
713  if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
714  {
715  ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
716  }
717  #ifdef _MSC_VER
718  char err[60];
719  strerror_s(err,60,error);
720  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
721  #else
722  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
723  #endif
724 
725  close();
726  }
727 }
728 
730 {
731  std::stringstream str;
732  str << "TCPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
733  return str.str();
734 }
735 
737 {
739 
740  sockaddr_storage sas;
741  socklen_t sas_len = sizeof(sas);
742  getpeername(sock_, (sockaddr *)&sas, &sas_len);
743 
744  sockaddr_in *sin = (sockaddr_in *)&sas;
745  sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
746 
747  char namebuf[128] = {};
748  int port;
749 
750  switch (sas.ss_family)
751  {
752  case AF_INET:
753  port = ntohs(sin->sin_port);
754  strncpy(namebuf, inet_ntoa(sin->sin_addr), sizeof(namebuf)-1);
755  break;
756  case AF_INET6:
757  port = ntohs(sin6->sin6_port);
758  inet_ntop(AF_INET6, (void*)&(sin6->sin6_addr), namebuf, sizeof(namebuf));
759  break;
760  default:
761  namebuf[0] = 0;
762  port = 0;
763  break;
764  }
765 
766  std::string ip = namebuf;
767  std::stringstream uri;
768  uri << ip << ":" << port;
769 
770  return uri.str();
771 }
772 
773 } // namespace ros
boost::function< void(const TransportTCPPtr &)> AcceptCallback
Definition: transport_tcp.h:84
virtual void parseHeader(const Header &header)
Provides an opportunity for transport-specific options to come in through the header.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:126
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
Definition: io.cpp:371
socket_fd_t sock_
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
std::string connected_host_
Callback read_cb_
Definition: transport.h:137
Manages a set of sockets being polled through the poll() function call.
Definition: poll_set.h:57
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
std::string cached_remote_host_
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
void setNoDelay(bool nodelay)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
AcceptCallback accept_cb_
bool setSocket(int sock)
Set the socket to be used by this transport.
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
static bool s_use_ipv6_
Definition: transport_tcp.h:60
ROSCPP_DECL bool last_socket_error_is_would_block()
Definition: io.cpp:86
#define ROS_ASSERT_MSG(cond,...)
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:148
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to...
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:94
boost::recursive_mutex close_mutex_
#define ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
Definition: io.h:108
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
sockaddr_storage server_address_
TransportTCPPtr accept()
Accept a connection on a server socket. Blocks until a connection is available.
sockaddr_storage local_address_
bool listen(int port, int backlog, const AcceptCallback &accept_cb)
Start a server socket and listen on a port.
boost::function< void(const TransportPtr &)> Callback
Definition: transport.h:106
Callback disconnect_cb_
Definition: transport.h:136
static bool s_use_keepalive_
Definition: transport_tcp.h:59
void socketUpdate(int events)
boost::shared_ptr< TransportTCP > TransportTCPPtr
Definition: forwards.h:58
#define ROS_INVALID_SOCKET
Definition: io.h:107
Callback write_cb_
Definition: transport.h:138
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
TransportTCP(PollSet *poll_set, int flags=0)
bool getValue(const std::string &key, std::string &value) const
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
Definition: init.cpp:594
#define ROS_SOCKETS_SHUT_RDWR
Definition: io.h:106
std::string getClientURI()
Returns the URI of the remote host.
bool connect(const std::string &host, int port)
Connect to a remote host.
ROSCPP_DECL int last_socket_error()
Definition: io.cpp:67
void setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
#define ROS_ASSERT(cond)
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
Definition: io.cpp:392
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
Definition: transport.cpp:111
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
Definition: transport.h:148
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:66
#define ROS_ERROR(...)
#define ROS_DEBUG(...)
virtual ~TransportTCP()


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27