transport_udp.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/transport/transport_udp.h"
00036 #include "ros/poll_set.h"
00037 #include "ros/file_log.h"
00038 
00039 #include <ros/assert.h>
00040 #include <boost/bind.hpp>
00041 
00042 #include <fcntl.h>
00043 #if defined(__APPLE__)
00044   // For readv() and writev()
00045   #include <sys/types.h>
00046   #include <sys/uio.h>
00047   #include <unistd.h>
00048 #endif
00049 
00050 namespace ros
00051 {
00052 
00053 TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
00054 : sock_(-1)
00055 , closed_(false)
00056 , expecting_read_(false)
00057 , expecting_write_(false)
00058 , is_server_(false)
00059 , server_port_(-1)
00060 , poll_set_(poll_set)
00061 , flags_(flags)
00062 , connection_id_(0)
00063 , current_message_id_(0)
00064 , total_blocks_(0)
00065 , last_block_(0)
00066 , max_datagram_size_(max_datagram_size)
00067 , data_filled_(0)
00068 , reorder_buffer_(0)
00069 , reorder_bytes_(0)
00070 {
00071   // This may eventually be machine dependent
00072   if (max_datagram_size_ == 0)
00073     max_datagram_size_ = 1500;
00074   reorder_buffer_ = new uint8_t[max_datagram_size_];
00075   reorder_start_ = reorder_buffer_;
00076   data_buffer_ = new uint8_t[max_datagram_size_];
00077   data_start_ = data_buffer_;
00078 }
00079 
00080 TransportUDP::~TransportUDP()
00081 {
00082   ROS_ASSERT_MSG(sock_ == ROS_INVALID_SOCKET, "TransportUDP socket [%d] was never closed", sock_);
00083   delete [] reorder_buffer_;
00084   delete [] data_buffer_;
00085 }
00086 
00087 bool TransportUDP::setSocket(int sock)
00088 {
00089   sock_ = sock;
00090   return initializeSocket();
00091 }
00092 
00093 void TransportUDP::socketUpdate(int events)
00094 {
00095   {
00096     boost::mutex::scoped_lock lock(close_mutex_);
00097 
00098     if (closed_)
00099     {
00100       return;
00101     }
00102   }
00103 
00104   if((events & POLLERR) ||
00105      (events & POLLHUP) ||
00106      (events & POLLNVAL))
00107   {
00108     ROSCPP_LOG_DEBUG("Socket %d closed with (ERR|HUP|NVAL) events %d", sock_, events);
00109     close();
00110   }
00111   else
00112   {
00113     if ((events & POLLIN) && expecting_read_)
00114     {
00115       if (read_cb_)
00116       {
00117         read_cb_(shared_from_this());
00118       }
00119     }
00120 
00121     if ((events & POLLOUT) && expecting_write_)
00122     {
00123       if (write_cb_)
00124       {
00125         write_cb_(shared_from_this());
00126       }
00127     }
00128   }
00129 
00130 }
00131 
00132 std::string TransportUDP::getTransportInfo()
00133 {
00134   return "UDPROS connection to [" + cached_remote_host_ + "]";
00135 }
00136 
00137 bool TransportUDP::connect(const std::string& host, int port, int connection_id)
00138 {
00139   sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00140   connection_id_ = connection_id;
00141 
00142   if (sock_ == ROS_INVALID_SOCKET)
00143   {
00144     ROS_ERROR("socket() failed with error [%s]",  last_socket_error_string());
00145     return false;
00146   }
00147 
00148   sockaddr_in sin;
00149   sin.sin_family = AF_INET;
00150   if (inet_addr(host.c_str()) == INADDR_NONE)
00151   {
00152     struct addrinfo* addr;
00153     if (getaddrinfo(host.c_str(), NULL, NULL, &addr) != 0)
00154     {
00155       close();
00156       ROS_ERROR("couldn't resolve host [%s]", host.c_str());
00157       return false;
00158     }
00159 
00160     bool found = false;
00161     struct addrinfo* it = addr;
00162     for (; it; it = it->ai_next)
00163     {
00164       if (it->ai_family == AF_INET)
00165       {
00166         memcpy(&sin, it->ai_addr, it->ai_addrlen);
00167         sin.sin_family = it->ai_family;
00168         sin.sin_port = htons(port);
00169 
00170         found = true;
00171         break;
00172       }
00173     }
00174 
00175     freeaddrinfo(addr);
00176 
00177     if (!found)
00178     {
00179       ROS_ERROR("Couldn't find an AF_INET address for [%s]\n", host.c_str());
00180       return false;
00181     }
00182 
00183     ROSCPP_LOG_DEBUG("Resolved host [%s] to [%s]", host.c_str(), inet_ntoa(sin.sin_addr));
00184   }
00185   else
00186   {
00187     sin.sin_addr.s_addr = inet_addr(host.c_str()); // already an IP addr
00188   }
00189 
00190   sin.sin_port = htons(port);
00191 
00192   if (::connect(sock_, (sockaddr *)&sin, sizeof(sin)))
00193   {
00194     ROSCPP_LOG_DEBUG("Connect to udpros host [%s:%d] failed with error [%s]", host.c_str(), port,  last_socket_error_string());
00195     close();
00196 
00197     return false;
00198   }
00199 
00200   // from daniel stonier:
00201 #ifdef WIN32
00202   // This is hackish, but windows fails at recv() if its slow to connect (e.g. happens with wireless)
00203   // recv() needs to check if its connected or not when its asynchronous?
00204   Sleep(100);
00205 #endif
00206 
00207   if (!initializeSocket())
00208   {
00209     return false;
00210   }
00211 
00212   ROSCPP_LOG_DEBUG("Connect succeeded to [%s:%d] on socket [%d]", host.c_str(), port, sock_);
00213 
00214   return true;
00215 }
00216 
00217 bool TransportUDP::createIncoming(int port, bool is_server)
00218 {
00219   is_server_ = is_server;
00220 
00221   sock_ = socket(AF_INET, SOCK_DGRAM, 0);
00222 
00223   if (sock_ <= 0)
00224   {
00225     ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
00226     return false;
00227   }
00228 
00229   server_address_.sin_family = AF_INET;
00230   server_address_.sin_port = htons(port);
00231   server_address_.sin_addr.s_addr = INADDR_ANY;
00232   if (bind(sock_, (sockaddr *)&server_address_, sizeof(server_address_)) < 0)
00233   {
00234     ROS_ERROR("bind() failed with error [%s]",  last_socket_error_string());
00235     return false;
00236   }
00237 
00238   socklen_t len = sizeof(server_address_);
00239   getsockname(sock_, (sockaddr *)&server_address_, &len);
00240   server_port_ = ntohs(server_address_.sin_port);
00241   ROSCPP_LOG_DEBUG("UDPROS server listening on port [%d]", server_port_);
00242 
00243   if (!initializeSocket())
00244   {
00245     return false;
00246   }
00247 
00248   enableRead();
00249 
00250   return true;
00251 }
00252 
00253 bool TransportUDP::initializeSocket()
00254 {
00255   ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00256 
00257   if (!(flags_ & SYNCHRONOUS))
00258   {
00259           int result = set_non_blocking(sock_);
00260           if ( result != 0 ) {
00261               ROS_ERROR("setting socket [%d] as non_blocking failed with error [%d]", sock_, result);
00262       close();
00263       return false;
00264     }
00265   }
00266 
00267   ROS_ASSERT(poll_set_ || (flags_ & SYNCHRONOUS));
00268   if (poll_set_)
00269   {
00270     poll_set_->addSocket(sock_, boost::bind(&TransportUDP::socketUpdate, this, _1), shared_from_this());
00271   }
00272 
00273   return true;
00274 }
00275 
00276 void TransportUDP::close()
00277 {
00278   Callback disconnect_cb;
00279 
00280   if (!closed_)
00281   {
00282     {
00283       boost::mutex::scoped_lock lock(close_mutex_);
00284 
00285       if (!closed_)
00286       {
00287         closed_ = true;
00288 
00289         ROSCPP_LOG_DEBUG("UDP socket [%d] closed", sock_);
00290 
00291         ROS_ASSERT(sock_ != ROS_INVALID_SOCKET);
00292 
00293         if (poll_set_)
00294         {
00295           poll_set_->delSocket(sock_);
00296         }
00297 
00298         if ( close_socket(sock_) != 0 )
00299         {
00300           ROS_ERROR("Error closing socket [%d]: [%s]", sock_, last_socket_error_string());
00301         }
00302 
00303         sock_ = ROS_INVALID_SOCKET;
00304 
00305         disconnect_cb = disconnect_cb_;
00306 
00307         disconnect_cb_ = Callback();
00308         read_cb_ = Callback();
00309         write_cb_ = Callback();
00310       }
00311     }
00312   }
00313 
00314   if (disconnect_cb)
00315   {
00316     disconnect_cb(shared_from_this());
00317   }
00318 }
00319 
00320 int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
00321 {
00322   {
00323     boost::mutex::scoped_lock lock(close_mutex_);
00324     if (closed_)
00325     {
00326       ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
00327       return -1;
00328     }
00329   }
00330 
00331   ROS_ASSERT((int32_t)size > 0);
00332 
00333   uint32_t bytes_read = 0;
00334 
00335   while (bytes_read < size)
00336   {
00337     TransportUDPHeader header;
00338 
00339     uint32_t copy_bytes = 0;
00340     bool from_previous = false;
00341     if (reorder_bytes_)
00342     {
00343       if (reorder_start_ != reorder_buffer_)
00344       {
00345         from_previous = true;
00346       }
00347 
00348       copy_bytes = std::min(size - bytes_read, reorder_bytes_);
00349       header = reorder_header_;
00350       memcpy(buffer + bytes_read, reorder_start_, copy_bytes);
00351       reorder_bytes_ -= copy_bytes;
00352       reorder_start_ += copy_bytes;
00353     }
00354     else
00355     {
00356       if (data_filled_ == 0)
00357       {
00358 #if defined(WIN32)
00359         SSIZE_T num_bytes = 0;
00360         DWORD received_bytes = 0;
00361         DWORD flags = 0;
00362         WSABUF iov[2];
00363         iov[0].buf = reinterpret_cast<char*>(&header);
00364         iov[0].len = sizeof(header);
00365         iov[1].buf = reinterpret_cast<char*>(data_buffer_);
00366         iov[1].len = max_datagram_size_ - sizeof(header);
00367                 int rc  = WSARecv(sock_, iov, 2, &received_bytes, &flags, NULL, NULL);
00368                 if ( rc == SOCKET_ERROR) {
00369                   num_bytes = -1;
00370                 } else {
00371                         num_bytes = received_bytes;
00372                 }
00373 #else
00374         ssize_t num_bytes;
00375         struct iovec iov[2];
00376         iov[0].iov_base = &header;
00377         iov[0].iov_len = sizeof(header);
00378         iov[1].iov_base = data_buffer_;
00379         iov[1].iov_len = max_datagram_size_ - sizeof(header);
00380         // Read a datagram with header
00381         num_bytes = readv(sock_, iov, 2);
00382 #endif
00383         if (num_bytes < 0)
00384         {
00385           if ( last_socket_error_is_would_block() )
00386           {
00387             num_bytes = 0;
00388             break;
00389           }
00390           else
00391           {
00392             ROSCPP_LOG_DEBUG("readv() failed with error [%s]",  last_socket_error_string());
00393             close();
00394             break;
00395           }
00396         }
00397         else if (num_bytes == 0)
00398         {
00399           ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
00400           close();
00401           return -1;
00402         }
00403         else if (num_bytes < (unsigned) sizeof(header))
00404         {
00405           ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes),  last_socket_error_string());
00406           close();
00407           return -1;
00408         }
00409 
00410                 num_bytes -= sizeof(header);
00411         data_filled_ = num_bytes;
00412         data_start_ = data_buffer_;
00413       }
00414       else
00415       {
00416         from_previous = true;
00417       }
00418 
00419       copy_bytes = std::min(size - bytes_read, data_filled_);
00420       // Copy from the data buffer, whether it has data left in it from a previous datagram or
00421       // was just filled by readv()
00422       memcpy(buffer + bytes_read, data_start_, copy_bytes);
00423       data_filled_ = std::max((int64_t)0, (int64_t)data_filled_ - (int64_t)size);
00424       data_start_ += copy_bytes;
00425     }
00426 
00427 
00428     if (from_previous)
00429     {
00430       bytes_read += copy_bytes;
00431     }
00432     else
00433     {
00434       // Process header
00435       switch (header.op_)
00436       {
00437         case ROS_UDP_DATA0:
00438           if (current_message_id_)
00439           {
00440             ROS_DEBUG("Received new message [%d:%d], while still working on [%d] (block %d of %d)", header.message_id_, header.block_, current_message_id_, last_block_ + 1, total_blocks_);
00441             reorder_header_ = header;
00442             reorder_bytes_ = copy_bytes;
00443             memcpy(reorder_buffer_, buffer + bytes_read, copy_bytes);
00444             reorder_start_ = reorder_buffer_;
00445             current_message_id_ = 0;
00446             total_blocks_ = 0;
00447             last_block_ = 0;
00448 
00449             data_filled_ = 0;
00450             data_start_ = data_buffer_;
00451             return -1;
00452           }
00453           total_blocks_ = header.block_;
00454           last_block_ = 0;
00455           current_message_id_ = header.message_id_;
00456           break;
00457         case ROS_UDP_DATAN:
00458           if (header.message_id_ != current_message_id_)
00459           {
00460             ROS_DEBUG("Message Id mismatch: %d != %d", header.message_id_, current_message_id_);
00461             return 0;
00462           }
00463           if (header.block_ != last_block_ + 1)
00464           {
00465             ROS_DEBUG("Expected block %d, received %d", last_block_ + 1, header.block_);
00466             return 0;
00467           }
00468           last_block_ = header.block_;
00469 
00470           break;
00471         default:
00472           ROS_ERROR("Unexpected UDP header OP [%d]", header.op_);
00473           return -1;
00474       }
00475 
00476       bytes_read += copy_bytes;
00477 
00478       if (last_block_ == (total_blocks_ - 1))
00479       {
00480         current_message_id_ = 0;
00481         break;
00482       }
00483     }
00484   }
00485 
00486   return bytes_read;
00487 }
00488 
00489 int32_t TransportUDP::write(uint8_t* buffer, uint32_t size)
00490 {
00491   {
00492     boost::mutex::scoped_lock lock(close_mutex_);
00493 
00494     if (closed_)
00495     {
00496       ROSCPP_LOG_DEBUG("Tried to write on a closed socket [%d]", sock_);
00497       return -1;
00498     }
00499   }
00500 
00501   ROS_ASSERT((int32_t)size > 0);
00502 
00503   const uint32_t max_payload_size = max_datagram_size_ - sizeof(TransportUDPHeader);
00504 
00505   uint32_t bytes_sent = 0;
00506   uint32_t this_block = 0;
00507   if (++current_message_id_ == 0)
00508     ++current_message_id_;
00509   while (bytes_sent < size)
00510   {
00511     TransportUDPHeader header;
00512     header.connection_id_ = connection_id_;
00513     header.message_id_ = current_message_id_;
00514     if (this_block == 0)
00515     {
00516       header.op_ = ROS_UDP_DATA0;
00517       header.block_ = (size + max_payload_size - 1) / max_payload_size;
00518     }
00519     else
00520     {
00521       header.op_ = ROS_UDP_DATAN;
00522       header.block_ = this_block;
00523     }
00524     ++this_block;
00525 #if defined(WIN32)
00526     WSABUF iov[2];
00527         DWORD sent_bytes;
00528         SSIZE_T num_bytes = 0;
00529         DWORD flags = 0;
00530         int rc;
00531         iov[0].buf = reinterpret_cast<char*>(&header);
00532         iov[0].len = sizeof(header);
00533         iov[1].buf = reinterpret_cast<char*>(buffer + bytes_sent);
00534         iov[1].len = std::min(max_payload_size, size - bytes_sent);
00535         rc = WSASend(sock_, iov, 2, &sent_bytes, flags, NULL, NULL);
00536         num_bytes = sent_bytes;
00537         if (rc == SOCKET_ERROR) {
00538             num_bytes = -1;
00539         }
00540 #else
00541     struct iovec iov[2];
00542     iov[0].iov_base = &header;
00543     iov[0].iov_len = sizeof(header);
00544     iov[1].iov_base = buffer + bytes_sent;
00545     iov[1].iov_len = std::min(max_payload_size, size - bytes_sent);
00546     ssize_t num_bytes = writev(sock_, iov, 2);
00547 #endif
00548     //usleep(100);
00549     if (num_bytes < 0)
00550     {
00551       if( !last_socket_error_is_would_block() ) // Actually EAGAIN or EWOULDBLOCK on posix
00552       {
00553         ROSCPP_LOG_DEBUG("writev() failed with error [%s]", last_socket_error_string());
00554         close();
00555         break;
00556       }
00557       else
00558       {
00559         num_bytes = 0;
00560       }
00561     }
00562     else if (num_bytes < (unsigned) sizeof(header))
00563     {
00564       ROSCPP_LOG_DEBUG("Socket [%d] short write (%d bytes), closing", sock_, int(num_bytes));
00565       close();
00566       break;
00567     }
00568     else
00569     {
00570       num_bytes -= sizeof(header);
00571     }
00572     bytes_sent += num_bytes;
00573   }
00574 
00575   return bytes_sent;
00576 }
00577 
00578 void TransportUDP::enableRead()
00579 {
00580   {
00581     boost::mutex::scoped_lock lock(close_mutex_);
00582   
00583     if (closed_)
00584     {
00585       return;
00586     }
00587   }
00588 
00589   if (!expecting_read_)
00590   {
00591     poll_set_->addEvents(sock_, POLLIN);
00592     expecting_read_ = true;
00593   }
00594 }
00595 
00596 void TransportUDP::disableRead()
00597 {
00598   ROS_ASSERT(!(flags_ & SYNCHRONOUS));
00599 
00600   {
00601     boost::mutex::scoped_lock lock(close_mutex_);
00602 
00603     if (closed_)
00604     {
00605       return;
00606     }
00607   }
00608 
00609   if (expecting_read_)
00610   {
00611     poll_set_->delEvents(sock_, POLLIN);
00612     expecting_read_ = false;
00613   }
00614 }
00615 
00616 void TransportUDP::enableWrite()
00617 {
00618   {
00619     boost::mutex::scoped_lock lock(close_mutex_);
00620 
00621     if (closed_)
00622     {
00623       return;
00624     }
00625   }
00626 
00627   if (!expecting_write_)
00628   {
00629     poll_set_->addEvents(sock_, POLLOUT);
00630     expecting_write_ = true;
00631   }
00632 }
00633 
00634 void TransportUDP::disableWrite()
00635 {
00636   {
00637     boost::mutex::scoped_lock lock(close_mutex_);
00638 
00639     if (closed_)
00640     {
00641       return;
00642     }
00643   }
00644 
00645   if (expecting_write_)
00646   {
00647     poll_set_->delEvents(sock_, POLLOUT);
00648     expecting_write_ = false;
00649   }
00650 }
00651 
00652 TransportUDPPtr TransportUDP::createOutgoing(std::string host, int port, int connection_id, int max_datagram_size)
00653 {
00654   ROS_ASSERT(is_server_);
00655   
00656   TransportUDPPtr transport(new TransportUDP(poll_set_, flags_, max_datagram_size));
00657   if (!transport->connect(host, port, connection_id))
00658   {
00659     ROS_ERROR("Failed to create outgoing connection");
00660     return TransportUDPPtr();
00661   }
00662   return transport;
00663 
00664 }
00665 
00666 }


roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52