transport_tcp.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/io.h"
00036 #include "ros/transport/transport_tcp.h"
00037 #include "ros/poll_set.h"
00038 #include "ros/header.h"
00039 #include "ros/file_log.h"
00040 #include <ros/assert.h>
00041 #include <sstream>
00042 #include <boost/bind.hpp>
00043 #include <fcntl.h>
00044 #include <errno.h>
00045 namespace ros
00046 {
00047 
00048 bool TransportTCP::s_use_keepalive_ = true;
00049 
00050 TransportTCP::TransportTCP(PollSet* poll_set, int flags)
00051 : sock_(ROS_INVALID_SOCKET)
00052 , closed_(false)
00053 , expecting_read_(false)
00054 , expecting_write_(false)
00055 , is_server_(false)
00056 , server_port_(-1)
00057 , poll_set_(poll_set)
00058 , flags_(flags)
00059 {
00060 
00061 }
00062 
00063 TransportTCP::~TransportTCP()
00064 {
00065   ROS_ASSERT_MSG(sock_ == -1, "TransportTCP socket [%d] was never closed", sock_);
00066 }
00067 
00068 bool TransportTCP::setSocket(int sock)
00069 {
00070   sock_ = sock;
00071   return initializeSocket();
00072 }
00073 
00074 bool TransportTCP::setNonBlocking()
00075 {
00076   if (!(flags_ & SYNCHRONOUS))
00077   {
00078           int result = set_non_blocking(sock_);
00079           if ( result != 0 ) {
00080               ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00081       close();
00082       return false;
00083     }
00084   }
00085 
00086   return true;
00087 }
00088 
00089 bool TransportTCP::initializeSocket()
00090 {
00091   ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00092 
00093   if (!setNonBlocking())
00094   {
00095     return false;
00096   }
00097 
00098   setKeepAlive(s_use_keepalive_, 60, 10, 9);
00099 
00100   // connect() will set cached_remote_host_ because it already has the host/port available
00101   if (cached_remote_host_.empty())
00102   {
00103     if (is_server_)
00104     {
00105       cached_remote_host_ = "TCPServer Socket";
00106     }
00107     else
00108     {
00109       std::stringstream ss;
00110       ss << getClientURI() << " on socket " << sock_;
00111       cached_remote_host_ = ss.str();
00112     }
00113   }
00114 
00115 #ifdef ROSCPP_USE_TCP_NODELAY
00116   setNoDelay(true);
00117 #endif
00118 
00119   ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00120   if (poll_set_)
00121   {
00122     ROS_DEBUG("Adding tcp socket [%d] to pollset", sock_);
00123     poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
00124   }
00125 
00126   if (!(flags_ & SYNCHRONOUS))
00127   {
00128     //enableRead();
00129   }
00130 
00131   return true;
00132 }
00133 
00134 void TransportTCP::parseHeader(const Header& header)
00135 {
00136   std::string nodelay;
00137   if (header.getValue("tcp_nodelay", nodelay) && nodelay == "1")
00138   {
00139     ROSCPP_LOG_DEBUG("Setting nodelay on socket [%d]", sock_);
00140     setNoDelay(true);
00141   }
00142 }
00143 
00144 void TransportTCP::setNoDelay(bool nodelay)
00145 {
00146   int flag = nodelay ? 1 : 0;
00147   int result = setsockopt(sock_, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
00148   if (result < 0)
00149   {
00150     ROS_ERROR("setsockopt failed to set TCP_NODELAY on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00151   }
00152 }
00153 
00154 void TransportTCP::setKeepAlive(bool use, uint32_t idle, uint32_t interval, uint32_t count)
00155 {
00156   if (use)
00157   {
00158     int val = 1;
00159     if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00160     {
00161       ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00162     }
00163 
00164 /* cygwin SOL_TCP does not seem to support TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT */
00165 #if defined(SOL_TCP) && !defined(__CYGWIN__)
00166     val = idle;
00167     if (setsockopt(sock_, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)) != 0)
00168     {
00169       ROS_DEBUG("setsockopt failed to set TCP_KEEPIDLE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00170     }
00171 
00172     val = interval;
00173     if (setsockopt(sock_, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)) != 0)
00174     {
00175       ROS_DEBUG("setsockopt failed to set TCP_KEEPINTVL on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00176     }
00177 
00178     val = count;
00179     if (setsockopt(sock_, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)) != 0)
00180     {
00181       ROS_DEBUG("setsockopt failed to set TCP_KEEPCNT on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00182     }
00183 #endif
00184   }
00185   else
00186   {
00187     int val = 0;
00188     if (setsockopt(sock_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&val), sizeof(val)) != 0)
00189     {
00190         ROS_DEBUG("setsockopt failed to set SO_KEEPALIVE on socket [%d] [%s]", sock_, cached_remote_host_.c_str());
00191     }
00192   }
00193 }
00194 
00195 bool TransportTCP::connect(const std::string& host, int port)
00196 {
00197   sock_ = socket(AF_INET, SOCK_STREAM, 0);
00198   connected_host_ = host;
00199   connected_port_ = port;
00200 
00201   if (sock_ == ROS_INVALID_SOCKET)
00202   {
00203     ROS_ERROR("socket() failed with error [%s]",  last_socket_error_string());
00204     return false;
00205   }
00206 
00207   setNonBlocking();
00208 
00209   sockaddr_in sin;
00210   sin.sin_family = AF_INET;
00211   if (inet_addr(host.c_str()) == INADDR_NONE)
00212   {
00213     struct addrinfo* addr;
00214     if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0)
00215     {
00216       close();
00217       ROS_ERROR("couldn't resolve publisher host [%s]", host.c_str());
00218       return false;
00219     }
00220 
00221     bool found = false;
00222     struct addrinfo* it = addr;
00223     for (; it; it = it->ai_next)
00224     {
00225       if (it->ai_family == AF_INET)
00226       {
00227         memcpy(&sin, it->ai_addr, it->ai_addrlen);
00228         sin.sin_family = it->ai_family;
00229         sin.sin_port = htons(port);
00230 
00231         found = true;
00232         break;
00233       }
00234     }
00235 
00236     freeaddrinfo(addr);
00237 
00238     if (!found)
00239     {
00240       ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00241       return false;
00242     }
00243 
00244     ROSCPP_LOG_DEBUG("Resolved publisher host [%s] to [%s] for socket [%d]", host.c_str(), inet_ntoa(sin.sin_addr), sock_);
00245   }
00246   else
00247   {
00248     sin.sin_addr.s_addr = inet_addr(host.c_str()); // already an IP addr
00249   }
00250 
00251   sin.sin_port = htons(port);
00252 
00253   int ret = ::connect(sock_, (sockaddr *)&sin, sizeof(sin));
00254   // windows might need some time to sleep (input from service robotics hack) add this if testing proves it is necessary.
00255   ROS_ASSERT((flags_ & SYNCHRONOUS) || ret != 0);
00256   if (((flags_ & SYNCHRONOUS) && ret != 0) || // synchronous, connect() should return 0
00257       (!(flags_ & SYNCHRONOUS) && last_socket_error() != ROS_SOCKETS_ASYNCHRONOUS_CONNECT_RETURN)) // asynchronous, connect() should return -1 and WSAGetLastError()=WSAEWOULDBLOCK/errno=EINPROGRESS
00258   {
00259     ROSCPP_LOG_DEBUG("Connect to tcpros publisher [%s:%d] failed with error [%d, %s]", host.c_str(), port, ret, last_socket_error_string());
00260     close();
00261 
00262     return false;
00263   }
00264 
00265   // from daniel stonier:
00266 #ifdef WIN32
00267   // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
00268   // recv() needs to check if its connected or not when its asynchronous?
00269   Sleep(100);
00270 #endif
00271 
00272 
00273   std::stringstream ss;
00274   ss << host << ":" << port << " on socket " << sock_;
00275   cached_remote_host_ = ss.str();
00276 
00277   if (!initializeSocket())
00278   {
00279     return false;
00280   }
00281 
00282   if (flags_ & SYNCHRONOUS)
00283   {
00284     ROSCPP_LOG_DEBUG("connect() succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00285   }
00286   else
00287   {
00288     ROSCPP_LOG_DEBUG("Async connect() in progress to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00289   }
00290 
00291   return true;
00292 }
00293 
00294 bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
00295 {
00296   is_server_ = true;
00297   accept_cb_ = accept_cb;
00298 
00299   sock_ = socket(AF_INET, SOCK_STREAM, 0);
00300 
00301   if (sock_ <= 0)
00302   {
00303     ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00304     return false;
00305   }
00306 
00307   server_address_.sin_family = AF_INET;
00308   server_address_.sin_port = htons(port);
00309   server_address_.sin_addr.s_addr = INADDR_ANY;
00310   if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00311   {
00312     ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
00313     return false;
00314   }
00315 
00316   ::listen(sock_, backlog);
00317   socklen_t len = sizeof(server_address_);
00318   getsockname(sock_, (sockaddr *)&server_address_, &len);
00319   server_port_ = ntohs(server_address_.sin_port);
00320 
00321   if (!initializeSocket())
00322   {
00323     return false;
00324   }
00325 
00326   if (!(flags_ & SYNCHRONOUS))
00327   {
00328     enableRead();
00329   }
00330 
00331   return true;
00332 }
00333 
00334 void TransportTCP::close()
00335 {
00336   Callback disconnect_cb;
00337 
00338   if (!closed_)
00339   {
00340     {
00341       boost::recursive_mutex::scoped_lock lock(close_mutex_);
00342 
00343       if (!closed_)
00344       {
00345         closed_ = true;
00346 
00347         ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00348 
00349         if (poll_set_)
00350         {
00351           poll_set_->delSocket(sock_);
00352         }
00353 
00354         ::shutdown(sock_, ROS_SOCKETS_SHUT_RDWR);
00355         if ( close_socket(sock_) != 0 )
00356         {
00357           ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00358         } else
00359         {
00360           ROSCPP_LOG_DEBUG("TCP socket [%d] closed", sock_);
00361         }
00362         sock_ = ROS_INVALID_SOCKET;
00363 
00364         disconnect_cb = disconnect_cb_;
00365 
00366         disconnect_cb_ = Callback();
00367         read_cb_ = Callback();
00368         write_cb_ = Callback();
00369         accept_cb_ = AcceptCallback();
00370       }
00371     }
00372   }
00373 
00374   if (disconnect_cb)
00375   {
00376     disconnect_cb(shared_from_this());
00377   }
00378 }
00379 
00380 int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
00381 {
00382   {
00383     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00384 
00385     if (closed_)
00386     {
00387       ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00388       return -1;
00389     }
00390   }
00391 
00392   ROS_ASSERT((int32_t)size > 0);
00393 
00394   int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), size, 0);
00395   if (num_bytes < 0)
00396   {
00397         if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK
00398     {
00399       ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00400       close();
00401     }
00402     else
00403     {
00404       num_bytes = 0;
00405     }
00406   }
00407   else if (num_bytes == 0)
00408   {
00409     ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00410     close();
00411     return -1;
00412   }
00413 
00414   return num_bytes;
00415 }
00416 
00417 int32_t TransportTCP::write(uint8_t* buffer, uint32_t size)
00418 {
00419   {
00420     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00421 
00422     if (closed_)
00423     {
00424       ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00425       return -1;
00426     }
00427   }
00428 
00429   ROS_ASSERT((int32_t)size > 0);
00430 
00431   int num_bytes = ::send(sock_, reinterpret_cast<const char*>(buffer), size, 0);
00432   if (num_bytes < 0)
00433   {
00434     if ( !last_socket_error_is_would_block() )
00435     {
00436       ROSCPP_LOG_DEBUG("send() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
00437       close();
00438     }
00439     else
00440     {
00441       num_bytes = 0;
00442     }
00443   }
00444 
00445   return num_bytes;
00446 }
00447 
00448 void TransportTCP::enableRead()
00449 {
00450   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00451 
00452   {
00453     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00454 
00455     if (closed_)
00456     {
00457       return;
00458     }
00459   }
00460 
00461   if (!expecting_read_)
00462   {
00463     poll_set_->addEvents(sock_, POLLIN);
00464     expecting_read_ = true;
00465   }
00466 }
00467 
00468 void TransportTCP::disableRead()
00469 {
00470   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00471 
00472   {
00473     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00474 
00475     if (closed_)
00476     {
00477       return;
00478     }
00479   }
00480 
00481   if (expecting_read_)
00482   {
00483     poll_set_->delEvents(sock_, POLLIN);
00484     expecting_read_ = false;
00485   }
00486 }
00487 
00488 void TransportTCP::enableWrite()
00489 {
00490   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00491 
00492   {
00493     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00494 
00495     if (closed_)
00496     {
00497       return;
00498     }
00499   }
00500 
00501   if (!expecting_write_)
00502   {
00503     poll_set_->addEvents(sock_, POLLOUT);
00504     expecting_write_ = true;
00505   }
00506 }
00507 
00508 void TransportTCP::disableWrite()
00509 {
00510   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00511 
00512   {
00513     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00514 
00515     if (closed_)
00516     {
00517       return;
00518     }
00519   }
00520 
00521   if (expecting_write_)
00522   {
00523     poll_set_->delEvents(sock_, POLLOUT);
00524     expecting_write_ = false;
00525   }
00526 }
00527 
00528 TransportTCPPtr TransportTCP::accept()
00529 {
00530   ROS_ASSERT(is_server_);
00531 
00532   sockaddr client_address;
00533   socklen_t len = sizeof(client_address);
00534   int new_sock = ::accept(sock_, (sockaddr *)&client_address, &len);
00535   if (new_sock >= 0)
00536   {
00537     ROSCPP_LOG_DEBUG("Accepted connection on socket [%d], new socket [%d]", sock_, new_sock);
00538 
00539     TransportTCPPtr transport(new TransportTCP(poll_set_, flags_));
00540     if (!transport->setSocket(new_sock))
00541     {
00542       ROS_ERROR("Failed to set socket on transport for socket %d", new_sock);
00543     }
00544 
00545     return transport;
00546   }
00547   else
00548   {
00549     ROS_ERROR("accept() on socket [%d] failed with error [%s]", sock_,  last_socket_error_string());
00550   }
00551 
00552   return TransportTCPPtr();
00553 }
00554 
00555 void TransportTCP::socketUpdate(int events)
00556 {
00557   boost::recursive_mutex::scoped_lock lock(close_mutex_);
00558   {
00559     if (closed_)
00560     {
00561       return;
00562     }
00563   }
00564 
00565   // Handle read events before err/hup/nval, since there may be data left on the wire
00566   if ((events & POLLIN) && expecting_read_)
00567   {
00568     if (is_server_)
00569     {
00570       // Should not block here, because poll() said that it's ready
00571       // for reading
00572       TransportTCPPtr transport = accept();
00573       if (transport)
00574       {
00575         ROS_ASSERT(accept_cb_);
00576         accept_cb_(transport);
00577       }
00578     }
00579     else
00580     {
00581       if (read_cb_)
00582       {
00583         read_cb_(shared_from_this());
00584       }
00585     }
00586   }
00587 
00588   {
00589     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00590 
00591     if (closed_)
00592     {
00593       return;
00594     }
00595   }
00596 
00597   if ((events & POLLOUT) && expecting_write_)
00598   {
00599     if (write_cb_)
00600     {
00601       write_cb_(shared_from_this());
00602     }
00603   }
00604 
00605   {
00606     boost::recursive_mutex::scoped_lock lock(close_mutex_);
00607 
00608     if (closed_)
00609     {
00610       return;
00611     }
00612   }
00613 
00614   if((events & POLLERR) ||
00615      (events & POLLHUP) ||
00616      (events & POLLNVAL))
00617   {
00618     uint32_t error = -1;
00619     socklen_t len = sizeof(error);
00620     if (getsockopt(sock_, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &len) < 0)
00621     {
00622       ROSCPP_LOG_DEBUG("getsockopt failed on socket [%d]", sock_);
00623     }
00624   #ifdef _MSC_VER
00625     char err[60];
00626     strerror_s(err,60,error);
00627     ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, err);
00628   #else
00629     ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d: %s", sock_, events, strerror(error));
00630   #endif
00631 
00632     close();
00633   }
00634 }
00635 
00636 std::string TransportTCP::getTransportInfo()
00637 {
00638   return "TCPROS connection to [" + cached_remote_host_ + "]";
00639 }
00640 
00641 std::string TransportTCP::getClientURI()
00642 {
00643   ROS_ASSERT(!is_server_);
00644 
00645   sockaddr_in addr;
00646   socklen_t len = sizeof(addr);
00647   getpeername(sock_, (sockaddr *)&addr, &len);
00648   int port = ntohs(addr.sin_port);
00649   std::string ip = inet_ntoa(addr.sin_addr);
00650 
00651   std::stringstream uri;
00652   uri << ip << ":" << port;
00653 
00654   return uri.str();
00655 }
00656 
00657 } // namespace ros


roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52