transport_udp.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/transport/transport_udp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/file_log.h"
00038 
00039 #include <ros/assert.h>
00040 #include <boost/bind.hpp>
00041 
00042 #include <fcntl.h>
00043 #if defined(__APPLE__)
00044   // For readv() and writev()
00045   #include <sys/types.h>
00046   #include <sys/uio.h>
00047   #include <unistd.h>
00048 #elif defined(__ANDROID__)
00049   // For readv() and writev() on ANDROID
00050   #include <sys/uio.h>
00051 #endif
00052 
00053 namespace ros
00054 {
00055 
00056 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
00057 : sock_(-1)
00058 , closed_(false)
00059 , expecting_read_(false)
00060 , expecting_write_(false)
00061 , is_server_(false)
00062 , server_port_(-1)
00063 , local_port_(-1)
00064 , poll_set_(poll_set)
00065 , flags_(flags)
00066 , connection_id_(0)
00067 , current_message_id_(0)
00068 , total_blocks_(0)
00069 , last_block_(0)
00070 , max_datagram_size_(max_datagram_size)
00071 , data_filled_(0)
00072 , reorder_buffer_(0)
00073 , reorder_bytes_(0)
00074 {
00075   // This may eventually be machine dependent
00076   if (max_datagram_size_ == 0)
00077     max_datagram_size_ = 1500;
00078   reorder_buffer_ = new uint8_t[max_datagram_size_];
00079   reorder_start_ = reorder_buffer_;
00080   data_buffer_ = new uint8_t[max_datagram_size_];
00081   data_start_ = data_buffer_;
00082 }
00083 
00084 TransportUDP::~TransportUDP()
00085 {
00086   ROS_ASSERT_MSG(sock_ == ROS_INVALID_SOCKET, "TransportUDP socket [%d] was never closed", sock_);
00087   delete [] reorder_buffer_;
00088   delete [] data_buffer_;
00089 }
00090 
00091 bool TransportUDP::setSocket(int sock)
00092 {
00093   sock_ = sock;
00094   return initializeSocket();
00095 }
00096 
00097 void TransportUDP::socketUpdate(int events)
00098 {
00099   {
00100     boost::mutex::scoped_lock lock(close_mutex_);
00101 
00102     if (closed_)
00103     {
00104       return;
00105     }
00106   }
00107 
00108   if((events & POLLERR) ||
00109      (events & POLLHUP) ||
00110      (events & POLLNVAL))
00111   {
00112     ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events);
00113     close();
00114   }
00115   else
00116   {
00117     if ((events & POLLIN) && expecting_read_)
00118     {
00119       if (read_cb_)
00120       {
00121         read_cb_(shared_from_this());
00122       }
00123     }
00124 
00125     if ((events & POLLOUT) && expecting_write_)
00126     {
00127       if (write_cb_)
00128       {
00129         write_cb_(shared_from_this());
00130       }
00131     }
00132   }
00133 
00134 }
00135 
00136 std::string TransportUDP::getTransportInfo()
00137 {
00138   std::stringstream str;
00139   str << "UDPROS connection on port " << local_port_ << " to [" << cached_remote_host_ << "]";
00140   return str.str();
00141 }
00142 
00143 bool TransportUDP::connect(const std::string& host, int port, int connection_id)
00144 {
00145   if (!isHostAllowed(host))
00146     return false; // adios amigo
00147 
00148   sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00149   connection_id_ = connection_id;
00150 
00151   if (sock_ == ROS_INVALID_SOCKET)
00152   {
00153     ROS_ERROR("socket() failed with error [%s]",  last_socket_error_string());
00154     return false;
00155   }
00156 
00157   sockaddr_in sin;
00158   sin.sin_family = AF_INET;
00159   if (inet_addr(host.c_str()) == INADDR_NONE)
00160   {
00161     struct addrinfo* addr;
00162     struct addrinfo hints;
00163     memset(&hints, 0, sizeof(hints));
00164     hints.ai_family = AF_UNSPEC;
00165 
00166     if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
00167     {
00168       close();
00169       ROS_ERROR("couldn't resolve host [%s]", host.c_str());
00170       return false;
00171     }
00172 
00173     bool found = false;
00174     struct addrinfo* it = addr;
00175     for (; it; it = it->ai_next)
00176     {
00177       if (it->ai_family == AF_INET)
00178       {
00179         memcpy(&sin, it->ai_addr, it->ai_addrlen);
00180         sin.sin_family = it->ai_family;
00181         sin.sin_port = htons(port);
00182 
00183         found = true;
00184         break;
00185       }
00186     }
00187 
00188     freeaddrinfo(addr);
00189 
00190     if (!found)
00191     {
00192       ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00193       return false;
00194     }
00195 
00196     ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
00197   }
00198   else
00199   {
00200     sin.sin_addr.s_addr = inet_addr(host.c_str()); // already an IP addr
00201   }
00202 
00203   sin.sin_port = htons(port);
00204 
00205   if (::connect(sock_, (sockaddr *)&sin, sizeof(sin)))
00206   {
00207     ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port,  last_socket_error_string());
00208     close();
00209 
00210     return false;
00211   }
00212 
00213   // from daniel stonier:
00214 #ifdef WIN32
00215   // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
00216   // recv() needs to check if its connected or not when its asynchronous?
00217   Sleep(100);
00218 #endif
00219 
00220   std::stringstream ss;
00221   ss << host << ":" << port << " on socket " << sock_;
00222   cached_remote_host_ = ss.str();
00223 
00224   if (!initializeSocket())
00225   {
00226     return false;
00227   }
00228 
00229   ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00230 
00231   return true;
00232 }
00233 
00234 bool TransportUDP::createIncoming(int port, bool is_server)
00235 {
00236   is_server_ = is_server;
00237 
00238   sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00239 
00240   if (sock_ <= 0)
00241   {
00242     ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00243     return false;
00244   }
00245 
00246   server_address_.sin_family = AF_INET;
00247   server_address_.sin_port = htons(port);
00248   server_address_.sin_addr.s_addr = isOnlyLocalhostAllowed() ? 
00249                                     htonl(INADDR_LOOPBACK) :
00250                                     INADDR_ANY;
00251   if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00252   {
00253     ROS_ERROR("bind() failed with error [%s]",  last_socket_error_string());
00254     return false;
00255   }
00256 
00257   socklen_t len = sizeof(server_address_);
00258   getsockname(sock_, (sockaddr *)&server_address_, &len);
00259   server_port_ = ntohs(server_address_.sin_port);
00260   ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_);
00261 
00262   if (!initializeSocket())
00263   {
00264     return false;
00265   }
00266 
00267   enableRead();
00268 
00269   return true;
00270 }
00271 
00272 bool TransportUDP::initializeSocket()
00273 {
00274   ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00275 
00276   if (!(flags_ & SYNCHRONOUS))
00277   {
00278           int result = set_non_blocking(sock_);
00279           if ( result != 0 ) {
00280               ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00281       close();
00282       return false;
00283     }
00284   }
00285 
00286   socklen_t len = sizeof(local_address_);
00287   getsockname(sock_, (sockaddr *)&local_address_, &len);
00288   local_port_ = ntohs(local_address_.sin_port);
00289 
00290   ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00291   if (poll_set_)
00292   {
00293     poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this());
00294   }
00295 
00296   return true;
00297 }
00298 
00299 void TransportUDP::close()
00300 {
00301   Callback disconnect_cb;
00302 
00303   if (!closed_)
00304   {
00305     {
00306       boost::mutex::scoped_lock lock(close_mutex_);
00307 
00308       if (!closed_)
00309       {
00310         closed_ = true;
00311 
00312         ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_);
00313 
00314         ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00315 
00316         if (poll_set_)
00317         {
00318           poll_set_->delSocket(sock_);
00319         }
00320 
00321         if ( close_socket(sock_) != 0 )
00322         {
00323           ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00324         }
00325 
00326         sock_ = ROS_INVALID_SOCKET;
00327 
00328         disconnect_cb = disconnect_cb_;
00329 
00330         disconnect_cb_ = Callback();
00331         read_cb_ = Callback();
00332         write_cb_ = Callback();
00333       }
00334     }
00335   }
00336 
00337   if (disconnect_cb)
00338   {
00339     disconnect_cb(shared_from_this());
00340   }
00341 }
00342 
00343 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
00344 {
00345   {
00346     boost::mutex::scoped_lock lock(close_mutex_);
00347     if (closed_)
00348     {
00349       ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00350       return -1;
00351     }
00352   }
00353 
00354   ROS_ASSERT((int32_t)size > 0);
00355 
00356   uint32_t bytes_read = 0;
00357 
00358   while (bytes_read < size)
00359   {
00360     TransportUDPHeader header;
00361 
00362     // Get the data either from the reorder buffer or the socket
00363     // copy_bytes will contain the read size.
00364     // from_previous is true if the data belongs to the previous UDP datagram.
00365     uint32_t copy_bytes = 0;
00366     bool from_previous = false;
00367     if (reorder_bytes_)
00368     {
00369       if (reorder_start_ != reorder_buffer_)
00370       {
00371         from_previous = true;
00372       }
00373 
00374       copy_bytes = std::min(size - bytes_read, reorder_bytes_);
00375       header = reorder_header_;
00376       memcpy(buffer + bytes_read, reorder_start_, copy_bytes);
00377       reorder_bytes_ -= copy_bytes;
00378       reorder_start_ += copy_bytes;
00379     }
00380     else
00381     {
00382       if (data_filled_ == 0)
00383       {
00384 #if defined(WIN32)
00385         SSIZE_T num_bytes = 0;
00386         DWORD received_bytes = 0;
00387         DWORD flags = 0;
00388         WSABUF iov[2];
00389         iov[0].buf = reinterpret_cast<char*>(&header);
00390         iov[0].len = sizeof(header);
00391         iov[1].buf = reinterpret_cast<char*>(data_buffer_);
00392         iov[1].len = max_datagram_size_ - sizeof(header);
00393                 int rc  = WSARecv(sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
00394                 if ( rc == SOCKET_ERROR) {
00395                   num_bytes = -1;
00396                 } else {
00397                         num_bytes = received_bytes;
00398                 }
00399 #else
00400         ssize_t num_bytes;
00401         struct iovec iov[2];
00402         iov[0].iov_base = &header;
00403         iov[0].iov_len = sizeof(header);
00404         iov[1].iov_base = data_buffer_;
00405         iov[1].iov_len = max_datagram_size_ - sizeof(header);
00406         // Read a datagram with header
00407         num_bytes = readv(sock_, iov, 2);
00408 #endif
00409         if (num_bytes < 0)
00410         {
00411           if ( last_socket_error_is_would_block() )
00412           {
00413             num_bytes = 0;
00414             break;
00415           }
00416           else
00417           {
00418             ROSCPP_LOG_DEBUG("readv() failed with error [%s]",  last_socket_error_string());
00419             close();
00420             break;
00421           }
00422         }
00423         else if (num_bytes == 0)
00424         {
00425           ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00426           close();
00427           return -1;
00428         }
00429         else if (num_bytes < (unsigned) sizeof(header))
00430         {
00431           ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes),  last_socket_error_string());
00432           close();
00433           return -1;
00434         }
00435 
00436                 num_bytes -= sizeof(header);
00437         data_filled_ = num_bytes;
00438         data_start_ = data_buffer_;
00439       }
00440       else
00441       {
00442         from_previous = true;
00443       }
00444 
00445       copy_bytes = std::min(size - bytes_read, data_filled_);
00446       // Copy from the data buffer, whether it has data left in it from a previous datagram or
00447       // was just filled by readv()
00448       memcpy(buffer + bytes_read, data_start_, copy_bytes);
00449       data_filled_ -= copy_bytes;
00450       data_start_ += copy_bytes;
00451     }
00452 
00453 
00454     if (from_previous)
00455     {
00456       // We are simply reading data from the last UDP datagram, nothing to
00457       // parse
00458       bytes_read += copy_bytes;
00459     }
00460     else
00461     {
00462       // This datagram is new, process header
00463       switch (header.op_)
00464       {
00465         case ROS_UDP_DATA0:
00466           if (current_message_id_)
00467           {
00468             ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_);
00469             reorder_header_ = header;
00470 
00471             // Copy the entire data buffer to the reorder buffer, as we will
00472             // need to replay this UDP datagram in the next call.
00473             reorder_bytes_ = data_filled_ + (data_start_ - data_buffer_);
00474             memcpy(reorder_buffer_, data_buffer_, reorder_bytes_);
00475             reorder_start_ = reorder_buffer_;
00476             current_message_id_ = 0;
00477             total_blocks_ = 0;
00478             last_block_ = 0;
00479 
00480             data_filled_ = 0;
00481             data_start_ = data_buffer_;
00482             return -1;
00483           }
00484           total_blocks_ = header.block_;
00485           last_block_ = 0;
00486           current_message_id_ = header.message_id_;
00487           break;
00488         case ROS_UDP_DATAN:
00489           if (header.message_id_ != current_message_id_)
00490           {
00491             ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_);
00492             data_filled_ = 0; // discard datagram
00493             return 0;
00494           }
00495           if (header.block_ != last_block_ + 1)
00496           {
00497             ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_);
00498             data_filled_ = 0; // discard datagram
00499             return 0;
00500           }
00501           last_block_ = header.block_;
00502 
00503           break;
00504         default:
00505           ROS_ERROR("Unexpected UDP header OP [%d]", header.op_);
00506           return -1;
00507       }
00508 
00509       bytes_read += copy_bytes;
00510 
00511       if (last_block_ == (total_blocks_ - 1))
00512       {
00513         current_message_id_ = 0;
00514         break;
00515       }
00516     }
00517   }
00518 
00519   return bytes_read;
00520 }
00521 
00522 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size)
00523 {
00524   {
00525     boost::mutex::scoped_lock lock(close_mutex_);
00526 
00527     if (closed_)
00528     {
00529       ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00530       return -1;
00531     }
00532   }
00533 
00534   ROS_ASSERT((int32_t)size > 0);
00535 
00536   const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader);
00537 
00538   uint32_t bytes_sent = 0;
00539   uint32_t this_block = 0;
00540   if (++current_message_id_ == 0)
00541     ++current_message_id_;
00542   while (bytes_sent < size)
00543   {
00544     TransportUDPHeader header;
00545     header.connection_id_ = connection_id_;
00546     header.message_id_ = current_message_id_;
00547     if (this_block == 0)
00548     {
00549       header.op_ = ROS_UDP_DATA0;
00550       header.block_ = (size + max_payload_size - 1) / max_payload_size;
00551     }
00552     else
00553     {
00554       header.op_ = ROS_UDP_DATAN;
00555       header.block_ = this_block;
00556     }
00557     ++this_block;
00558 #if defined(WIN32)
00559     WSABUF iov[2];
00560         DWORD sent_bytes;
00561         SSIZE_T num_bytes = 0;
00562         DWORD flags = 0;
00563         int rc;
00564         iov[0].buf = reinterpret_cast<char*>(&header);
00565         iov[0].len = sizeof(header);
00566         iov[1].buf = reinterpret_cast<char*>(buffer + bytes_sent);
00567         iov[1].len = std::min(max_payload_size, size - bytes_sent);
00568         rc = WSASend(sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
00569         num_bytes = sent_bytes;
00570         if (rc == SOCKET_ERROR) {
00571             num_bytes = -1;
00572         }
00573 #else
00574     struct iovec iov[2];
00575     iov[0].iov_base = &header;
00576     iov[0].iov_len = sizeof(header);
00577     iov[1].iov_base = buffer + bytes_sent;
00578     iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
00579     ssize_t num_bytes = writev(sock_, iov, 2);
00580 #endif
00581     //usleep(100);
00582     if (num_bytes < 0)
00583     {
00584       if( !last_socket_error_is_would_block() ) // Actually EAGAIN or EWOULDBLOCK on posix
00585       {
00586         ROSCPP_LOG_DEBUG("writev() failed with error [%s]", last_socket_error_string());
00587         close();
00588         break;
00589       }
00590       else
00591       {
00592         num_bytes = 0;
00593         --this_block;
00594       }
00595     }
00596     else if (num_bytes < (unsigned) sizeof(header))
00597     {
00598       ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes));
00599       close();
00600       break;
00601     }
00602     else
00603     {
00604       num_bytes -= sizeof(header);
00605     }
00606     bytes_sent += num_bytes;
00607   }
00608 
00609   return bytes_sent;
00610 }
00611 
00612 void TransportUDP::enableRead()
00613 {
00614   {
00615     boost::mutex::scoped_lock lock(close_mutex_);
00616   
00617     if (closed_)
00618     {
00619       return;
00620     }
00621   }
00622 
00623   if (!expecting_read_)
00624   {
00625     poll_set_->addEvents(sock_, POLLIN);
00626     expecting_read_ = true;
00627   }
00628 }
00629 
00630 void TransportUDP::disableRead()
00631 {
00632   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00633 
00634   {
00635     boost::mutex::scoped_lock lock(close_mutex_);
00636 
00637     if (closed_)
00638     {
00639       return;
00640     }
00641   }
00642 
00643   if (expecting_read_)
00644   {
00645     poll_set_->delEvents(sock_, POLLIN);
00646     expecting_read_ = false;
00647   }
00648 }
00649 
00650 void TransportUDP::enableWrite()
00651 {
00652   {
00653     boost::mutex::scoped_lock lock(close_mutex_);
00654 
00655     if (closed_)
00656     {
00657       return;
00658     }
00659   }
00660 
00661   if (!expecting_write_)
00662   {
00663     poll_set_->addEvents(sock_, POLLOUT);
00664     expecting_write_ = true;
00665   }
00666 }
00667 
00668 void TransportUDP::disableWrite()
00669 {
00670   {
00671     boost::mutex::scoped_lock lock(close_mutex_);
00672 
00673     if (closed_)
00674     {
00675       return;
00676     }
00677   }
00678 
00679   if (expecting_write_)
00680   {
00681     poll_set_->delEvents(sock_, POLLOUT);
00682     expecting_write_ = false;
00683   }
00684 }
00685 
00686 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size)
00687 {
00688   ROS_ASSERT(is_server_);
00689   
00690   TransportUDPPtr transport(boost::make_shared<TransportUDP>(poll_set_, flags_, max_datagram_size));
00691   if (!transport->connect(host, port, connection_id))
00692   {
00693     ROS_ERROR("Failed to create outgoing connection");
00694     return TransportUDPPtr();
00695   }
00696   return transport;
00697 
00698 }
00699 
00700 std::string TransportUDP::getClientURI()
00701 {
00702   ROS_ASSERT(!is_server_);
00703 
00704   sockaddr_storage sas;
00705   socklen_t sas_len = sizeof(sas);
00706   getpeername(sock_, (sockaddr *)&sas, &sas_len);
00707 
00708   sockaddr_in *sin = (sockaddr_in *)&sas;
00709 
00710   char namebuf[128];
00711   int port = ntohs(sin->sin_port);
00712   strcpy(namebuf, inet_ntoa(sin->sin_addr));
00713 
00714   std::string ip = namebuf;
00715   std::stringstream uri;
00716   uri << ip << ":" << port;
00717 
00718   return uri.str();
00719 }
00720 
00721 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:44:47