transport_publisher_link.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include <ros/platform.h>  // platform dependendant requirements
00036 
00037 #include "ros/transport_publisher_link.h"
00038 #include "ros/subscription.h"
00039 #include "ros/header.h"
00040 #include "ros/connection.h"
00041 #include "ros/transport/transport.h"
00042 #include "ros/this_node.h"
00043 #include "ros/connection_manager.h"
00044 #include "ros/file_log.h"
00045 #include "ros/poll_manager.h"
00046 #include "ros/transport/transport_tcp.h"
00047 #include "ros/timer_manager.h"
00048 #include "ros/callback_queue.h"
00049 #include "ros/internal_timer_manager.h"
00050 
00051 #include <boost/bind.hpp>
00052 
00053 #include <sstream>
00054 
00055 namespace ros
00056 {
00057 
00058 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00059 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00060 , retry_timer_handle_(-1)
00061 , needs_retry_(false)
00062 , dropping_(false)
00063 {
00064 }
00065 
00066 TransportPublisherLink::~TransportPublisherLink()
00067 {
00068   dropping_ = true;
00069 
00070   if (retry_timer_handle_ != -1)
00071   {
00072     getInternalTimerManager()->remove(retry_timer_handle_);
00073   }
00074 
00075   connection_->drop(Connection::Destructing);
00076 }
00077 
00078 bool TransportPublisherLink::initialize(const ConnectionPtr& connection)
00079 {
00080   connection_ = connection;
00081   // slot_type is used to automatically track the TransporPublisherLink class' existence
00082   // and disconnect when this class' reference count is decremented to 0. It increments
00083   // then decrements the shared_from_this reference count around calls to the
00084   // onConnectionDropped function, preventing a coredump in the middle of execution.
00085   connection_->addDropListener(Connection::DropSignal::slot_type(&TransportPublisherLink::onConnectionDropped, this, _1, _2).track(shared_from_this()));
00086 
00087   if (connection_->getTransport()->requiresHeader())
00088   {
00089     connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
00090 
00091     SubscriptionPtr parent = parent_.lock();
00092 
00093     M_string header;
00094     header["topic"] = parent->getName();
00095     header["md5sum"] = parent->md5sum();
00096     header["callerid"] = this_node::getName();
00097     header["type"] = parent->datatype();
00098     header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
00099     connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
00100   }
00101   else
00102   {
00103     connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00104   }
00105 
00106   return true;
00107 }
00108 
00109 void TransportPublisherLink::drop()
00110 {
00111   dropping_ = true;
00112   connection_->drop(Connection::Destructing);
00113 
00114   if (SubscriptionPtr parent = parent_.lock())
00115   {
00116     parent->removePublisherLink(shared_from_this());
00117   }
00118 }
00119 
00120 void TransportPublisherLink::onHeaderWritten(const ConnectionPtr& conn)
00121 {
00122   (void)conn;
00123   // Do nothing
00124 }
00125 
00126 bool TransportPublisherLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00127 {
00128   ROS_ASSERT(conn == connection_);
00129 
00130   if (!setHeader(header))
00131   {
00132     drop();
00133     return false;
00134   }
00135 
00136   if (retry_timer_handle_ != -1)
00137   {
00138     getInternalTimerManager()->remove(retry_timer_handle_);
00139     retry_timer_handle_ = -1;
00140   }
00141 
00142   connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00143 
00144   return true;
00145 }
00146 
00147 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00148 {
00149   if (retry_timer_handle_ != -1)
00150   {
00151     getInternalTimerManager()->remove(retry_timer_handle_);
00152     retry_timer_handle_ = -1;
00153   }
00154 
00155   if (!success)
00156   {
00157     if (connection_)
00158       connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00159     return;
00160   }
00161 
00162   ROS_ASSERT(conn == connection_);
00163   ROS_ASSERT(size == 4);
00164 
00165   uint32_t len = *((uint32_t*)buffer.get());
00166 
00167   if (len > 1000000000)
00168   {
00169     ROS_ERROR("a message of over a gigabyte was " \
00170                 "predicted in tcpros. that seems highly " \
00171                 "unlikely, so I'll assume protocol " \
00172                 "synchronization is lost.");
00173     drop();
00174     return;
00175   }
00176 
00177   connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
00178 }
00179 
00180 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00181 {
00182   if (!success && !conn)
00183     return;
00184 
00185   ROS_ASSERT(conn == connection_);
00186 
00187   if (success)
00188   {
00189     handleMessage(SerializedMessage(buffer, size), true, false);
00190   }
00191 
00192   if (success || !connection_->getTransport()->requiresHeader())
00193   {
00194     connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00195   }
00196 }
00197 
00198 void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&)
00199 {
00200   if (dropping_)
00201   {
00202     return;
00203   }
00204 
00205   if (needs_retry_ && WallTime::now() > next_retry_)
00206   {
00207     retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
00208     needs_retry_ = false;
00209     SubscriptionPtr parent = parent_.lock();
00210     // TODO: support retry on more than just TCP
00211     // For now, since UDP does not have a heartbeat, we do not attempt to retry
00212     // UDP connections since an error there likely means some invalid operation has
00213     // happened.
00214     if (connection_->getTransport()->getType() == std::string("TCPROS"))
00215     {
00216       std::string topic = parent ? parent->getName() : "unknown";
00217 
00218       TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
00219       ROS_ASSERT(old_transport);
00220       const std::string& host = old_transport->getConnectedHost();
00221       int port = old_transport->getConnectedPort();
00222 
00223       ROSCPP_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00224 
00225       TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
00226       if (transport->connect(host, port))
00227       {
00228         ConnectionPtr connection(boost::make_shared<Connection>());
00229         connection->initialize(transport, false, HeaderReceivedFunc());
00230         initialize(connection);
00231 
00232         ConnectionManager::instance()->addConnection(connection);
00233       }
00234       else
00235       {
00236         ROSCPP_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00237       }
00238     }
00239     else if (parent)
00240     {
00241       parent->removePublisherLink(shared_from_this());
00242     }
00243   }
00244 }
00245 
00246 CallbackQueuePtr getInternalCallbackQueue();
00247 
00248 void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Connection::DropReason reason)
00249 {
00250   if (dropping_)
00251   {
00252     return;
00253   }
00254 
00255   ROS_ASSERT(conn == connection_);
00256 
00257   SubscriptionPtr parent = parent_.lock();
00258 
00259   if (reason == Connection::TransportDisconnect)
00260   {
00261     std::string topic = parent ? parent->getName() : "unknown";
00262 
00263     ROSCPP_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
00264 
00265     ROS_ASSERT(!needs_retry_);
00266     needs_retry_ = true;
00267     next_retry_ = WallTime::now() + retry_period_;
00268 
00269     if (retry_timer_handle_ == -1)
00270     {
00271       retry_period_ = WallDuration(0.1);
00272       next_retry_ = WallTime::now() + retry_period_;
00273       // shared_from_this() shared_ptr is used to ensure TransportPublisherLink is not
00274       // destroyed in the middle of onRetryTimer execution
00275       retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_),
00276           boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
00277           shared_from_this(), false);
00278     }
00279     else
00280     {
00281       getInternalTimerManager()->setPeriod(retry_timer_handle_, retry_period_);
00282     }
00283   }
00284   else
00285   {
00286     drop();
00287   }
00288 }
00289 
00290 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00291 {
00292   stats_.bytes_received_ += m.num_bytes;
00293   stats_.messages_received_++;
00294 
00295   SubscriptionPtr parent = parent_.lock();
00296 
00297   if (parent)
00298   {
00299     stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
00300   }
00301 }
00302 
00303 std::string TransportPublisherLink::getTransportType()
00304 {
00305   return connection_->getTransport()->getType();
00306 }
00307 
00308 std::string TransportPublisherLink::getTransportInfo()
00309 {
00310   return connection_->getTransport()->getTransportInfo();
00311 }
00312 
00313 } // namespace ros
00314 


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Tue Mar 7 2017 03:44:47