00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <ros/platform.h>
00036
00037 #include "ros/transport_publisher_link.h"
00038 #include "ros/subscription.h"
00039 #include "ros/header.h"
00040 #include "ros/connection.h"
00041 #include "ros/transport/transport.h"
00042 #include "ros/this_node.h"
00043 #include "ros/connection_manager.h"
00044 #include "ros/file_log.h"
00045 #include "ros/poll_manager.h"
00046 #include "ros/transport/transport_tcp.h"
00047 #include "ros/timer_manager.h"
00048 #include "ros/callback_queue.h"
00049 #include "ros/internal_timer_manager.h"
00050
00051 #include <boost/bind.hpp>
00052
00053 #include <sstream>
00054
00055 namespace ros
00056 {
00057
00058 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00059 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00060 , retry_timer_handle_(-1)
00061 , needs_retry_(false)
00062 , dropping_(false)
00063 {
00064 }
00065
00066 TransportPublisherLink::~TransportPublisherLink()
00067 {
00068 dropping_ = true;
00069
00070 if (retry_timer_handle_ != -1)
00071 {
00072 getInternalTimerManager()->remove(retry_timer_handle_);
00073 }
00074
00075 connection_->drop(Connection::Destructing);
00076 }
00077
00078 bool TransportPublisherLink::initialize(const ConnectionPtr& connection)
00079 {
00080 connection_ = connection;
00081
00082
00083
00084
00085 connection_->addDropListener(Connection::DropSignal::slot_type(&TransportPublisherLink::onConnectionDropped, this, _1, _2).track(shared_from_this()));
00086
00087 if (connection_->getTransport()->requiresHeader())
00088 {
00089 connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
00090
00091 SubscriptionPtr parent = parent_.lock();
00092
00093 M_string header;
00094 header["topic"] = parent->getName();
00095 header["md5sum"] = parent->md5sum();
00096 header["callerid"] = this_node::getName();
00097 header["type"] = parent->datatype();
00098 header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
00099 connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
00100 }
00101 else
00102 {
00103 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00104 }
00105
00106 return true;
00107 }
00108
00109 void TransportPublisherLink::drop()
00110 {
00111 dropping_ = true;
00112 connection_->drop(Connection::Destructing);
00113
00114 if (SubscriptionPtr parent = parent_.lock())
00115 {
00116 parent->removePublisherLink(shared_from_this());
00117 }
00118 }
00119
00120 void TransportPublisherLink::onHeaderWritten(const ConnectionPtr& conn)
00121 {
00122 (void)conn;
00123
00124 }
00125
00126 bool TransportPublisherLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00127 {
00128 ROS_ASSERT(conn == connection_);
00129
00130 if (!setHeader(header))
00131 {
00132 drop();
00133 return false;
00134 }
00135
00136 if (retry_timer_handle_ != -1)
00137 {
00138 getInternalTimerManager()->remove(retry_timer_handle_);
00139 retry_timer_handle_ = -1;
00140 }
00141
00142 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00143
00144 return true;
00145 }
00146
00147 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00148 {
00149 if (retry_timer_handle_ != -1)
00150 {
00151 getInternalTimerManager()->remove(retry_timer_handle_);
00152 retry_timer_handle_ = -1;
00153 }
00154
00155 if (!success)
00156 {
00157 if (connection_)
00158 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00159 return;
00160 }
00161
00162 ROS_ASSERT(conn == connection_);
00163 ROS_ASSERT(size == 4);
00164
00165 uint32_t len = *((uint32_t*)buffer.get());
00166
00167 if (len > 1000000000)
00168 {
00169 ROS_ERROR("a message of over a gigabyte was " \
00170 "predicted in tcpros. that seems highly " \
00171 "unlikely, so I'll assume protocol " \
00172 "synchronization is lost.");
00173 drop();
00174 return;
00175 }
00176
00177 connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
00178 }
00179
00180 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00181 {
00182 if (!success && !conn)
00183 return;
00184
00185 ROS_ASSERT(conn == connection_);
00186
00187 if (success)
00188 {
00189 handleMessage(SerializedMessage(buffer, size), true, false);
00190 }
00191
00192 if (success || !connection_->getTransport()->requiresHeader())
00193 {
00194 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00195 }
00196 }
00197
00198 void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&)
00199 {
00200 if (dropping_)
00201 {
00202 return;
00203 }
00204
00205 if (needs_retry_ && WallTime::now() > next_retry_)
00206 {
00207 retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
00208 needs_retry_ = false;
00209 SubscriptionPtr parent = parent_.lock();
00210
00211
00212
00213
00214 if (connection_->getTransport()->getType() == std::string("TCPROS"))
00215 {
00216 std::string topic = parent ? parent->getName() : "unknown";
00217
00218 TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
00219 ROS_ASSERT(old_transport);
00220 const std::string& host = old_transport->getConnectedHost();
00221 int port = old_transport->getConnectedPort();
00222
00223 ROSCPP_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00224
00225 TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
00226 if (transport->connect(host, port))
00227 {
00228 ConnectionPtr connection(boost::make_shared<Connection>());
00229 connection->initialize(transport, false, HeaderReceivedFunc());
00230 initialize(connection);
00231
00232 ConnectionManager::instance()->addConnection(connection);
00233 }
00234 else
00235 {
00236 ROSCPP_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00237 }
00238 }
00239 else if (parent)
00240 {
00241 parent->removePublisherLink(shared_from_this());
00242 }
00243 }
00244 }
00245
00246 CallbackQueuePtr getInternalCallbackQueue();
00247
00248 void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Connection::DropReason reason)
00249 {
00250 if (dropping_)
00251 {
00252 return;
00253 }
00254
00255 ROS_ASSERT(conn == connection_);
00256
00257 SubscriptionPtr parent = parent_.lock();
00258
00259 if (reason == Connection::TransportDisconnect)
00260 {
00261 std::string topic = parent ? parent->getName() : "unknown";
00262
00263 ROSCPP_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
00264
00265 ROS_ASSERT(!needs_retry_);
00266 needs_retry_ = true;
00267 next_retry_ = WallTime::now() + retry_period_;
00268
00269 if (retry_timer_handle_ == -1)
00270 {
00271 retry_period_ = WallDuration(0.1);
00272 next_retry_ = WallTime::now() + retry_period_;
00273
00274
00275 retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_),
00276 boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
00277 shared_from_this(), false);
00278 }
00279 else
00280 {
00281 getInternalTimerManager()->setPeriod(retry_timer_handle_, retry_period_);
00282 }
00283 }
00284 else
00285 {
00286 drop();
00287 }
00288 }
00289
00290 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00291 {
00292 stats_.bytes_received_ += m.num_bytes;
00293 stats_.messages_received_++;
00294
00295 SubscriptionPtr parent = parent_.lock();
00296
00297 if (parent)
00298 {
00299 stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
00300 }
00301 }
00302
00303 std::string TransportPublisherLink::getTransportType()
00304 {
00305 return connection_->getTransport()->getType();
00306 }
00307
00308 std::string TransportPublisherLink::getTransportInfo()
00309 {
00310 return connection_->getTransport()->getTransportInfo();
00311 }
00312
00313 }
00314