transport_tcp.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/io.h"
00036 #include "ros/transport/transport_tcp.h"
00037 #include "ros/poll_set.h"
00038 #include "ros/header.h"
00039 #include "ros/file_log.h"
00040 #include <ros/assert.h>
00041 #include <sstream>
00042 #include <boost/bind.hpp>
00043 #include <fcntl.h>
00044 #include <errno.h>
00045 namespace ros
00046 {
00047 
00048 bool TransportTCP::s_use_keepalive_ = true;
00049 bool TransportTCP::s_use_ipv6_ = false;
00050 
00051 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
00052 : sock_(ROS_INVALID_SOCKET)
00053 , closed_(false)
00054 , expecting_read_(false)
00055 , expecting_write_(false)
00056 , is_server_(false)
00057 , server_port_(-1)
00058 , local_port_(-1)
00059 , poll_set_(poll_set)
00060 , flags_(flags)
00061 {
00062 
00063 }
00064 
00065 TransportTCP::~TransportTCP()
00066 {
00067   ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
00068 }
00069 
00070 bool TransportTCP::setSocket(int sock)
00071 {
00072   sock_ = sock;
00073   return initializeSocket();
00074 }
00075 
00076 bool TransportTCP::setNonBlocking()
00077 {
00078   if (!(flags_ & SYNCHRONOUS))
00079   {
00080           int result = set_non_blocking(sock_);
00081           if ( result != 0 ) {
00082               ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00083       close();
00084       return false;
00085     }
00086   }
00087 
00088   return true;
00089 }
00090 
00091 bool TransportTCP::initializeSocket()
00092 {
00093   ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00094 
00095   if (!setNonBlocking())
00096   {
00097     return false;
00098   }
00099 
00100   setKeepAlive(s_use_keepalive_, 60, 10, 9);
00101 
00102   // connect() will set cached_remote_host_ because it already has the host/port available
00103   if (cached_remote_host_.empty())
00104   {
00105     if (is_server_)
00106     {
00107       cached_remote_host_ = "TCPServer Socket";
00108     }
00109     else
00110     {
00111       std::stringstream ss;
00112       ss << getClientURI() << " on socket " << sock_;
00113       cached_remote_host_ = ss.str();
00114     }
00115   }
00116 
00117   if (local_port_ < 0)
00118   {
00119     la_len_ = s_use_ipv6_  ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
00120     getsockname(sock_, (sockaddr *)&local_address_, &la_len_);
00121     switch (local_address_.ss_family)
00122     {
00123       case AF_INET:
00124         local_port_ = ntohs(((sockaddr_in *)&local_address_)->sin_port);
00125         break;
00126       case AF_INET6:
00127         local_port_ = ntohs(((sockaddr_in6 *)&local_address_)->sin6_port);
00128         break;
00129     }
00130   }
00131 
00132 #ifdef ROSCPP_USE_TCP_NODELAY
00133   setNoDelay(true);
00134 #endif
00135 
00136   ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00137   if (poll_set_)
00138   {
00139     ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
00140     poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
00141   }
00142 
00143   if (!(flags_ & SYNCHRONOUS))
00144   {
00145     //enableRead();
00146   }
00147 
00148   return true;
00149 }
00150 
00151 void TransportTCP::parseHeader(const Header& header)
00152 {
00153   std::string nodelay;
00154   if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
00155   {
00156     ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
00157     setNoDelay(true);
00158   }
00159 }
00160 
00161 void TransportTCP::setNoDelay(bool nodelay)
00162 {
00163   int flag = nodelay ? 1 : 0;
00164   int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
00165   if (result < 0)
00166   {
00167     ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00168   }
00169 }
00170 
00171 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
00172 {
00173   if (use)
00174   {
00175     int val = 1;
00176     if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00177     {
00178       ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00179     }
00180 
00181 /* cygwin SOL_TCP does not seem to support TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT */
00182 #if defined(SOL_TCP) && !defined(__CYGWIN__)
00183     val = idle;
00184     if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
00185     {
00186       ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00187     }
00188 
00189     val = interval;
00190     if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
00191     {
00192       ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00193     }
00194 
00195     val = count;
00196     if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
00197     {
00198       ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00199     }
00200 #endif
00201   }
00202   else
00203   {
00204     int val = 0;
00205     if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00206     {
00207         ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00208     }
00209   }
00210 }
00211 
00212 bool TransportTCP::connect(const std::string& host, int port)
00213 {
00214   if (!isHostAllowed(host))
00215     return false; // adios amigo
00216 
00217   sock_ = socket(s_use_ipv6_ ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
00218   connected_host_ = host;
00219   connected_port_ = port;
00220 
00221   if (sock_ == ROS_INVALID_SOCKET)
00222   {
00223     ROS_ERROR("socket() failed with error [%s]",  last_socket_error_string());
00224     return false;
00225   }
00226 
00227   setNonBlocking();
00228 
00229   sockaddr_storage sas;
00230   socklen_t sas_len;
00231 
00232   in_addr ina;
00233   in6_addr in6a;
00234   if (inet_pton(AF_INET, host.c_str(), &ina) == 1)
00235   {
00236     sockaddr_in *address = (sockaddr_in*) &sas;
00237     sas_len = sizeof(sockaddr_in);
00238     
00239     la_len_ = sizeof(sockaddr_in);
00240     address->sin_family = AF_INET;
00241     address->sin_port = htons(port);
00242     address->sin_addr.s_addr = ina.s_addr;
00243   }
00244   else if (inet_pton(AF_INET6, host.c_str(), &in6a) == 1)
00245   {
00246     sockaddr_in6 *address = (sockaddr_in6*) &sas;
00247     sas_len = sizeof(sockaddr_in6);
00248     la_len_ = sizeof(sockaddr_in6);
00249     address->sin6_family = AF_INET6;
00250     address->sin6_port = htons(port);
00251     memcpy(address->sin6_addr.s6_addr, in6a.s6_addr, sizeof(in6a.s6_addr));
00252   }
00253   else
00254   {
00255     struct addrinfo* addr;
00256     struct addrinfo hints;
00257     memset(&hints, 0, sizeof(hints));
00258     hints.ai_family = AF_UNSPEC;
00259 
00260     if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
00261     {
00262       close();
00263       ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
00264       return false;
00265     }
00266 
00267     bool found = false;
00268     struct addrinfo* it = addr;
00269     char namebuf[128];
00270     for (; it; it = it->ai_next)
00271     {
00272       if (!s_use_ipv6_ && it->ai_family == AF_INET)
00273       {
00274         sockaddr_in *address = (sockaddr_in*) &sas;
00275         sas_len = sizeof(*address);
00276         
00277         memcpy(address, it->ai_addr, it->ai_addrlen);
00278         address->sin_family = it->ai_family;
00279         address->sin_port = htons(port);
00280         
00281         strcpy(namebuf, inet_ntoa(address->sin_addr));
00282         found = true;
00283         break;
00284       }
00285       if (s_use_ipv6_ && it->ai_family == AF_INET6)
00286       {
00287         sockaddr_in6 *address = (sockaddr_in6*) &sas;
00288         sas_len = sizeof(*address);
00289       
00290         memcpy(address, it->ai_addr, it->ai_addrlen);
00291         address->sin6_family = it->ai_family;
00292         address->sin6_port = htons((u_short) port);
00293       
00294         // TODO IPV6: does inet_ntop need other includes for Windows?
00295         inet_ntop(AF_INET6, (void*)&(address->sin6_addr), namebuf, sizeof(namebuf));
00296         found = true;
00297         break;
00298       }
00299     }
00300 
00301     freeaddrinfo(addr);
00302 
00303     if (!found)
00304     {
00305       ROS_ERROR("Couldn't resolve an address for [%s]\n", host.c_str());
00306       return false;
00307     }
00308 
00309     ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), namebuf, sock_);
00310   }
00311 
00312   int ret = ::connect(sock_, (sockaddr*) &sas, sas_len);
00313   // windows might need some time to sleep (input from service robotics hack) add this if testing proves it is necessary.
00314   ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
00315   if (((flags_ & SYNCHRONOUS) && ret != 0) || // synchronous, connect() should return 0
00316       (!(flags_ & SYNCHRONOUS) && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN)) // asynchronous, connect() should return -1 and WSAGetLastError()=WSAEWOULDBLOCK/errno=EINPROGRESS
00317   {
00318     ROSCPP_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
00319     close();
00320 
00321     return false;
00322   }
00323 
00324   // from daniel stonier:
00325 #ifdef WIN32
00326   // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
00327   // recv() needs to check if its connected or not when its asynchronous?
00328   Sleep(100);
00329 #endif
00330 
00331 
00332   std::stringstream ss;
00333   ss << host << ":" << port << " on socket " << sock_;
00334   cached_remote_host_ = ss.str();
00335 
00336   if (!initializeSocket())
00337   {
00338     return false;
00339   }
00340 
00341   if (flags_ & SYNCHRONOUS)
00342   {
00343     ROSCPP_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00344   }
00345   else
00346   {
00347     ROSCPP_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00348   }
00349 
00350   return true;
00351 }
00352 
00353 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
00354 {
00355   is_server_ = true;
00356   accept_cb_ = accept_cb;
00357 
00358   if (s_use_ipv6_)
00359   {
00360     sock_ = socket(AF_INET6, SOCK_STREAM, 0);
00361     sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
00362     address->sin6_family = AF_INET6;
00363     address->sin6_addr = isOnlyLocalhostAllowed() ? 
00364                          in6addr_loopback : 
00365                          in6addr_any;
00366     address->sin6_port = htons(port);
00367     sa_len_ = sizeof(sockaddr_in6);
00368   }
00369   else
00370   {
00371     sock_ = socket(AF_INET, SOCK_STREAM, 0);
00372     sockaddr_in *address = (sockaddr_in *)&server_address_;
00373     address->sin_family = AF_INET;
00374     address->sin_addr.s_addr = isOnlyLocalhostAllowed() ? 
00375                                htonl(INADDR_LOOPBACK) : 
00376                                INADDR_ANY;
00377     address->sin_port = htons(port);
00378     sa_len_ = sizeof(sockaddr_in);
00379   }
00380 
00381   if (sock_ <= 0)
00382   {
00383     ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00384     return false;
00385   }
00386 
00387 
00388   if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
00389   {
00390     ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00391     return false;
00392   }
00393 
00394   ::listen(sock_, backlog);
00395   getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);
00396 
00397   switch (server_address_.ss_family)
00398   {
00399     case AF_INET:
00400       server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
00401       break;
00402     case AF_INET6:
00403       server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
00404       break;
00405   }
00406 
00407   if (!initializeSocket())
00408   {
00409     return false;
00410   }
00411 
00412   if (!(flags_ & SYNCHRONOUS))
00413   {
00414     enableRead();
00415   }
00416 
00417   return true;
00418 }
00419 
00420 void TransportTCP::close()
00421 {
00422   Callback disconnect_cb;
00423 
00424   if (!closed_)
00425   {
00426     {
00427       boost::recursive_mutex::scoped_lock lock(close_mutex_);
00428 
00429       if (!closed_)
00430       {
00431         closed_ = true;
00432 
00433         ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00434 
00435         if (poll_set_)
00436         {
00437           poll_set_->delSocket(sock_);
00438         }
00439 
00440         ::shutdown(sock_, ROS_SOCKETS_SHUT_RDWR);
00441         if ( close_socket(sock_) != 0 )
00442         {
00443           ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00444         } else
00445         {
00446           ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
00447         }
00448         sock_ = ROS_INVALID_SOCKET;
00449 
00450         disconnect_cb = disconnect_cb_;
00451 
00452         disconnect_cb_ = Callback();
00453         read_cb_ = Callback();
00454         write_cb_ = Callback();
00455         accept_cb_ = AcceptCallback();
00456       }
00457     }
00458   }
00459 
00460   if (disconnect_cb)
00461   {
00462     disconnect_cb(shared_from_this());
00463   }
00464 }
00465 
00466 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
00467 {
00468   {
00469     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00470 
00471     if (closed_)
00472     {
00473       ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00474       return -1;
00475     }
00476   }
00477 
00478   ROS_ASSERT(size > 0);
00479 
00480   // never read more than INT_MAX since this is the maximum we can report back with the current return type
00481   uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
00482   int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
00483   if (num_bytes < 0)
00484   {
00485         if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK
00486     {
00487       ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00488       close();
00489     }
00490     else
00491     {
00492       num_bytes = 0;
00493     }
00494   }
00495   else if (num_bytes == 0)
00496   {
00497     ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
00498     close();
00499     return -1;
00500   }
00501 
00502   return num_bytes;
00503 }
00504 
00505 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
00506 {
00507   {
00508     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00509 
00510     if (closed_)
00511     {
00512       ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00513       return -1;
00514     }
00515   }
00516 
00517   ROS_ASSERT(size > 0);
00518 
00519   // never write more than INT_MAX since this is the maximum we can report back with the current return type
00520   uint32_t writesize = std::min(size, static_cast<uint32_t>(INT_MAX));
00521   int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), writesize, 0);
00522   if (num_bytes < 0)
00523   {
00524     if ( !last_socket_error_is_would_block() )
00525     {
00526       ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00527       close();
00528     }
00529     else
00530     {
00531       num_bytes = 0;
00532     }
00533   }
00534 
00535   return num_bytes;
00536 }
00537 
00538 void TransportTCP::enableRead()
00539 {
00540   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00541 
00542   {
00543     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00544 
00545     if (closed_)
00546     {
00547       return;
00548     }
00549   }
00550 
00551   if (!expecting_read_)
00552   {
00553     poll_set_->addEvents(sock_, POLLIN);
00554     expecting_read_ = true;
00555   }
00556 }
00557 
00558 void TransportTCP::disableRead()
00559 {
00560   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00561 
00562   {
00563     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00564 
00565     if (closed_)
00566     {
00567       return;
00568     }
00569   }
00570 
00571   if (expecting_read_)
00572   {
00573     poll_set_->delEvents(sock_, POLLIN);
00574     expecting_read_ = false;
00575   }
00576 }
00577 
00578 void TransportTCP::enableWrite()
00579 {
00580   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00581 
00582   {
00583     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00584 
00585     if (closed_)
00586     {
00587       return;
00588     }
00589   }
00590 
00591   if (!expecting_write_)
00592   {
00593     poll_set_->addEvents(sock_, POLLOUT);
00594     expecting_write_ = true;
00595   }
00596 }
00597 
00598 void TransportTCP::disableWrite()
00599 {
00600   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00601 
00602   {
00603     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00604 
00605     if (closed_)
00606     {
00607       return;
00608     }
00609   }
00610 
00611   if (expecting_write_)
00612   {
00613     poll_set_->delEvents(sock_, POLLOUT);
00614     expecting_write_ = false;
00615   }
00616 }
00617 
00618 TransportTCPPtr TransportTCP::accept()
00619 {
00620   ROS_ASSERT(is_server_);
00621 
00622   sockaddr client_address;
00623   socklen_t len = sizeof(client_address);
00624   int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
00625   if (new_sock >= 0)
00626   {
00627     ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
00628 
00629     TransportTCPPtr transport(boost::make_shared<TransportTCP>(poll_set_, flags_));
00630     if (!transport->setSocket(new_sock))
00631     {
00632       ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
00633     }
00634 
00635     return transport;
00636   }
00637   else
00638   {
00639     ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_,  last_socket_error_string());
00640   }
00641 
00642   return TransportTCPPtr();
00643 }
00644 
00645 void TransportTCP::socketUpdate(int events)
00646 {
00647   {
00648     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00649     if (closed_)
00650     {
00651       return;
00652     }
00653 
00654     // Handle read events before err/hup/nval, since there may be data left on the wire
00655     if ((events & POLLIN) && expecting_read_)
00656     {
00657       if (is_server_)
00658       {
00659         // Should not block here, because poll() said that it's ready
00660         // for reading
00661         TransportTCPPtr transport = accept();
00662         if (transport)
00663         {
00664           ROS_ASSERT(accept_cb_);
00665           accept_cb_(transport);
00666         }
00667       }
00668       else
00669       {
00670         if (read_cb_)
00671         {
00672           read_cb_(shared_from_this());
00673         }
00674       }
00675     }
00676 
00677     if ((events & POLLOUT) && expecting_write_)
00678     {
00679       if (write_cb_)
00680       {
00681         write_cb_(shared_from_this());
00682       }
00683     }
00684   }
00685 
00686   if((events & POLLERR) ||
00687      (events & POLLHUP) ||
00688      (events & POLLNVAL))
00689   {
00690     uint32_t error = -1;
00691     socklen_t len = sizeof(error);
00692     if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
00693     {
00694       ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
00695     }
00696   #ifdef _MSC_VER
00697     char err[60];
00698     strerror_s(err,60,error);
00699     ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
00700   #else
00701     ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
00702   #endif
00703 
00704     close();
00705   }
00706 }
00707 
00708 std::string TransportTCP::getTransportInfo()
00709 {
00710   std::stringstream str;
00711   str << "TCPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
00712   return str.str();
00713 }
00714 
00715 std::string TransportTCP::getClientURI()
00716 {
00717   ROS_ASSERT(!is_server_);
00718 
00719   sockaddr_storage sas;
00720   socklen_t sas_len = sizeof(sas);
00721   getpeername(sock_, (sockaddr *)&sas, &sas_len);
00722   
00723   sockaddr_in *sin = (sockaddr_in *)&sas;
00724   sockaddr_in6 *sin6 = (sockaddr_in6 *)&sas;
00725 
00726   char namebuf[128];
00727   int port;
00728 
00729   switch (sas.ss_family)
00730   {
00731     case AF_INET:
00732       port = ntohs(sin->sin_port);
00733       strcpy(namebuf, inet_ntoa(sin->sin_addr));
00734       break;
00735     case AF_INET6:
00736       port = ntohs(sin6->sin6_port);
00737       inet_ntop(AF_INET6, (void*)&(sin6->sin6_addr), namebuf, sizeof(namebuf));
00738       break;
00739     default:
00740       namebuf[0] = 0;
00741       port = 0;
00742       break;
00743   }
00744 
00745   std::string ip = namebuf;
00746   std::stringstream uri;
00747   uri << ip << ":" << port;
00748 
00749   return uri.str();
00750 }
00751 
00752 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05