transport_udp.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/transport/transport_udp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/file_log.h"
00038 
00039 #include <ros/assert.h>
00040 #include <boost/bind.hpp>
00041 
00042 #include <fcntl.h>
00043 #if defined(__APPLE__)
00044   // For readv() and writev()
00045   #include <sys/types.h>
00046   #include <sys/uio.h>
00047   #include <unistd.h>
00048 #endif
00049 
00050 namespace ros
00051 {
00052 
00053 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
00054 : sock_(-1)
00055 , closed_(false)
00056 , expecting_read_(false)
00057 , expecting_write_(false)
00058 , is_server_(false)
00059 , server_port_(-1)
00060 , poll_set_(poll_set)
00061 , flags_(flags)
00062 , connection_id_(0)
00063 , current_message_id_(0)
00064 , total_blocks_(0)
00065 , last_block_(0)
00066 , max_datagram_size_(max_datagram_size)
00067 , data_filled_(0)
00068 , reorder_buffer_(0)
00069 , reorder_bytes_(0)
00070 {
00071   // This may eventually be machine dependent
00072   if (max_datagram_size_ == 0)
00073     max_datagram_size_ = 1500;
00074   reorder_buffer_ = new uint8_t[max_datagram_size_];
00075   reorder_start_ = reorder_buffer_;
00076   data_buffer_ = new uint8_t[max_datagram_size_];
00077   data_start_ = data_buffer_;
00078 }
00079 
00080 TransportUDP::~TransportUDP()
00081 {
00082   ROS_ASSERT_MSG(sock_ == ROS_INVALID_SOCKET, "TransportUDP socket [%d] was never closed", sock_);
00083   delete [] reorder_buffer_;
00084   delete [] data_buffer_;
00085 }
00086 
00087 bool TransportUDP::setSocket(int sock)
00088 {
00089   sock_ = sock;
00090   return initializeSocket();
00091 }
00092 
00093 void TransportUDP::socketUpdate(int events)
00094 {
00095   {
00096     boost::mutex::scoped_lock lock(close_mutex_);
00097 
00098     if (closed_)
00099     {
00100       return;
00101     }
00102   }
00103 
00104   if((events & POLLERR) ||
00105      (events & POLLHUP) ||
00106      (events & POLLNVAL))
00107   {
00108     ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events);
00109     close();
00110   }
00111   else
00112   {
00113     if ((events & POLLIN) && expecting_read_)
00114     {
00115       if (read_cb_)
00116       {
00117         read_cb_(shared_from_this());
00118       }
00119     }
00120 
00121     if ((events & POLLOUT) && expecting_write_)
00122     {
00123       if (write_cb_)
00124       {
00125         write_cb_(shared_from_this());
00126       }
00127     }
00128   }
00129 
00130 }
00131 
00132 std::string TransportUDP::getTransportInfo()
00133 {
00134   return "UDPROS connection to [" + cached_remote_host_ + "]";
00135 }
00136 
00137 bool TransportUDP::connect(const std::string& host, int port, int connection_id)
00138 {
00139   sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00140   connection_id_ = connection_id;
00141 
00142   if (sock_ == ROS_INVALID_SOCKET)
00143   {
00144     ROS_ERROR("socket() failed with error [%s]",  last_socket_error_string());
00145     return false;
00146   }
00147 
00148   sockaddr_in sin;
00149   sin.sin_family = AF_INET;
00150   if (inet_addr(host.c_str()) == INADDR_NONE)
00151   {
00152     struct addrinfo* addr;
00153     struct addrinfo hints;
00154     memset(&hints, 0, sizeof(hints));
00155     hints.ai_family = AF_UNSPEC;
00156 
00157     if (getaddrinfo(host.c_str(), NULL, &hints, &addr) != 0)
00158     {
00159       close();
00160       ROS_ERROR("couldn't resolve host [%s]", host.c_str());
00161       return false;
00162     }
00163 
00164     bool found = false;
00165     struct addrinfo* it = addr;
00166     for (; it; it = it->ai_next)
00167     {
00168       if (it->ai_family == AF_INET)
00169       {
00170         memcpy(&sin, it->ai_addr, it->ai_addrlen);
00171         sin.sin_family = it->ai_family;
00172         sin.sin_port = htons(port);
00173 
00174         found = true;
00175         break;
00176       }
00177     }
00178 
00179     freeaddrinfo(addr);
00180 
00181     if (!found)
00182     {
00183       ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00184       return false;
00185     }
00186 
00187     ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
00188   }
00189   else
00190   {
00191     sin.sin_addr.s_addr = inet_addr(host.c_str()); // already an IP addr
00192   }
00193 
00194   sin.sin_port = htons(port);
00195 
00196   if (::connect(sock_, (sockaddr *)&sin, sizeof(sin)))
00197   {
00198     ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port,  last_socket_error_string());
00199     close();
00200 
00201     return false;
00202   }
00203 
00204   // from daniel stonier:
00205 #ifdef WIN32
00206   // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
00207   // recv() needs to check if its connected or not when its asynchronous?
00208   Sleep(100);
00209 #endif
00210 
00211   if (!initializeSocket())
00212   {
00213     return false;
00214   }
00215 
00216   ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00217 
00218   return true;
00219 }
00220 
00221 bool TransportUDP::createIncoming(int port, bool is_server)
00222 {
00223   is_server_ = is_server;
00224 
00225   sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00226 
00227   if (sock_ <= 0)
00228   {
00229     ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00230     return false;
00231   }
00232 
00233   server_address_.sin_family = AF_INET;
00234   server_address_.sin_port = htons(port);
00235   server_address_.sin_addr.s_addr = INADDR_ANY;
00236   if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00237   {
00238     ROS_ERROR("bind() failed with error [%s]",  last_socket_error_string());
00239     return false;
00240   }
00241 
00242   socklen_t len = sizeof(server_address_);
00243   getsockname(sock_, (sockaddr *)&server_address_, &len);
00244   server_port_ = ntohs(server_address_.sin_port);
00245   ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_);
00246 
00247   if (!initializeSocket())
00248   {
00249     return false;
00250   }
00251 
00252   enableRead();
00253 
00254   return true;
00255 }
00256 
00257 bool TransportUDP::initializeSocket()
00258 {
00259   ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00260 
00261   if (!(flags_ & SYNCHRONOUS))
00262   {
00263           int result = set_non_blocking(sock_);
00264           if ( result != 0 ) {
00265               ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00266       close();
00267       return false;
00268     }
00269   }
00270 
00271   ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00272   if (poll_set_)
00273   {
00274     poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this());
00275   }
00276 
00277   return true;
00278 }
00279 
00280 void TransportUDP::close()
00281 {
00282   Callback disconnect_cb;
00283 
00284   if (!closed_)
00285   {
00286     {
00287       boost::mutex::scoped_lock lock(close_mutex_);
00288 
00289       if (!closed_)
00290       {
00291         closed_ = true;
00292 
00293         ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_);
00294 
00295         ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00296 
00297         if (poll_set_)
00298         {
00299           poll_set_->delSocket(sock_);
00300         }
00301 
00302         if ( close_socket(sock_) != 0 )
00303         {
00304           ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00305         }
00306 
00307         sock_ = ROS_INVALID_SOCKET;
00308 
00309         disconnect_cb = disconnect_cb_;
00310 
00311         disconnect_cb_ = Callback();
00312         read_cb_ = Callback();
00313         write_cb_ = Callback();
00314       }
00315     }
00316   }
00317 
00318   if (disconnect_cb)
00319   {
00320     disconnect_cb(shared_from_this());
00321   }
00322 }
00323 
00324 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
00325 {
00326   {
00327     boost::mutex::scoped_lock lock(close_mutex_);
00328     if (closed_)
00329     {
00330       ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00331       return -1;
00332     }
00333   }
00334 
00335   ROS_ASSERT((int32_t)size > 0);
00336 
00337   uint32_t bytes_read = 0;
00338 
00339   while (bytes_read < size)
00340   {
00341     TransportUDPHeader header;
00342 
00343     // Get the data either from the reorder buffer or the socket
00344     // copy_bytes will contain the read size.
00345     // from_previous is true if the data belongs to the previous UDP datagram.
00346     uint32_t copy_bytes = 0;
00347     bool from_previous = false;
00348     if (reorder_bytes_)
00349     {
00350       if (reorder_start_ != reorder_buffer_)
00351       {
00352         from_previous = true;
00353       }
00354 
00355       copy_bytes = std::min(size - bytes_read, reorder_bytes_);
00356       header = reorder_header_;
00357       memcpy(buffer + bytes_read, reorder_start_, copy_bytes);
00358       reorder_bytes_ -= copy_bytes;
00359       reorder_start_ += copy_bytes;
00360     }
00361     else
00362     {
00363       if (data_filled_ == 0)
00364       {
00365 #if defined(WIN32)
00366         SSIZE_T num_bytes = 0;
00367         DWORD received_bytes = 0;
00368         DWORD flags = 0;
00369         WSABUF iov[2];
00370         iov[0].buf = reinterpret_cast<char*>(&header);
00371         iov[0].len = sizeof(header);
00372         iov[1].buf = reinterpret_cast<char*>(data_buffer_);
00373         iov[1].len = max_datagram_size_ - sizeof(header);
00374                 int rc  = WSARecv(sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
00375                 if ( rc == SOCKET_ERROR) {
00376                   num_bytes = -1;
00377                 } else {
00378                         num_bytes = received_bytes;
00379                 }
00380 #else
00381         ssize_t num_bytes;
00382         struct iovec iov[2];
00383         iov[0].iov_base = &header;
00384         iov[0].iov_len = sizeof(header);
00385         iov[1].iov_base = data_buffer_;
00386         iov[1].iov_len = max_datagram_size_ - sizeof(header);
00387         // Read a datagram with header
00388         num_bytes = readv(sock_, iov, 2);
00389 #endif
00390         if (num_bytes < 0)
00391         {
00392           if ( last_socket_error_is_would_block() )
00393           {
00394             num_bytes = 0;
00395             break;
00396           }
00397           else
00398           {
00399             ROSCPP_LOG_DEBUG("readv() failed with error [%s]",  last_socket_error_string());
00400             close();
00401             break;
00402           }
00403         }
00404         else if (num_bytes == 0)
00405         {
00406           ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00407           close();
00408           return -1;
00409         }
00410         else if (num_bytes < (unsigned) sizeof(header))
00411         {
00412           ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes),  last_socket_error_string());
00413           close();
00414           return -1;
00415         }
00416 
00417                 num_bytes -= sizeof(header);
00418         data_filled_ = num_bytes;
00419         data_start_ = data_buffer_;
00420       }
00421       else
00422       {
00423         from_previous = true;
00424       }
00425 
00426       copy_bytes = std::min(size - bytes_read, data_filled_);
00427       // Copy from the data buffer, whether it has data left in it from a previous datagram or
00428       // was just filled by readv()
00429       memcpy(buffer + bytes_read, data_start_, copy_bytes);
00430       data_filled_ -= copy_bytes;
00431       data_start_ += copy_bytes;
00432     }
00433 
00434 
00435     if (from_previous)
00436     {
00437       // We are simply reading data from the last UDP datagram, nothing to
00438       // parse
00439       bytes_read += copy_bytes;
00440     }
00441     else
00442     {
00443       // This datagram is new, process header
00444       switch (header.op_)
00445       {
00446         case ROS_UDP_DATA0:
00447           if (current_message_id_)
00448           {
00449             ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_);
00450             reorder_header_ = header;
00451 
00452             // Copy the entire data buffer to the reorder buffer, as we will
00453             // need to replay this UDP datagram in the next call.
00454             reorder_bytes_ = data_filled_ + (data_start_ - data_buffer_);
00455             memcpy(reorder_buffer_, data_buffer_, reorder_bytes_);
00456             reorder_start_ = reorder_buffer_;
00457             current_message_id_ = 0;
00458             total_blocks_ = 0;
00459             last_block_ = 0;
00460 
00461             data_filled_ = 0;
00462             data_start_ = data_buffer_;
00463             return -1;
00464           }
00465           total_blocks_ = header.block_;
00466           last_block_ = 0;
00467           current_message_id_ = header.message_id_;
00468           break;
00469         case ROS_UDP_DATAN:
00470           if (header.message_id_ != current_message_id_)
00471           {
00472             ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_);
00473             data_filled_ = 0; // discard datagram
00474             return 0;
00475           }
00476           if (header.block_ != last_block_ + 1)
00477           {
00478             ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_);
00479             data_filled_ = 0; // discard datagram
00480             return 0;
00481           }
00482           last_block_ = header.block_;
00483 
00484           break;
00485         default:
00486           ROS_ERROR("Unexpected UDP header OP [%d]", header.op_);
00487           return -1;
00488       }
00489 
00490       bytes_read += copy_bytes;
00491 
00492       if (last_block_ == (total_blocks_ - 1))
00493       {
00494         current_message_id_ = 0;
00495         break;
00496       }
00497     }
00498   }
00499 
00500   return bytes_read;
00501 }
00502 
00503 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size)
00504 {
00505   {
00506     boost::mutex::scoped_lock lock(close_mutex_);
00507 
00508     if (closed_)
00509     {
00510       ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00511       return -1;
00512     }
00513   }
00514 
00515   ROS_ASSERT((int32_t)size > 0);
00516 
00517   const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader);
00518 
00519   uint32_t bytes_sent = 0;
00520   uint32_t this_block = 0;
00521   if (++current_message_id_ == 0)
00522     ++current_message_id_;
00523   while (bytes_sent < size)
00524   {
00525     TransportUDPHeader header;
00526     header.connection_id_ = connection_id_;
00527     header.message_id_ = current_message_id_;
00528     if (this_block == 0)
00529     {
00530       header.op_ = ROS_UDP_DATA0;
00531       header.block_ = (size + max_payload_size - 1) / max_payload_size;
00532     }
00533     else
00534     {
00535       header.op_ = ROS_UDP_DATAN;
00536       header.block_ = this_block;
00537     }
00538     ++this_block;
00539 #if defined(WIN32)
00540     WSABUF iov[2];
00541         DWORD sent_bytes;
00542         SSIZE_T num_bytes = 0;
00543         DWORD flags = 0;
00544         int rc;
00545         iov[0].buf = reinterpret_cast<char*>(&header);
00546         iov[0].len = sizeof(header);
00547         iov[1].buf = reinterpret_cast<char*>(buffer + bytes_sent);
00548         iov[1].len = std::min(max_payload_size, size - bytes_sent);
00549         rc = WSASend(sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
00550         num_bytes = sent_bytes;
00551         if (rc == SOCKET_ERROR) {
00552             num_bytes = -1;
00553         }
00554 #else
00555     struct iovec iov[2];
00556     iov[0].iov_base = &header;
00557     iov[0].iov_len = sizeof(header);
00558     iov[1].iov_base = buffer + bytes_sent;
00559     iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
00560     ssize_t num_bytes = writev(sock_, iov, 2);
00561 #endif
00562     //usleep(100);
00563     if (num_bytes < 0)
00564     {
00565       if( !last_socket_error_is_would_block() ) // Actually EAGAIN or EWOULDBLOCK on posix
00566       {
00567         ROSCPP_LOG_DEBUG("writev() failed with error [%s]", last_socket_error_string());
00568         close();
00569         break;
00570       }
00571       else
00572       {
00573         num_bytes = 0;
00574       }
00575     }
00576     else if (num_bytes < (unsigned) sizeof(header))
00577     {
00578       ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes));
00579       close();
00580       break;
00581     }
00582     else
00583     {
00584       num_bytes -= sizeof(header);
00585     }
00586     bytes_sent += num_bytes;
00587   }
00588 
00589   return bytes_sent;
00590 }
00591 
00592 void TransportUDP::enableRead()
00593 {
00594   {
00595     boost::mutex::scoped_lock lock(close_mutex_);
00596   
00597     if (closed_)
00598     {
00599       return;
00600     }
00601   }
00602 
00603   if (!expecting_read_)
00604   {
00605     poll_set_->addEvents(sock_, POLLIN);
00606     expecting_read_ = true;
00607   }
00608 }
00609 
00610 void TransportUDP::disableRead()
00611 {
00612   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00613 
00614   {
00615     boost::mutex::scoped_lock lock(close_mutex_);
00616 
00617     if (closed_)
00618     {
00619       return;
00620     }
00621   }
00622 
00623   if (expecting_read_)
00624   {
00625     poll_set_->delEvents(sock_, POLLIN);
00626     expecting_read_ = false;
00627   }
00628 }
00629 
00630 void TransportUDP::enableWrite()
00631 {
00632   {
00633     boost::mutex::scoped_lock lock(close_mutex_);
00634 
00635     if (closed_)
00636     {
00637       return;
00638     }
00639   }
00640 
00641   if (!expecting_write_)
00642   {
00643     poll_set_->addEvents(sock_, POLLOUT);
00644     expecting_write_ = true;
00645   }
00646 }
00647 
00648 void TransportUDP::disableWrite()
00649 {
00650   {
00651     boost::mutex::scoped_lock lock(close_mutex_);
00652 
00653     if (closed_)
00654     {
00655       return;
00656     }
00657   }
00658 
00659   if (expecting_write_)
00660   {
00661     poll_set_->delEvents(sock_, POLLOUT);
00662     expecting_write_ = false;
00663   }
00664 }
00665 
00666 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size)
00667 {
00668   ROS_ASSERT(is_server_);
00669   
00670   TransportUDPPtr transport(new TransportUDP(poll_set_, flags_, max_datagram_size));
00671   if (!transport->connect(host, port, connection_id))
00672   {
00673     ROS_ERROR("Failed to create outgoing connection");
00674     return TransportUDPPtr();
00675   }
00676   return transport;
00677 
00678 }
00679 
00680 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11