00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 #include <ros/platform.h>  
00036 
00037 #include "ros/transport_publisher_link.h"
00038 #include "ros/subscription.h"
00039 #include "ros/header.h"
00040 #include "ros/connection.h"
00041 #include "ros/transport/transport.h"
00042 #include "ros/this_node.h"
00043 #include "ros/connection_manager.h"
00044 #include "ros/file_log.h"
00045 #include "ros/poll_manager.h"
00046 #include "ros/transport/transport_tcp.h"
00047 #include "ros/timer_manager.h"
00048 #include "ros/callback_queue.h"
00049 #include "ros/internal_timer_manager.h"
00050 
00051 #include <boost/bind.hpp>
00052 
00053 #include <sstream>
00054 
00055 namespace ros
00056 {
00057 
00058 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00059 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00060 , retry_timer_handle_(-1)
00061 , needs_retry_(false)
00062 , dropping_(false)
00063 {
00064 }
00065 
00066 TransportPublisherLink::~TransportPublisherLink()
00067 {
00068   dropping_ = true;
00069 
00070   if (retry_timer_handle_ != -1)
00071   {
00072     getInternalTimerManager()->remove(retry_timer_handle_);
00073   }
00074 
00075   connection_->drop(Connection::Destructing);
00076 }
00077 
00078 bool TransportPublisherLink::initialize(const ConnectionPtr& connection)
00079 {
00080   connection_ = connection;
00081   
00082   
00083   
00084   
00085   connection_->addDropListener(Connection::DropSignal::slot_type(&TransportPublisherLink::onConnectionDropped, this, _1, _2).track(shared_from_this()));
00086 
00087   if (connection_->getTransport()->requiresHeader())
00088   {
00089     connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
00090 
00091     SubscriptionPtr parent = parent_.lock();
00092 
00093     M_string header;
00094     header["topic"] = parent->getName();
00095     header["md5sum"] = parent->md5sum();
00096     header["callerid"] = this_node::getName();
00097     header["type"] = parent->datatype();
00098     header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
00099     connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
00100   }
00101   else
00102   {
00103     connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00104   }
00105 
00106   return true;
00107 }
00108 
00109 void TransportPublisherLink::drop()
00110 {
00111   dropping_ = true;
00112   connection_->drop(Connection::Destructing);
00113 
00114   if (SubscriptionPtr parent = parent_.lock())
00115   {
00116     parent->removePublisherLink(shared_from_this());
00117   }
00118 }
00119 
00120 void TransportPublisherLink::onHeaderWritten(const ConnectionPtr& conn)
00121 {
00122   (void)conn;
00123   
00124 }
00125 
00126 bool TransportPublisherLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00127 {
00128   ROS_ASSERT(conn == connection_);
00129 
00130   if (!setHeader(header))
00131   {
00132     drop();
00133     return false;
00134   }
00135 
00136   if (retry_timer_handle_ != -1)
00137   {
00138     getInternalTimerManager()->remove(retry_timer_handle_);
00139     retry_timer_handle_ = -1;
00140   }
00141 
00142   connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00143 
00144   return true;
00145 }
00146 
00147 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00148 {
00149   if (retry_timer_handle_ != -1)
00150   {
00151     getInternalTimerManager()->remove(retry_timer_handle_);
00152     retry_timer_handle_ = -1;
00153   }
00154 
00155   if (!success)
00156   {
00157     if (connection_)
00158       connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00159     return;
00160   }
00161 
00162   ROS_ASSERT(conn == connection_);
00163   ROS_ASSERT(size == 4);
00164 
00165   uint32_t len = *((uint32_t*)buffer.get());
00166 
00167   if (len > 1000000000)
00168   {
00169     ROS_ERROR("a message of over a gigabyte was " \
00170                 "predicted in tcpros. that seems highly " \
00171                 "unlikely, so I'll assume protocol " \
00172                 "synchronization is lost.");
00173     drop();
00174     return;
00175   }
00176 
00177   connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
00178 }
00179 
00180 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00181 {
00182   if (!success && !conn)
00183     return;
00184 
00185   ROS_ASSERT(conn == connection_);
00186 
00187   if (success)
00188   {
00189     handleMessage(SerializedMessage(buffer, size), true, false);
00190   }
00191 
00192   if (success || !connection_->getTransport()->requiresHeader())
00193   {
00194     connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00195   }
00196 }
00197 
00198 void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&)
00199 {
00200   if (dropping_)
00201   {
00202     return;
00203   }
00204 
00205   if (needs_retry_ && WallTime::now() > next_retry_)
00206   {
00207     retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
00208     needs_retry_ = false;
00209     SubscriptionPtr parent = parent_.lock();
00210     
00211     
00212     
00213     
00214     if (connection_->getTransport()->getType() == std::string("TCPROS"))
00215     {
00216       std::string topic = parent ? parent->getName() : "unknown";
00217 
00218       TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
00219       ROS_ASSERT(old_transport);
00220       const std::string& host = old_transport->getConnectedHost();
00221       int port = old_transport->getConnectedPort();
00222 
00223       ROSCPP_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00224 
00225       TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
00226       if (transport->connect(host, port))
00227       {
00228         ConnectionPtr connection(boost::make_shared<Connection>());
00229         connection->initialize(transport, false, HeaderReceivedFunc());
00230         initialize(connection);
00231 
00232         ConnectionManager::instance()->addConnection(connection);
00233       }
00234       else
00235       {
00236         ROSCPP_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00237       }
00238     }
00239     else if (parent)
00240     {
00241       parent->removePublisherLink(shared_from_this());
00242     }
00243   }
00244 }
00245 
00246 CallbackQueuePtr getInternalCallbackQueue();
00247 
00248 void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Connection::DropReason reason)
00249 {
00250   if (dropping_)
00251   {
00252     return;
00253   }
00254 
00255   ROS_ASSERT(conn == connection_);
00256 
00257   SubscriptionPtr parent = parent_.lock();
00258 
00259   if (reason == Connection::TransportDisconnect)
00260   {
00261     std::string topic = parent ? parent->getName() : "unknown";
00262 
00263     ROSCPP_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
00264 
00265     ROS_ASSERT(!needs_retry_);
00266     needs_retry_ = true;
00267     next_retry_ = WallTime::now() + retry_period_;
00268 
00269     if (retry_timer_handle_ == -1)
00270     {
00271       retry_period_ = WallDuration(0.1);
00272       next_retry_ = WallTime::now() + retry_period_;
00273       
00274       
00275       retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_),
00276           boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
00277           shared_from_this(), false);
00278     }
00279     else
00280     {
00281       getInternalTimerManager()->setPeriod(retry_timer_handle_, retry_period_);
00282     }
00283   }
00284   else
00285   {
00286     drop();
00287   }
00288 }
00289 
00290 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00291 {
00292   stats_.bytes_received_ += m.num_bytes;
00293   stats_.messages_received_++;
00294 
00295   SubscriptionPtr parent = parent_.lock();
00296 
00297   if (parent)
00298   {
00299     stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
00300   }
00301 }
00302 
00303 std::string TransportPublisherLink::getTransportType()
00304 {
00305   return connection_->getTransport()->getType();
00306 }
00307 
00308 std::string TransportPublisherLink::getTransportInfo()
00309 {
00310   return connection_->getTransport()->getTransportInfo();
00311 }
00312 
00313 } 
00314