transport_tcp.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "ros/io.h"
37 #include "ros/poll_set.h"
38 #include "ros/header.h"
39 #include "ros/file_log.h"
40 #include <ros/assert.h>
41 #include <sstream>
42 #include <boost/bind.hpp>
43 #include <fcntl.h>
44 #include <errno.h>
45 #ifndef _WIN32
46  #include <sys/socket.h> // explicit include required for FreeBSD
47 #endif
48 namespace ros
49 {
50 
52 bool TransportTCP::s_use_ipv6_ = false;
53 
54 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
55 : sock_(ROS_INVALID_SOCKET)
56 , async_connected_(false)
57 , closed_(false)
58 , expecting_read_(false)
59 , expecting_write_(false)
60 , is_server_(false)
61 , server_port_(-1)
62 , local_port_(-1)
63 , poll_set_(poll_set)
64 , flags_(flags)
65 {
66 
67 }
68 
70 {
71  ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
72 }
73 
75 {
76  sock_ = sock;
77  return initializeSocket();
78 }
79 
81 {
82  if (!(flags_ & SYNCHRONOUS))
83  {
84  int result = set_non_blocking(sock_);
85  if ( result != 0 ) {
86  ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
87  close();
88  return false;
89  }
90  }
91 
92  return true;
93 }
94 
96 {
98 
99  if (!setNonBlocking())
100  {
101  return false;
102  }
103 
104  setKeepAlive(s_use_keepalive_, 60, 10, 9);
105 
106  // connect() will set cached_remote_host_ because it already has the host/port available
107  if (cached_remote_host_.empty())
108  {
109  if (is_server_)
110  {
111  cached_remote_host_ = "TCPServer Socket";
112  }
113  else
114  {
115  std::stringstream ss;
116  ss << getClientURI() << " on socket " << sock_;
117  cached_remote_host_ = ss.str();
118  }
119  }
120 
121  if (local_port_ < 0)
122  {
123  la_len_ = s_use_ipv6_ ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
124  getsockname(sock_, (sockaddr *)&local_address_, &la_len_);
125  switch (local_address_.ss_family)
126  {
127  case AF_INET:
128  local_port_ = ntohs(((sockaddr_in *)&local_address_)->sin_port);
129  break;
130  case AF_INET6:
131  local_port_ = ntohs(((sockaddr_in6 *)&local_address_)->sin6_port);
132  break;
133  }
134  }
135 
136 #ifdef ROSCPP_USE_TCP_NODELAY
137  setNoDelay(true);
138 #endif
139 
141  if (poll_set_)
142  {
143  ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
144  poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, boost::placeholders::_1), shared_from_this());
145 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX!
146  // This is needed to detect dead connections. #1704
147  poll_set_->addEvents(sock_, POLLRDHUP);
148 #endif
149  }
150 
151  if (!(flags_ & SYNCHRONOUS))
152  {
153  //enableRead();
154  }
155 
156  return true;
157 }
158 
160 {
161  std::string nodelay;
162  if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
163  {
164  ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
165  setNoDelay(true);
166  }
167 }
168 
169 void TransportTCP::setNoDelay(bool nodelay)
170 {
171  int flag = nodelay ? 1 : 0;
172  int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
173  if (result < 0)
174  {
175  ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
176  }
177 }
178 
179 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
180 {
181  if (use)
182  {
183  int val = 1;
184  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
185  {
186  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
187  }
188 
189 /* cygwin SOL_TCP does not seem to support TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT */
190 #if defined(SOL_TCP) && defined(TCP_KEEPIDLE)
191  val = idle;
192  if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
193  {
194  ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
195  }
196 #else
197  (void)idle;
198 #endif
199 
200 #if defined(SOL_TCP) && defined(TCP_KEEPINTVL)
201  val = interval;
202  if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
203  {
204  ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
205  }
206 #else
207  (void)interval;
208 #endif
209 
210 #if defined(SOL_TCP) && defined(TCP_KEEPCNT)
211  val = count;
212  if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
213  {
214  ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
215  }
216 #else
217  (void)count;
218 #endif
219  }
220  else
221  {
222  int val = 0;
223  if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
224  {
225  ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
226  }
227  }
228 }
229 
230 bool TransportTCP::connect(const std::string& host, int port)
231 {
232  if (!isHostAllowed(host))
233  return false; // adios amigo
234 
235  sock_ = socket(s_use_ipv6_ ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
236  connected_host_ = host;
237  connected_port_ = port;
238 
239  if (sock_ == ROS_INVALID_SOCKET)
240  {
241  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
242  return false;
243  }
244 
245  setNonBlocking();
246 
247  sockaddr_storage sas;
248  socklen_t sas_len;
249 
250  in_addr ina;
251  in6_addr in6a;
252  if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
253  {
254  sockaddr_in *address = (sockaddr_in*) &sas;
255  sas_len = sizeof(sockaddr_in);
256 
257  la_len_ = sizeof(sockaddr_in);
258  address->sin_family = AF_INET;
259  address->sin_port = htons(port);
260  address->sin_addr.s_addr = ina.s_addr;
261  }
262  else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
263  {
264  sockaddr_in6 *address = (sockaddr_in6*) &sas;
265  sas_len = sizeof(sockaddr_in6);
266  la_len_ = sizeof(sockaddr_in6);
267  address->sin6_family = AF_INET6;
268  address->sin6_port = htons(port);
269  memcpy(address->sin6_addr.s6_addr, in6a.s6_addr, sizeof(in6a.s6_addr));
270  }
271  else
272  {
273  struct addrinfo* addr;
274  struct addrinfo hints;
275  memset(&hints, 0, sizeof(hints));
276  hints.ai_family = AF_UNSPEC;
277 
278  if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
279  {
280  close();
281  ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
282  return false;
283  }
284 
285  bool found = false;
286  struct addrinfo* it = addr;
287  char namebuf[128] = {};
288  for (; it; it = it->ai_next)
289  {
290  if (!s_use_ipv6_ && it->ai_family == AF_INET)
291  {
292  sockaddr_in *address = (sockaddr_in*) &sas;
293  sas_len = sizeof(*address);
294 
295  memcpy(address, it->ai_addr, it->ai_addrlen);
296  address->sin_family = it->ai_family;
297  address->sin_port = htons(port);
298 
299  strncpy(namebuf, inet_ntoa(address->sin_addr), sizeof(namebuf)-1);
300  found = true;
301  break;
302  }
303  if (s_use_ipv6_ && it->ai_family == AF_INET6)
304  {
305  sockaddr_in6 *address = (sockaddr_in6*) &sas;
306  sas_len = sizeof(*address);
307 
308  memcpy(address, it->ai_addr, it->ai_addrlen);
309  address->sin6_family = it->ai_family;
310  address->sin6_port = htons((u_short) port);
311 
312  // TODO IPV6: does inet_ntop need other includes for Windows?
313  inet_ntop(AF_INET6, (void*)&(address->sin6_addr), namebuf, sizeof(namebuf));
314  found = true;
315  break;
316  }
317  }
318 
319  freeaddrinfo(addr);
320 
321  if (!found)
322  {
323  ROS_ERROR("Couldn't resolve an address for [%s]\n", host.c_str());
324  return false;
325  }
326 
327  ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf, sock_);
328  }
329 
330  int ret = ::connect(sock_, (sockaddr*) &sas, sas_len);
331  // windows might need some time to sleep (input from service robotics hack) add this if testing proves it is necessary.
332  // ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
333  if (((flags_ & SYNCHRONOUS) && ret != 0) || // synchronous, connect() should return 0
335  // asynchronous, connect() may return 0 or -1. When return -1, WSAGetLastError()=WSAEWOULDBLOCK/errno=EINPROGRESS
336  {
337  ROSCPP_CONN_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
338  close();
339 
340  return false;
341  }
342 
343  // from daniel stonier:
344 #ifdef WIN32
345  // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
346  // recv() needs to check if its connected or not when its asynchronous?
347  Sleep(100);
348 #endif
349 
350 
351  std::stringstream ss;
352  ss << host << ":" << port << " on socket " << sock_;
353  cached_remote_host_ = ss.str();
354 
355  if (!initializeSocket())
356  {
357  return false;
358  }
359 
360  if (flags_ & SYNCHRONOUS)
361  {
362  ROSCPP_CONN_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
363  }
364  else
365  {
366  ROSCPP_CONN_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
367  }
368 
369  return true;
370 }
371 
372 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
373 {
374  is_server_ = true;
375  accept_cb_ = accept_cb;
376 
377  if (s_use_ipv6_)
378  {
379  sock_ = socket(AF_INET6, SOCK_STREAM, 0);
380  sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
381  address->sin6_family = AF_INET6;
382  address->sin6_addr = isOnlyLocalhostAllowed() ?
383  in6addr_loopback :
384  in6addr_any;
385  address->sin6_port = htons(port);
386  sa_len_ = sizeof(sockaddr_in6);
387  }
388  else
389  {
390  sock_ = socket(AF_INET, SOCK_STREAM, 0);
391  sockaddr_in *address = (sockaddr_in *)&server_address_;
392  address->sin_family = AF_INET;
393  address->sin_addr.s_addr = isOnlyLocalhostAllowed() ?
394  htonl(INADDR_LOOPBACK) :
395  INADDR_ANY;
396  address->sin_port = htons(port);
397  sa_len_ = sizeof(sockaddr_in);
398  }
399 
400  if (sock_ == ROS_INVALID_SOCKET)
401  {
402  ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
403  return false;
404  }
405 
406 
407  if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
408  {
409  ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
410  return false;
411  }
412 
413  ::listen(sock_, backlog);
414  getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);
415 
416  switch (server_address_.ss_family)
417  {
418  case AF_INET:
419  server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
420  break;
421  case AF_INET6:
422  server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
423  break;
424  }
425 
426  if (!initializeSocket())
427  {
428  return false;
429  }
430 
431  if (!(flags_ & SYNCHRONOUS))
432  {
433  enableRead();
434  }
435 
436  return true;
437 }
438 
440 {
441  Callback disconnect_cb;
442 
443  if (!closed_)
444  {
445  {
446  boost::recursive_mutex::scoped_lock lock(close_mutex_);
447 
448  if (!closed_)
449  {
450  closed_ = true;
451 
453 
454  if (poll_set_)
455  {
457  }
458 
460  if ( close_socket(sock_) != 0 )
461  {
462  ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
463  } else
464  {
465  ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
466  }
468 
469  disconnect_cb = disconnect_cb_;
470 
472  read_cb_ = Callback();
473  write_cb_ = Callback();
475  }
476  }
477  }
478 
479  if (disconnect_cb)
480  {
481  disconnect_cb(shared_from_this());
482  }
483 }
484 
485 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
486 {
487  {
488  boost::recursive_mutex::scoped_lock lock(close_mutex_);
489 
490  // if socket is async and not connected, check if it's conneted
491  if (!(flags_ & SYNCHRONOUS) && !async_connected_ && !closed_) {
492  int ret, err;
493  ret = is_async_connected(sock_, err);
494  if (ret == 1) {
495  ROSCPP_CONN_LOG_DEBUG("Async socket[%d] is connected", sock_);
496  async_connected_ = true;
497  } else if (ret == -1) {
498  ROSCPP_LOG_DEBUG("Async connect on socket [%d] failed with error [%s]", sock_, socket_error_string(err));
499  close();
500  } else {
501  // socket is connecting
502  return 0;
503  }
504  }
505 
506  if (closed_)
507  {
508  ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
509  return -1;
510  }
511  }
512 
513  ROS_ASSERT(size > 0);
514 
515  // never read more than INT_MAX since this is the maximum we can report back with the current return type
516  uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
517  int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
518  if (num_bytes < 0)
519  {
520  if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK
521  {
522  ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
523  close();
524  }
525  else
526  {
527  num_bytes = 0;
528  }
529  }
530  else if (num_bytes == 0)
531  {
532  ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
533  close();
534  return -1;
535  }
536 
537  return num_bytes;
538 }
539 
540 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
541 {
542  {
543  boost::recursive_mutex::scoped_lock lock(close_mutex_);
544 
545  // if socket is async and not connected, check if it's conneted
546  if (!(flags_ & SYNCHRONOUS) && !async_connected_ && !closed_) {
547  int ret, err;
548  ret = is_async_connected(sock_, err);
549  if (ret == 1) {
550  ROSCPP_CONN_LOG_DEBUG("Async socket[%d] is connected", sock_);
551  async_connected_ = true;
552  } else if (ret == -1) {
553  ROSCPP_LOG_DEBUG("Async connect on socket [%d] failed with error [%s]", sock_, socket_error_string(err));
554  close();
555  } else {
556  // socket is connecting
557  return 0;
558  }
559  }
560 
561  if (closed_)
562  {
563  ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
564  return -1;
565  }
566  }
567 
568  ROS_ASSERT(size > 0);
569 
570  // never write more than INT_MAX since this is the maximum we can report back with the current return type
571  uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
572  int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
573  if (num_bytes < 0)
574  {
576  {
577  ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
578  close();
579  }
580  else
581  {
582  num_bytes = 0;
583  }
584  }
585 
586  return num_bytes;
587 }
588 
590 {
592 
593  {
594  boost::recursive_mutex::scoped_lock lock(close_mutex_);
595 
596  if (closed_)
597  {
598  return;
599  }
600  }
601 
602  if (!expecting_read_)
603  {
604  poll_set_->addEvents(sock_, POLLIN);
605  expecting_read_ = true;
606  }
607 }
608 
610 {
612 
613  {
614  boost::recursive_mutex::scoped_lock lock(close_mutex_);
615 
616  if (closed_)
617  {
618  return;
619  }
620  }
621 
622  if (expecting_read_)
623  {
624  poll_set_->delEvents(sock_, POLLIN);
625  expecting_read_ = false;
626  }
627 }
628 
630 {
632 
633  {
634  boost::recursive_mutex::scoped_lock lock(close_mutex_);
635 
636  if (closed_)
637  {
638  return;
639  }
640  }
641 
642  if (!expecting_write_)
643  {
644  poll_set_->addEvents(sock_, POLLOUT);
645  expecting_write_ = true;
646  }
647 }
648 
650 {
652 
653  {
654  boost::recursive_mutex::scoped_lock lock(close_mutex_);
655 
656  if (closed_)
657  {
658  return;
659  }
660  }
661 
662  if (expecting_write_)
663  {
664  poll_set_->delEvents(sock_, POLLOUT);
665  expecting_write_ = false;
666  }
667 }
668 
670 {
672 
673  sockaddr client_address;
674  socklen_t len = sizeof(client_address);
675  int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
676  if (new_sock >= 0)
677  {
678  ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
679 
680  TransportTCPPtr transport(boost::make_shared<TransportTCP>(poll_set_, flags_));
681  if (!transport->setSocket(new_sock))
682  {
683  ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
684  }
685 
686  return transport;
687  }
688  else
689  {
690  ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
691  }
692 
693  return TransportTCPPtr();
694 }
695 
697 {
698  {
699  boost::recursive_mutex::scoped_lock lock(close_mutex_);
700  if (closed_)
701  {
702  return;
703  }
704 
705  // Handle read events before err/hup/nval, since there may be data left on the wire
706  if ((events & POLLIN) && expecting_read_)
707  {
708  if (is_server_)
709  {
710  // Should not block here, because poll() said that it's ready
711  // for reading
712  TransportTCPPtr transport = accept();
713  if (transport)
714  {
716  accept_cb_(transport);
717  }
718  }
719  else
720  {
721  if (read_cb_)
722  {
723  read_cb_(shared_from_this());
724  }
725  }
726  }
727 
728  if ((events & POLLOUT) && expecting_write_)
729  {
730  if (write_cb_)
731  {
732  write_cb_(shared_from_this());
733  }
734  }
735  }
736 
737  if((events & POLLERR) ||
738  (events & POLLHUP) ||
739 #if defined(POLLRDHUP) // POLLRDHUP is not part of POSIX!
740  (events & POLLRDHUP) ||
741 #endif
742  (events & POLLNVAL))
743  {
744  uint32_t error = -1;
745  socklen_t len = sizeof(error);
746  if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
747  {
748  ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
749  }
750  #ifdef _MSC_VER
751  char err[60];
752  strerror_s(err,60,error);
753  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
754  #else
755  ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
756  #endif
757 
758  close();
759  }
760 }
761 
763 {
764  std::stringstream str;
765  str << "TCPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
766  return str.str();
767 }
768 
770 {
772 
773  sockaddr_storage sas;
774  socklen_t sas_len = sizeof(sas);
775  getpeername(sock_, (sockaddr *)&sas, &sas_len);
776 
777  sockaddr_in *sin = (sockaddr_in *)&sas;
778  sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
779 
780  char namebuf[128] = {};
781  int port;
782 
783  switch (sas.ss_family)
784  {
785  case AF_INET:
786  port = ntohs(sin->sin_port);
787  strncpy(namebuf, inet_ntoa(sin->sin_addr), sizeof(namebuf)-1);
788  break;
789  case AF_INET6:
790  port = ntohs(sin6->sin6_port);
791  inet_ntop(AF_INET6, (void*)&(sin6->sin6_addr), namebuf, sizeof(namebuf));
792  break;
793  default:
794  namebuf[0] = 0;
795  port = 0;
796  break;
797  }
798 
799  std::string ip = namebuf;
800  std::stringstream uri;
801  uri << ip << ":" << port;
802 
803  return uri.str();
804 }
805 
806 } // namespace ros
ros::TransportTCP::disableWrite
virtual void disableWrite()
Disable writing on this transport. Allows derived classes to, for example, disable write polling for ...
Definition: transport_tcp.cpp:649
ros::socket_error_string
const ROSCPP_DECL char * socket_error_string(int err)
Definition: io.cpp:78
ROSCPP_CONN_LOG_DEBUG
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
ROS_SOCKETS_SHUT_RDWR
#define ROS_SOCKETS_SHUT_RDWR
Definition: io.h:106
ros::PollSet::delSocket
bool delSocket(int sock)
Delete a socket.
Definition: poll_set.cpp:94
boost::shared_ptr< TransportTCP >
ros::TransportTCP::initializeSocket
bool initializeSocket()
Initializes the assigned socket – sets it to non-blocking and enables reading.
Definition: transport_tcp.cpp:95
ros::TransportTCP::expecting_write_
bool expecting_write_
Definition: transport_tcp.h:148
ros
ros::TransportTCP::sock_
socket_fd_t sock_
Definition: transport_tcp.h:142
ros::TransportTCP::local_address_
sockaddr_storage local_address_
Definition: transport_tcp.h:153
ros::TransportTCP::accept
TransportTCPPtr accept()
Accept a connection on a server socket. Blocks until a connection is available.
Definition: transport_tcp.cpp:669
ros::Header
ros::TransportTCP::parseHeader
virtual void parseHeader(const Header &header)
Provides an opportunity for transport-specific options to come in through the header.
Definition: transport_tcp.cpp:159
ros::is_async_connected
ROSCPP_DECL int is_async_connected(socket_fd_t &socket, int &err)
Definition: io.cpp:384
ros::TransportTCP::expecting_read_
bool expecting_read_
Definition: transport_tcp.h:147
ros::Transport::write_cb_
Callback write_cb_
Definition: transport.h:137
ros::TransportTCP::s_use_keepalive_
static bool s_use_keepalive_
Definition: transport_tcp.h:59
ros::shutdown
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
Definition: init.cpp:594
ros::TransportTCP::cached_remote_host_
std::string cached_remote_host_
Definition: transport_tcp.h:160
ros::TransportTCP::write
virtual int32_t write(uint8_t *buffer, uint32_t size)
Write a number of bytes from the supplied buffer. Not guaranteed to actually write that number of byt...
Definition: transport_tcp.cpp:540
ros::TransportTCP::close
virtual void close()
Close this transport. Once this call has returned, writing on this transport should always return an ...
Definition: transport_tcp.cpp:439
ros::PollSet::addEvents
bool addEvents(int sock, int events)
Add events to be polled on a socket.
Definition: poll_set.cpp:126
ros::TransportTCP::connected_host_
std::string connected_host_
Definition: transport_tcp.h:165
ros::TransportTCP::close_mutex_
boost::recursive_mutex close_mutex_
Definition: transport_tcp.h:145
ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
#define ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN
Definition: io.h:108
ros::TransportTCP::server_address_
sockaddr_storage server_address_
Definition: transport_tcp.h:151
ros::TransportTCP::is_server_
bool is_server_
Definition: transport_tcp.h:150
ros::TransportTCP::la_len_
socklen_t la_len_
Definition: transport_tcp.h:154
ros::TransportTCP::setSocket
bool setSocket(int sock)
Set the socket to be used by this transport.
Definition: transport_tcp.cpp:74
ros::TransportTCP::enableRead
virtual void enableRead()
Enable reading on this transport. Allows derived classes to, for example, enable read polling for asy...
Definition: transport_tcp.cpp:589
poll_set.h
ros::TransportTCP::disableRead
virtual void disableRead()
Disable reading on this transport. Allows derived classes to, for example, disable read polling for a...
Definition: transport_tcp.cpp:609
ros::PollSet::addSocket
bool addSocket(int sock, const SocketUpdateFunc &update_func, const TransportPtr &transport=TransportPtr())
Add a socket.
Definition: poll_set.cpp:66
ros::TransportTCP::socketUpdate
void socketUpdate(int events)
Definition: transport_tcp.cpp:696
ros::Transport::Callback
boost::function< void(const TransportPtr &)> Callback
Definition: transport.h:105
ros::TransportTCP::setNoDelay
void setNoDelay(bool nodelay)
Definition: transport_tcp.cpp:169
ROS_DEBUG
#define ROS_DEBUG(...)
ros::TransportTCP::getTransportInfo
virtual std::string getTransportInfo()
Returns a string description of both the type of transport and who the transport is connected to.
Definition: transport_tcp.cpp:762
ros::TransportTCP::getClientURI
std::string getClientURI()
Returns the URI of the remote host.
Definition: transport_tcp.cpp:769
ros::TransportTCP::SYNCHRONOUS
@ SYNCHRONOUS
Definition: transport_tcp.h:65
transport_tcp.h
ros::TransportTCP::connect
bool connect(const std::string &host, int port)
Connect to a remote host.
Definition: transport_tcp.cpp:230
ROS_ASSERT_MSG
#define ROS_ASSERT_MSG(cond,...)
ros::TransportTCP::closed_
bool closed_
Definition: transport_tcp.h:144
ros::TransportTCP::sa_len_
socklen_t sa_len_
Definition: transport_tcp.h:152
ros::Transport::isOnlyLocalhostAllowed
bool isOnlyLocalhostAllowed() const
returns true if this transport is only allowed to talk to localhost
Definition: transport.h:147
ros::PollSet::delEvents
bool delEvents(int sock, int events)
Delete events to be polled on a socket.
Definition: poll_set.cpp:148
ros::TransportTCP::local_port_
int local_port_
Definition: transport_tcp.h:157
ros::last_socket_error_is_would_block
ROSCPP_DECL bool last_socket_error_is_would_block()
Definition: io.cpp:98
ros::TransportTCP::enableWrite
virtual void enableWrite()
Enable writing on this transport. Allows derived classes to, for example, enable write polling for as...
Definition: transport_tcp.cpp:629
ros::TransportTCP::TransportTCP
TransportTCP(PollSet *poll_set, int flags=0)
Definition: transport_tcp.cpp:54
ros::TransportTCP::~TransportTCP
virtual ~TransportTCP()
Definition: transport_tcp.cpp:69
ros::last_socket_error
ROSCPP_DECL int last_socket_error()
Definition: io.cpp:70
ros::TransportTCP::read
virtual int32_t read(uint8_t *buffer, uint32_t size)
Read a number of bytes into the supplied buffer. Not guaranteed to actually read that number of bytes...
Definition: transport_tcp.cpp:485
ros::TransportTCPPtr
boost::shared_ptr< TransportTCP > TransportTCPPtr
Definition: forwards.h:58
ros::close_socket
ROSCPP_DECL int close_socket(socket_fd_t &socket)
Close the socket.
Definition: io.cpp:524
ros::Transport::isHostAllowed
bool isHostAllowed(const std::string &host) const
returns true if the transport is allowed to connect to the host passed to it.
Definition: transport.cpp:111
ros::TransportTCP::setKeepAlive
void setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
Definition: transport_tcp.cpp:179
ros::TransportTCP::accept_cb_
AcceptCallback accept_cb_
Definition: transport_tcp.h:158
io.h
ROS_ERROR
#define ROS_ERROR(...)
ros::TransportTCP::flags_
int flags_
Definition: transport_tcp.h:163
header.h
ros::TransportTCP::s_use_ipv6_
static bool s_use_ipv6_
Definition: transport_tcp.h:60
ros::TransportTCP::listen
bool listen(int port, int backlog, const AcceptCallback &accept_cb)
Start a server socket and listen on a port.
Definition: transport_tcp.cpp:372
ros::TransportTCP::server_port_
int server_port_
Definition: transport_tcp.h:156
ros::TransportTCP::async_connected_
bool async_connected_
Definition: transport_tcp.h:143
ros::TransportTCP::setNonBlocking
bool setNonBlocking()
Definition: transport_tcp.cpp:80
ROS_INVALID_SOCKET
#define ROS_INVALID_SOCKET
Definition: io.h:107
ros::Transport::disconnect_cb_
Callback disconnect_cb_
Definition: transport.h:135
header
const std::string header
assert.h
ros::TransportTCP::poll_set_
PollSet * poll_set_
Definition: transport_tcp.h:162
ros::TransportTCP::AcceptCallback
boost::function< void(const TransportTCPPtr &)> AcceptCallback
Definition: transport_tcp.h:84
ROS_ASSERT
#define ROS_ASSERT(cond)
ros::Transport::read_cb_
Callback read_cb_
Definition: transport.h:136
ros::TransportTCP::connected_port_
int connected_port_
Definition: transport_tcp.h:166
ros::set_non_blocking
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket)
Definition: io.cpp:503
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::last_socket_error_string
const ROSCPP_DECL char * last_socket_error_string()
Definition: io.cpp:90
ros::PollSet
Manages a set of sockets being polled through the poll() function call.
Definition: poll_set.h:57
file_log.h


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Sat Oct 17 2020 19:28:47