transport_publisher_link.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include <ros/platform.h>  // platform dependendant requirements
00036 
00037 #include "ros/transport_publisher_link.h"
00038 #include "ros/subscription.h"
00039 #include "ros/header.h"
00040 #include "ros/connection.h"
00041 #include "ros/transport/transport.h"
00042 #include "ros/this_node.h"
00043 #include "ros/connection_manager.h"
00044 #include "ros/file_log.h"
00045 #include "ros/poll_manager.h"
00046 #include "ros/transport/transport_tcp.h"
00047 #include "ros/timer_manager.h"
00048 #include "ros/callback_queue.h"
00049 #include "ros/internal_timer_manager.h"
00050 
00051 #include <boost/bind.hpp>
00052 
00053 #include <sstream>
00054 
00055 namespace ros
00056 {
00057 
00058 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00059 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00060 , retry_timer_handle_(-1)
00061 , needs_retry_(false)
00062 , dropping_(false)
00063 {
00064 }
00065 
00066 TransportPublisherLink::~TransportPublisherLink()
00067 {
00068   dropping_ = true;
00069 
00070   if (retry_timer_handle_ != -1)
00071   {
00072     getInternalTimerManager()->remove(retry_timer_handle_);
00073   }
00074 
00075   connection_->drop(Connection::Destructing);
00076 }
00077 
00078 bool TransportPublisherLink::initialize(const ConnectionPtr& connection)
00079 {
00080   connection_ = connection;
00081   connection_->addDropListener(boost::bind(&TransportPublisherLink::onConnectionDropped, this, _1, _2));
00082 
00083   if (connection_->getTransport()->requiresHeader())
00084   {
00085     connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
00086 
00087     SubscriptionPtr parent = parent_.lock();
00088 
00089     M_string header;
00090     header["topic"] = parent->getName();
00091     header["md5sum"] = parent->md5sum();
00092     header["callerid"] = this_node::getName();
00093     header["type"] = parent->datatype();
00094     header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
00095     connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
00096   }
00097   else
00098   {
00099     connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00100   }
00101 
00102   return true;
00103 }
00104 
00105 void TransportPublisherLink::drop()
00106 {
00107   dropping_ = true;
00108   connection_->drop(Connection::Destructing);
00109 
00110   if (SubscriptionPtr parent = parent_.lock())
00111   {
00112     parent->removePublisherLink(shared_from_this());
00113   }
00114 }
00115 
00116 void TransportPublisherLink::onHeaderWritten(const ConnectionPtr& conn)
00117 {
00118   // Do nothing
00119 }
00120 
00121 bool TransportPublisherLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00122 {
00123   ROS_ASSERT(conn == connection_);
00124 
00125   if (!setHeader(header))
00126   {
00127     drop();
00128     return false;
00129   }
00130 
00131   if (retry_timer_handle_ != -1)
00132   {
00133     getInternalTimerManager()->remove(retry_timer_handle_);
00134     retry_timer_handle_ = -1;
00135   }
00136 
00137   connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00138 
00139   return true;
00140 }
00141 
00142 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00143 {
00144   if (retry_timer_handle_ != -1)
00145   {
00146     getInternalTimerManager()->remove(retry_timer_handle_);
00147     retry_timer_handle_ = -1;
00148   }
00149 
00150   if (!success)
00151   {
00152     if (connection_)
00153       connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00154     return;
00155   }
00156 
00157   ROS_ASSERT(conn == connection_);
00158   ROS_ASSERT(size == 4);
00159 
00160   uint32_t len = *((uint32_t*)buffer.get());
00161 
00162   if (len > 1000000000)
00163   {
00164     ROS_ERROR("a message of over a gigabyte was " \
00165                 "predicted in tcpros. that seems highly " \
00166                 "unlikely, so I'll assume protocol " \
00167                 "synchronization is lost.");
00168     drop();
00169     return;
00170   }
00171 
00172   connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
00173 }
00174 
00175 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00176 {
00177   if (!success && !conn)
00178     return;
00179 
00180   ROS_ASSERT(conn == connection_);
00181 
00182   if (success)
00183   {
00184     handleMessage(SerializedMessage(buffer, size), true, false);
00185   }
00186 
00187   if (success || !connection_->getTransport()->requiresHeader())
00188   {
00189     connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00190   }
00191 }
00192 
00193 void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&)
00194 {
00195   if (dropping_)
00196   {
00197     return;
00198   }
00199 
00200   if (needs_retry_ && WallTime::now() > next_retry_)
00201   {
00202     retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
00203     needs_retry_ = false;
00204     SubscriptionPtr parent = parent_.lock();
00205     // TODO: support retry on more than just TCP
00206     // For now, since UDP does not have a heartbeat, we do not attempt to retry
00207     // UDP connections since an error there likely means some invalid operation has
00208     // happened.
00209     if (connection_->getTransport()->getType() == std::string("TCPROS"))
00210     {
00211       std::string topic = parent ? parent->getName() : "unknown";
00212 
00213       TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
00214       ROS_ASSERT(old_transport);
00215       const std::string& host = old_transport->getConnectedHost();
00216       int port = old_transport->getConnectedPort();
00217 
00218       ROSCPP_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00219 
00220       TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet()));
00221       if (transport->connect(host, port))
00222       {
00223         ConnectionPtr connection(new Connection);
00224         connection->initialize(transport, false, HeaderReceivedFunc());
00225         initialize(connection);
00226 
00227         ConnectionManager::instance()->addConnection(connection);
00228       }
00229       else
00230       {
00231         ROSCPP_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00232       }
00233     }
00234     else if (parent)
00235     {
00236       parent->removePublisherLink(shared_from_this());
00237     }
00238   }
00239 }
00240 
00241 CallbackQueuePtr getInternalCallbackQueue();
00242 
00243 void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Connection::DropReason reason)
00244 {
00245   if (dropping_)
00246   {
00247     return;
00248   }
00249 
00250   ROS_ASSERT(conn == connection_);
00251 
00252   SubscriptionPtr parent = parent_.lock();
00253 
00254   if (reason == Connection::TransportDisconnect)
00255   {
00256     std::string topic = parent ? parent->getName() : "unknown";
00257 
00258     ROSCPP_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
00259 
00260     ROS_ASSERT(!needs_retry_);
00261     needs_retry_ = true;
00262     next_retry_ = WallTime::now() + retry_period_;
00263 
00264     if (retry_timer_handle_ == -1)
00265     {
00266       retry_period_ = WallDuration(0.1);
00267       next_retry_ = WallTime::now() + retry_period_;
00268       retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_),
00269           boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
00270           VoidConstPtr(), false);
00271     }
00272     else
00273     {
00274       getInternalTimerManager()->setPeriod(retry_timer_handle_, retry_period_);
00275     }
00276   }
00277   else
00278   {
00279     drop();
00280   }
00281 }
00282 
00283 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00284 {
00285   stats_.bytes_received_ += m.num_bytes;
00286   stats_.messages_received_++;
00287 
00288   SubscriptionPtr parent = parent_.lock();
00289 
00290   if (parent)
00291   {
00292     stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
00293   }
00294 }
00295 
00296 std::string TransportPublisherLink::getTransportType()
00297 {
00298   return connection_->getTransport()->getType();
00299 }
00300 
00301 } // namespace ros
00302 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Mon Oct 6 2014 11:46:44