transport_tcp.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/io.h"
37 #include "ros/poll_set.h"
38 #include "ros/header.h"
39 #include "ros/file_log.h"
40 #include <ros/assert.h>
41 #include <sstream>
42 #include <boost/bind.hpp>
43 #include <fcntl.h>
44 #include <errno.h>
45 namespace ros
46 {
47 
49 bool TransportTCP::s_use_ipv6_ = false;
50 
51 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
52 : sock_(ROS_INVALID_SOCKET)
53 , closed_(false)
54 , expecting_read_(false)
55 , expecting_write_(false)
56 , is_server_(false)
57 , server_port_(-1)
58 , local_port_(-1)
59 , poll_set_(poll_set)
60 , flags_(flags)
61 {
62 
63 }
64 
66 {
67  ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
68 }
69 
71 {
72  sock_ = sock;
73  return initializeSocket();
74 }
75 
77 {
78  if (!(flags_ & SYNCHRONOUS))
79  {
80  int result = set_non_blocking(sock_);
81  if ( result != 0 ) {
82  ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
83  close();
84  return false;
85  }
86  }
87 
88  return true;
89 }
90 
92 {
94 
95  if (!setNonBlocking())
96  {
97  return false;
98  }
99 
100  setKeepAlive(s_use_keepalive_, 60, 10, 9);
101 
102  // connect() will set cached_remote_host_ because it already has the host/port available
103  if (cached_remote_host_.empty())
104  {
105  if (is_server_)
106  {
107  cached_remote_host_ = "TCPServer Socket";
108  }
109  else
110  {
111  std::stringstream ss;
112  ss << getClientURI() << " on socket " << sock_;
113  cached_remote_host_ = ss.str();
114  }
115  }
116 
117  if (local_port_ < 0)
118  {
119  la_len_ = s_use_ipv6_ ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
120  getsockname(sock_, (sockaddr *)&local_address_, &la_len_);
121  switch (local_address_.ss_family)
122  {
123  case AF_INET:
124  local_port_ = ntohs(((sockaddr_in *)&local_address_)->sin_port);
125  break;
126  case AF_INET6:
127  local_port_ = ntohs(((sockaddr_in6 *)&local_address_)->sin6_port);
128  break;
129  }
130  }
131 
132 #ifdef ROSCPP_USE_TCP_NODELAY
133  setNoDelay(true);
134 #endif
135 
137  if (poll_set_)
138  {
139  ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
140  poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
141  }
142 
143  if (!(flags_ & SYNCHRONOUS))
144  {
145  //enableRead();
146  }
147 
148  return true;
149 }
150 
152 {
153  std::string nodelay;
154  if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
155  {
156  ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
157  setNoDelay(true);
158  }
159 }
160 
161 void TransportTCP::setNoDelay(bool nodelay)
162 {
163  int flag = nodelay ? 1 : 0;
164  int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
165  if (result < 0)
166  {
167  ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
168  }
169 }
170 
171 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
172 {
173  if (use)
174  {
175  int val = 1;
176  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
177  {
178  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
179  }
180 
181 /* cygwin SOL_TCP does not seem to support TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT */
182 #if defined(SOL_TCP) && defined(TCP_KEEPIDLE)
183  val = idle;
184  if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
185  {
186  ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
187  }
188 #else
189  (void)idle;
190 #endif
191 
192 #if defined(SOL_TCP) && defined(TCP_KEEPINTVL)
193  val = interval;
194  if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
195  {
196  ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
197  }
198 #else
199  (void)interval;
200 #endif
201 
202 #if defined(SOL_TCP) && defined(TCP_KEEPCNT)
203  val = count;
204  if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
205  {
206  ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
207  }
208 #else
209  (void)count;
210 #endif
211  }
212  else
213  {
214  int val = 0;
215  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
216  {
217  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
218  }
219  }
220 }
221 
222 bool TransportTCP::connect(const std::string& host, int port)
223 {
224  if (!isHostAllowed(host))
225  return false; // adios amigo
226 
227  sock_ = socket(s_use_ipv6_ ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
228  connected_host_ = host;
229  connected_port_ = port;
230 
231  if (sock_ == ROS_INVALID_SOCKET)
232  {
233  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
234  return false;
235  }
236 
237  setNonBlocking();
238 
239  sockaddr_storage sas;
240  socklen_t sas_len;
241 
242  in_addr ina;
243  in6_addr in6a;
244  if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
245  {
246  sockaddr_in *address = (sockaddr_in*) &sas;
247  sas_len = sizeof(sockaddr_in);
248 
249  la_len_ = sizeof(sockaddr_in);
250  address->sin_family = AF_INET;
251  address->sin_port = htons(port);
252  address->sin_addr.s_addr = ina.s_addr;
253  }
254  else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
255  {
256  sockaddr_in6 *address = (sockaddr_in6*) &sas;
257  sas_len = sizeof(sockaddr_in6);
258  la_len_ = sizeof(sockaddr_in6);
259  address->sin6_family = AF_INET6;
260  address->sin6_port = htons(port);
261  memcpy(address->sin6_addr.s6_addr, in6a.s6_addr, sizeof(in6a.s6_addr));
262  }
263  else
264  {
265  struct addrinfo* addr;
266  struct addrinfo hints;
267  memset(&hints, 0, sizeof(hints));
268  hints.ai_family = AF_UNSPEC;
269 
270  if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
271  {
272  close();
273  ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
274  return false;
275  }
276 
277  bool found = false;
278  struct addrinfo* it = addr;
279  char namebuf[128];
280  for (; it; it = it->ai_next)
281  {
282  if (!s_use_ipv6_ && it->ai_family == AF_INET)
283  {
284  sockaddr_in *address = (sockaddr_in*) &sas;
285  sas_len = sizeof(*address);
286 
287  memcpy(address, it->ai_addr, it->ai_addrlen);
288  address->sin_family = it->ai_family;
289  address->sin_port = htons(port);
290 
291  strcpy(namebuf, inet_ntoa(address->sin_addr));
292  found = true;
293  break;
294  }
295  if (s_use_ipv6_ && it->ai_family == AF_INET6)
296  {
297  sockaddr_in6 *address = (sockaddr_in6*) &sas;
298  sas_len = sizeof(*address);
299 
300  memcpy(address, it->ai_addr, it->ai_addrlen);
301  address->sin6_family = it->ai_family;
302  address->sin6_port = htons((u_short) port);
303 
304  // TODO IPV6: does inet_ntop need other includes for Windows?
305  inet_ntop(AF_INET6, (void*)&(address->sin6_addr), namebuf, sizeof(namebuf));
306  found = true;
307  break;
308  }
309  }
310 
311  freeaddrinfo(addr);
312 
313  if (!found)
314  {
315  ROS_ERROR("Couldn't resolve an address for [%s]\n", host.c_str());
316  return false;
317  }
318 
319  ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf, sock_);
320  }
321 
322  int ret = ::connect(sock_, (sockaddr*) &sas, sas_len);
323  // windows might need some time to sleep (input from service robotics hack) add this if testing proves it is necessary.
324  // ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
325  if (((flags_ & SYNCHRONOUS) && ret != 0) || // synchronous, connect() should return 0
326  (!(flags_ & SYNCHRONOUS) && ret != 0 && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN))
327  // asynchronous, connect() may return 0 or -1. When return -1, WSAGetLastError()=WSAEWOULDBLOCK/errno=EINPROGRESS
328  {
329  ROSCPP_CONN_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
330  close();
331 
332  return false;
333  }
334 
335  // from daniel stonier:
336 #ifdef WIN32
337  // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
338  // recv() needs to check if its connected or not when its asynchronous?
339  Sleep(100);
340 #endif
341 
342 
343  std::stringstream ss;
344  ss << host << ":" << port << " on socket " << sock_;
345  cached_remote_host_ = ss.str();
346 
347  if (!initializeSocket())
348  {
349  return false;
350  }
351 
352  if (flags_ & SYNCHRONOUS)
353  {
354  ROSCPP_CONN_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
355  }
356  else
357  {
358  ROSCPP_CONN_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
359  }
360 
361  return true;
362 }
363 
364 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
365 {
366  is_server_ = true;
367  accept_cb_ = accept_cb;
368 
369  if (s_use_ipv6_)
370  {
371  sock_ = socket(AF_INET6, SOCK_STREAM, 0);
372  sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
373  address->sin6_family = AF_INET6;
374  address->sin6_addr = isOnlyLocalhostAllowed() ?
375  in6addr_loopback :
376  in6addr_any;
377  address->sin6_port = htons(port);
378  sa_len_ = sizeof(sockaddr_in6);
379  }
380  else
381  {
382  sock_ = socket(AF_INET, SOCK_STREAM, 0);
383  sockaddr_in *address = (sockaddr_in *)&server_address_;
384  address->sin_family = AF_INET;
385  address->sin_addr.s_addr = isOnlyLocalhostAllowed() ?
386  htonl(INADDR_LOOPBACK) :
387  INADDR_ANY;
388  address->sin_port = htons(port);
389  sa_len_ = sizeof(sockaddr_in);
390  }
391 
392  if (sock_ <= 0)
393  {
394  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
395  return false;
396  }
397 
398 
399  if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
400  {
401  ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
402  return false;
403  }
404 
405  ::listen(sock_, backlog);
406  getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);
407 
408  switch (server_address_.ss_family)
409  {
410  case AF_INET:
411  server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
412  break;
413  case AF_INET6:
414  server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
415  break;
416  }
417 
418  if (!initializeSocket())
419  {
420  return false;
421  }
422 
423  if (!(flags_ & SYNCHRONOUS))
424  {
425  enableRead();
426  }
427 
428  return true;
429 }
430 
432 {
433  Callback disconnect_cb;
434 
435  if (!closed_)
436  {
437  {
438  boost::recursive_mutex::scoped_lock lock(close_mutex_);
439 
440  if (!closed_)
441  {
442  closed_ = true;
443 
445 
446  if (poll_set_)
447  {
449  }
450 
452  if ( close_socket(sock_) != 0 )
453  {
454  ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
455  } else
456  {
457  ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
458  }
460 
461  disconnect_cb = disconnect_cb_;
462 
464  read_cb_ = Callback();
465  write_cb_ = Callback();
467  }
468  }
469  }
470 
471  if (disconnect_cb)
472  {
473  disconnect_cb(shared_from_this());
474  }
475 }
476 
477 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
478 {
479  {
480  boost::recursive_mutex::scoped_lock lock(close_mutex_);
481 
482  if (closed_)
483  {
484  ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
485  return -1;
486  }
487  }
488 
489  ROS_ASSERT(size > 0);
490 
491  // never read more than INT_MAX since this is the maximum we can report back with the current return type
492  uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
493  int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
494  if (num_bytes < 0)
495  {
496  if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK
497  {
498  ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
499  close();
500  }
501  else
502  {
503  num_bytes = 0;
504  }
505  }
506  else if (num_bytes == 0)
507  {
508  ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
509  close();
510  return -1;
511  }
512 
513  return num_bytes;
514 }
515 
516 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
517 {
518  {
519  boost::recursive_mutex::scoped_lock lock(close_mutex_);
520 
521  if (closed_)
522  {
523  ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
524  return -1;
525  }
526  }
527 
528  ROS_ASSERT(size > 0);
529 
530  // never write more than INT_MAX since this is the maximum we can report back with the current return type
531  uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
532  int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
533  if (num_bytes < 0)
534  {
536  {
537  ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
538  close();
539  }
540  else
541  {
542  num_bytes = 0;
543  }
544  }
545 
546  return num_bytes;
547 }
548 
550 {
552 
553  {
554  boost::recursive_mutex::scoped_lock lock(close_mutex_);
555 
556  if (closed_)
557  {
558  return;
559  }
560  }
561 
562  if (!expecting_read_)
563  {
564  poll_set_->addEvents(sock_, POLLIN);
565  expecting_read_ = true;
566  }
567 }
568 
570 {
572 
573  {
574  boost::recursive_mutex::scoped_lock lock(close_mutex_);
575 
576  if (closed_)
577  {
578  return;
579  }
580  }
581 
582  if (expecting_read_)
583  {
584  poll_set_->delEvents(sock_, POLLIN);
585  expecting_read_ = false;
586  }
587 }
588 
590 {
592 
593  {
594  boost::recursive_mutex::scoped_lock lock(close_mutex_);
595 
596  if (closed_)
597  {
598  return;
599  }
600  }
601 
602  if (!expecting_write_)
603  {
604  poll_set_->addEvents(sock_, POLLOUT);
605  expecting_write_ = true;
606  }
607 }
608 
610 {
612 
613  {
614  boost::recursive_mutex::scoped_lock lock(close_mutex_);
615 
616  if (closed_)
617  {
618  return;
619  }
620  }
621 
622  if (expecting_write_)
623  {
624  poll_set_->delEvents(sock_, POLLOUT);
625  expecting_write_ = false;
626  }
627 }
628 
630 {
632 
633  sockaddr client_address;
634  socklen_t len = sizeof(client_address);
635  int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
636  if (new_sock >= 0)
637  {
638  ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
639 
640  TransportTCPPtr transport(boost::make_shared<TransportTCP>(poll_set_, flags_));
641  if (!transport->setSocket(new_sock))
642  {
643  ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
644  }
645 
646  return transport;
647  }
648  else
649  {
650  ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
651  }
652 
653  return TransportTCPPtr();
654 }
655 
657 {
658  {
659  boost::recursive_mutex::scoped_lock lock(close_mutex_);
660  if (closed_)
661  {
662  return;
663  }
664 
665  // Handle read events before err/hup/nval, since there may be data left on the wire
666  if ((events & POLLIN) && expecting_read_)
667  {
668  if (is_server_)
669  {
670  // Should not block here, because poll() said that it's ready
671  // for reading
672  TransportTCPPtr transport = accept();
673  if (transport)
674  {
676  accept_cb_(transport);
677  }
678  }
679  else
680  {
681  if (read_cb_)
682  {
683  read_cb_(shared_from_this());
684  }
685  }
686  }
687 
688  if ((events & POLLOUT) && expecting_write_)
689  {
690  if (write_cb_)
691  {
692  write_cb_(shared_from_this());
693  }
694  }
695  }
696 
697  if((events & POLLERR) ||
698  (events & POLLHUP) ||
699  (events & POLLNVAL))
700  {
701  uint32_t error = -1;
702  socklen_t len = sizeof(error);
703  if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
704  {
705  ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
706  }
707  #ifdef _MSC_VER
708  char err[60];
709  strerror_s(err,60,error);
710  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
711  #else
712  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
713  #endif
714 
715  close();
716  }
717 }
718 
720 {
721  std::stringstream str;
722  str << "TCPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
723  return str.str();
724 }
725 
727 {
729 
730  sockaddr_storage sas;
731  socklen_t sas_len = sizeof(sas);
732  getpeername(sock_, (sockaddr *)&sas, &sas_len);
733 
734  sockaddr_in *sin = (sockaddr_in *)&sas;
735  sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
736 
737  char namebuf[128];
738  int port;
739 
740  switch (sas.ss_family)
741  {
742  case AF_INET:
743  port = ntohs(sin->sin_port);
744  strcpy(namebuf, inet_ntoa(sin->sin_addr));
745  break;
746  case AF_INET6:
747  port = ntohs(sin6->sin6_port);
748  inet_ntop(AF_INET6, (void*)&(sin6->sin6_addr), namebuf, sizeof(namebuf));
749  break;
750  default:
751  namebuf[0] = 0;
752  port = 0;
753  break;
754  }
755 
756  std::string ip = namebuf;
757  std::stringstream uri;
758  uri << ip << ":" << port;
759 
760  return uri.str();
761 }
762 
763 } // namespace ros
boost::function< void(const TransportTCPPtr &)> AcceptCallback
Definition: transport_tcp.h:84
virtual void parseHeader(const Header &header)
Provides an opportunity for transport-specific options to come in through the header.
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:126
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
Definition: io.cpp:371
socket_fd_t sock_
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
Definition: transport.h:147
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
std::string connected_host_
Callback read_cb_
Definition: transport.h:136
Manages a set of sockets being polled through the poll() function call.
Definition: poll_set.h:57
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
std::string cached_remote_host_
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
ROSCPP_DECL const char * last_socket_error_string()
Definition: io.cpp:74
void setNoDelay(bool nodelay)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
AcceptCallback accept_cb_
bool setSocket(int sock)
Set the socket to be used by this transport.
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
static bool s_use_ipv6_
Definition: transport_tcp.h:60
ROSCPP_DECL bool last_socket_error_is_would_block()
Definition: io.cpp:86
#define ROS_ASSERT_MSG(cond,...)
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:148
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to...
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:94
boost::recursive_mutex close_mutex_
#define ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
Definition: io.h:108
bool getValue(const std::string &key, std::string &value) const
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
sockaddr_storage server_address_
TransportTCPPtr accept()
Accept a connection on a server socket. Blocks until a connection is available.
sockaddr_storage local_address_
bool listen(int port, int backlog, const AcceptCallback &accept_cb)
Start a server socket and listen on a port.
boost::function< void(const TransportPtr &)> Callback
Definition: transport.h:105
Callback disconnect_cb_
Definition: transport.h:135
static bool s_use_keepalive_
Definition: transport_tcp.h:59
void socketUpdate(int events)
boost::shared_ptr< TransportTCP > TransportTCPPtr
Definition: forwards.h:58
#define ROS_INVALID_SOCKET
Definition: io.h:107
Callback write_cb_
Definition: transport.h:137
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
TransportTCP(PollSet *poll_set, int flags=0)
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
Definition: init.cpp:578
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
Definition: transport.cpp:107
#define ROS_SOCKETS_SHUT_RDWR
Definition: io.h:106
std::string getClientURI()
Returns the URI of the remote host.
bool connect(const std::string &host, int port)
Connect to a remote host.
ROSCPP_DECL int last_socket_error()
Definition: io.cpp:67
void setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
#define ROS_ASSERT(cond)
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
Definition: io.cpp:392
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:66
#define ROS_ERROR(...)
#define ROS_DEBUG(...)
virtual ~TransportTCP()


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sun Feb 3 2019 03:29:54