transport_tcp.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/io.h"
37 #include "ros/poll_set.h"
38 #include "ros/header.h"
39 #include "ros/file_log.h"
40 #include <ros/assert.h>
41 #include <sstream>
42 #include <boost/bind.hpp>
43 #include <fcntl.h>
44 #include <errno.h>
45 namespace ros
46 {
47 
49 bool TransportTCP::s_use_ipv6_ = false;
50 
51 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
52 : sock_(ROS_INVALID_SOCKET)
53 , closed_(false)
54 , expecting_read_(false)
55 , expecting_write_(false)
56 , is_server_(false)
57 , server_port_(-1)
58 , local_port_(-1)
59 , poll_set_(poll_set)
60 , flags_(flags)
61 {
62 
63 }
64 
66 {
67  ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
68 }
69 
71 {
72  sock_ = sock;
73  return initializeSocket();
74 }
75 
77 {
78  if (!(flags_ & SYNCHRONOUS))
79  {
80  int result = set_non_blocking(sock_);
81  if ( result != 0 ) {
82  ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
83  close();
84  return false;
85  }
86  }
87 
88  return true;
89 }
90 
92 {
94 
95  if (!setNonBlocking())
96  {
97  return false;
98  }
99 
100  setKeepAlive(s_use_keepalive_, 60, 10, 9);
101 
102  // connect() will set cached_remote_host_ because it already has the host/port available
103  if (cached_remote_host_.empty())
104  {
105  if (is_server_)
106  {
107  cached_remote_host_ = "TCPServer Socket";
108  }
109  else
110  {
111  std::stringstream ss;
112  ss << getClientURI() << " on socket " << sock_;
113  cached_remote_host_ = ss.str();
114  }
115  }
116 
117  if (local_port_ < 0)
118  {
119  la_len_ = s_use_ipv6_ ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
120  getsockname(sock_, (sockaddr *)&local_address_, &la_len_);
121  switch (local_address_.ss_family)
122  {
123  case AF_INET:
124  local_port_ = ntohs(((sockaddr_in *)&local_address_)->sin_port);
125  break;
126  case AF_INET6:
127  local_port_ = ntohs(((sockaddr_in6 *)&local_address_)->sin6_port);
128  break;
129  }
130  }
131 
132 #ifdef ROSCPP_USE_TCP_NODELAY
133  setNoDelay(true);
134 #endif
135 
137  if (poll_set_)
138  {
139  ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
140  poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
141 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX!
142  // This is needed to detect dead connections. #1704
143  poll_set_->addEvents(sock_, POLLRDHUP);
144 #endif
145  }
146 
147  if (!(flags_ & SYNCHRONOUS))
148  {
149  //enableRead();
150  }
151 
152  return true;
153 }
154 
156 {
157  std::string nodelay;
158  if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
159  {
160  ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
161  setNoDelay(true);
162  }
163 }
164 
165 void TransportTCP::setNoDelay(bool nodelay)
166 {
167  int flag = nodelay ? 1 : 0;
168  int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
169  if (result < 0)
170  {
171  ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
172  }
173 }
174 
175 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
176 {
177  if (use)
178  {
179  int val = 1;
180  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
181  {
182  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
183  }
184 
185 /* cygwin SOL_TCP does not seem to support TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT */
186 #if defined(SOL_TCP) && defined(TCP_KEEPIDLE)
187  val = idle;
188  if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
189  {
190  ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
191  }
192 #endif
193 
194 #if defined(SOL_TCP) && defined(TCP_KEEPINTVL)
195  val = interval;
196  if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
197  {
198  ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
199  }
200 #endif
201 
202 #if defined(SOL_TCP) && defined(TCP_KEEPCNT)
203  val = count;
204  if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
205  {
206  ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
207  }
208 #endif
209  }
210  else
211  {
212  int val = 0;
213  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
214  {
215  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
216  }
217  }
218 }
219 
220 bool TransportTCP::connect(const std::string& host, int port)
221 {
222  if (!isHostAllowed(host))
223  return false; // adios amigo
224 
225  sock_ = socket(s_use_ipv6_ ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
226  connected_host_ = host;
227  connected_port_ = port;
228 
229  if (sock_ == ROS_INVALID_SOCKET)
230  {
231  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
232  return false;
233  }
234 
235  setNonBlocking();
236 
237  sockaddr_storage sas;
238  socklen_t sas_len;
239 
240  in_addr ina;
241  in6_addr in6a;
242  if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
243  {
244  sockaddr_in *address = (sockaddr_in*) &sas;
245  sas_len = sizeof(sockaddr_in);
246 
247  la_len_ = sizeof(sockaddr_in);
248  address->sin_family = AF_INET;
249  address->sin_port = htons(port);
250  address->sin_addr.s_addr = ina.s_addr;
251  }
252  else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
253  {
254  sockaddr_in6 *address = (sockaddr_in6*) &sas;
255  sas_len = sizeof(sockaddr_in6);
256  la_len_ = sizeof(sockaddr_in6);
257  address->sin6_family = AF_INET6;
258  address->sin6_port = htons(port);
259  memcpy(address->sin6_addr.s6_addr, in6a.s6_addr, sizeof(in6a.s6_addr));
260  }
261  else
262  {
263  struct addrinfo* addr;
264  struct addrinfo hints;
265  memset(&hints, 0, sizeof(hints));
266  hints.ai_family = AF_UNSPEC;
267 
268  if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
269  {
270  close();
271  ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
272  return false;
273  }
274 
275  bool found = false;
276  struct addrinfo* it = addr;
277  char namebuf[128] = {};
278  for (; it; it = it->ai_next)
279  {
280  if (!s_use_ipv6_ && it->ai_family == AF_INET)
281  {
282  sockaddr_in *address = (sockaddr_in*) &sas;
283  sas_len = sizeof(*address);
284 
285  memcpy(address, it->ai_addr, it->ai_addrlen);
286  address->sin_family = it->ai_family;
287  address->sin_port = htons(port);
288 
289  strncpy(namebuf, inet_ntoa(address->sin_addr), sizeof(namebuf)-1);
290  found = true;
291  break;
292  }
293  if (s_use_ipv6_ && it->ai_family == AF_INET6)
294  {
295  sockaddr_in6 *address = (sockaddr_in6*) &sas;
296  sas_len = sizeof(*address);
297 
298  memcpy(address, it->ai_addr, it->ai_addrlen);
299  address->sin6_family = it->ai_family;
300  address->sin6_port = htons((u_short) port);
301 
302  // TODO IPV6: does inet_ntop need other includes for Windows?
303  inet_ntop(AF_INET6, (void*)&(address->sin6_addr), namebuf, sizeof(namebuf));
304  found = true;
305  break;
306  }
307  }
308 
309  freeaddrinfo(addr);
310 
311  if (!found)
312  {
313  ROS_ERROR("Couldn't resolve an address for [%s]\n", host.c_str());
314  return false;
315  }
316 
317  ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf, sock_);
318  }
319 
320  int ret = ::connect(sock_, (sockaddr*) &sas, sas_len);
321  // windows might need some time to sleep (input from service robotics hack) add this if testing proves it is necessary.
322  // ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
323  if (((flags_ & SYNCHRONOUS) && ret != 0) || // synchronous, connect() should return 0
324  (!(flags_ & SYNCHRONOUS) && ret != 0 && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN))
325  // asynchronous, connect() may return 0 or -1. When return -1, WSAGetLastError()=WSAEWOULDBLOCK/errno=EINPROGRESS
326  {
327  ROSCPP_CONN_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
328  close();
329 
330  return false;
331  }
332 
333  // from daniel stonier:
334 #ifdef WIN32
335  // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
336  // recv() needs to check if its connected or not when its asynchronous?
337  Sleep(100);
338 #endif
339 
340 
341  std::stringstream ss;
342  ss << host << ":" << port << " on socket " << sock_;
343  cached_remote_host_ = ss.str();
344 
345  if (!initializeSocket())
346  {
347  return false;
348  }
349 
350  if (flags_ & SYNCHRONOUS)
351  {
352  ROSCPP_CONN_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
353  }
354  else
355  {
356  ROSCPP_CONN_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
357  }
358 
359  return true;
360 }
361 
362 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
363 {
364  is_server_ = true;
365  accept_cb_ = accept_cb;
366 
367  if (s_use_ipv6_)
368  {
369  sock_ = socket(AF_INET6, SOCK_STREAM, 0);
370  sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
371  address->sin6_family = AF_INET6;
372  address->sin6_addr = isOnlyLocalhostAllowed() ?
373  in6addr_loopback :
374  in6addr_any;
375  address->sin6_port = htons(port);
376  sa_len_ = sizeof(sockaddr_in6);
377  }
378  else
379  {
380  sock_ = socket(AF_INET, SOCK_STREAM, 0);
381  sockaddr_in *address = (sockaddr_in *)&server_address_;
382  address->sin_family = AF_INET;
383  address->sin_addr.s_addr = isOnlyLocalhostAllowed() ?
384  htonl(INADDR_LOOPBACK) :
385  INADDR_ANY;
386  address->sin_port = htons(port);
387  sa_len_ = sizeof(sockaddr_in);
388  }
389 
390  if (sock_ == ROS_INVALID_SOCKET)
391  {
392  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
393  return false;
394  }
395 
396 
397  if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
398  {
399  ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
400  return false;
401  }
402 
403  ::listen(sock_, backlog);
404  getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);
405 
406  switch (server_address_.ss_family)
407  {
408  case AF_INET:
409  server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
410  break;
411  case AF_INET6:
412  server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
413  break;
414  }
415 
416  if (!initializeSocket())
417  {
418  return false;
419  }
420 
421  if (!(flags_ & SYNCHRONOUS))
422  {
423  enableRead();
424  }
425 
426  return true;
427 }
428 
430 {
431  Callback disconnect_cb;
432 
433  if (!closed_)
434  {
435  {
436  boost::recursive_mutex::scoped_lock lock(close_mutex_);
437 
438  if (!closed_)
439  {
440  closed_ = true;
441 
443 
444  if (poll_set_)
445  {
447  }
448 
450  if ( close_socket(sock_) != 0 )
451  {
452  ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
453  } else
454  {
455  ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
456  }
458 
459  disconnect_cb = disconnect_cb_;
460 
462  read_cb_ = Callback();
463  write_cb_ = Callback();
465  }
466  }
467  }
468 
469  if (disconnect_cb)
470  {
471  disconnect_cb(shared_from_this());
472  }
473 }
474 
475 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
476 {
477  {
478  boost::recursive_mutex::scoped_lock lock(close_mutex_);
479 
480  if (closed_)
481  {
482  ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
483  return -1;
484  }
485  }
486 
487  ROS_ASSERT(size > 0);
488 
489  // never read more than INT_MAX since this is the maximum we can report back with the current return type
490  uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
491  int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
492  if (num_bytes < 0)
493  {
494  if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK
495  {
496  ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
497  close();
498  }
499  else
500  {
501  num_bytes = 0;
502  }
503  }
504  else if (num_bytes == 0)
505  {
506  ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
507  close();
508  return -1;
509  }
510 
511  return num_bytes;
512 }
513 
514 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
515 {
516  {
517  boost::recursive_mutex::scoped_lock lock(close_mutex_);
518 
519  if (closed_)
520  {
521  ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
522  return -1;
523  }
524  }
525 
526  ROS_ASSERT(size > 0);
527 
528  // never write more than INT_MAX since this is the maximum we can report back with the current return type
529  uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
530  int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
531  if (num_bytes < 0)
532  {
534  {
535  ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
536  close();
537  }
538  else
539  {
540  num_bytes = 0;
541  }
542  }
543 
544  return num_bytes;
545 }
546 
548 {
550 
551  {
552  boost::recursive_mutex::scoped_lock lock(close_mutex_);
553 
554  if (closed_)
555  {
556  return;
557  }
558  }
559 
560  if (!expecting_read_)
561  {
562  poll_set_->addEvents(sock_, POLLIN);
563  expecting_read_ = true;
564  }
565 }
566 
568 {
570 
571  {
572  boost::recursive_mutex::scoped_lock lock(close_mutex_);
573 
574  if (closed_)
575  {
576  return;
577  }
578  }
579 
580  if (expecting_read_)
581  {
582  poll_set_->delEvents(sock_, POLLIN);
583  expecting_read_ = false;
584  }
585 }
586 
588 {
590 
591  {
592  boost::recursive_mutex::scoped_lock lock(close_mutex_);
593 
594  if (closed_)
595  {
596  return;
597  }
598  }
599 
600  if (!expecting_write_)
601  {
602  poll_set_->addEvents(sock_, POLLOUT);
603  expecting_write_ = true;
604  }
605 }
606 
608 {
610 
611  {
612  boost::recursive_mutex::scoped_lock lock(close_mutex_);
613 
614  if (closed_)
615  {
616  return;
617  }
618  }
619 
620  if (expecting_write_)
621  {
622  poll_set_->delEvents(sock_, POLLOUT);
623  expecting_write_ = false;
624  }
625 }
626 
628 {
630 
631  sockaddr client_address;
632  socklen_t len = sizeof(client_address);
633  int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
634  if (new_sock >= 0)
635  {
636  ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
637 
638  TransportTCPPtr transport(boost::make_shared<TransportTCP>(poll_set_, flags_));
639  if (!transport->setSocket(new_sock))
640  {
641  ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
642  }
643 
644  return transport;
645  }
646  else
647  {
648  ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
649  }
650 
651  return TransportTCPPtr();
652 }
653 
655 {
656  {
657  boost::recursive_mutex::scoped_lock lock(close_mutex_);
658  if (closed_)
659  {
660  return;
661  }
662 
663  // Handle read events before err/hup/nval, since there may be data left on the wire
664  if ((events & POLLIN) && expecting_read_)
665  {
666  if (is_server_)
667  {
668  // Should not block here, because poll() said that it's ready
669  // for reading
670  TransportTCPPtr transport = accept();
671  if (transport)
672  {
674  accept_cb_(transport);
675  }
676  }
677  else
678  {
679  if (read_cb_)
680  {
681  read_cb_(shared_from_this());
682  }
683  }
684  }
685 
686  if ((events & POLLOUT) && expecting_write_)
687  {
688  if (write_cb_)
689  {
690  write_cb_(shared_from_this());
691  }
692  }
693  }
694 
695  if((events & POLLERR) ||
696  (events & POLLHUP) ||
697 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX!
698  (events & POLLRDHUP) ||
699 #endif
700  (events & POLLNVAL))
701  {
702  uint32_t error = -1;
703  socklen_t len = sizeof(error);
704  if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
705  {
706  ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
707  }
708  #ifdef _MSC_VER
709  char err[60];
710  strerror_s(err,60,error);
711  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
712  #else
713  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
714  #endif
715 
716  close();
717  }
718 }
719 
721 {
722  std::stringstream str;
723  str << "TCPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
724  return str.str();
725 }
726 
728 {
730 
731  sockaddr_storage sas;
732  socklen_t sas_len = sizeof(sas);
733  getpeername(sock_, (sockaddr *)&sas, &sas_len);
734 
735  sockaddr_in *sin = (sockaddr_in *)&sas;
736  sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
737 
738  char namebuf[128] = {};
739  int port;
740 
741  switch (sas.ss_family)
742  {
743  case AF_INET:
744  port = ntohs(sin->sin_port);
745  strncpy(namebuf, inet_ntoa(sin->sin_addr), sizeof(namebuf)-1);
746  break;
747  case AF_INET6:
748  port = ntohs(sin6->sin6_port);
749  inet_ntop(AF_INET6, (void*)&(sin6->sin6_addr), namebuf, sizeof(namebuf));
750  break;
751  default:
752  namebuf[0] = 0;
753  port = 0;
754  break;
755  }
756 
757  std::string ip = namebuf;
758  std::stringstream uri;
759  uri << ip << ":" << port;
760 
761  return uri.str();
762 }
763 
764 } // namespace ros
boost::function< void(const TransportTCPPtr &)> AcceptCallback
Definition: transport_tcp.h:84
virtual void parseHeader(const Header &header)
Provides an opportunity for transport-specific options to come in through the header.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:126
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
Definition: io.cpp:371
socket_fd_t sock_
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
Definition: transport.h:148
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
std::string connected_host_
Callback read_cb_
Definition: transport.h:137
Manages a set of sockets being polled through the poll() function call.
Definition: poll_set.h:57
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
std::string cached_remote_host_
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
void setNoDelay(bool nodelay)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
AcceptCallback accept_cb_
bool setSocket(int sock)
Set the socket to be used by this transport.
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
static bool s_use_ipv6_
Definition: transport_tcp.h:60
ROSCPP_DECL bool last_socket_error_is_would_block()
Definition: io.cpp:86
#define ROS_ASSERT_MSG(cond,...)
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:148
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to...
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:94
boost::recursive_mutex close_mutex_
#define ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
Definition: io.h:108
bool getValue(const std::string &key, std::string &value) const
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
sockaddr_storage server_address_
TransportTCPPtr accept()
Accept a connection on a server socket. Blocks until a connection is available.
sockaddr_storage local_address_
bool listen(int port, int backlog, const AcceptCallback &accept_cb)
Start a server socket and listen on a port.
boost::function< void(const TransportPtr &)> Callback
Definition: transport.h:106
Callback disconnect_cb_
Definition: transport.h:136
static bool s_use_keepalive_
Definition: transport_tcp.h:59
void socketUpdate(int events)
boost::shared_ptr< TransportTCP > TransportTCPPtr
Definition: forwards.h:58
#define ROS_INVALID_SOCKET
Definition: io.h:107
Callback write_cb_
Definition: transport.h:138
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
TransportTCP(PollSet *poll_set, int flags=0)
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
Definition: init.cpp:578
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
Definition: transport.cpp:107
#define ROS_SOCKETS_SHUT_RDWR
Definition: io.h:106
std::string getClientURI()
Returns the URI of the remote host.
bool connect(const std::string &host, int port)
Connect to a remote host.
ROSCPP_DECL int last_socket_error()
Definition: io.cpp:67
void setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
#define ROS_ASSERT(cond)
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
Definition: io.cpp:392
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:66
#define ROS_ERROR(...)
#define ROS_DEBUG(...)
virtual ~TransportTCP()


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:26