00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <ros/platform.h>
00036
00037 #include "ros/transport_publisher_link.h"
00038 #include "ros/subscription.h"
00039 #include "ros/header.h"
00040 #include "ros/connection.h"
00041 #include "ros/transport/transport.h"
00042 #include "ros/this_node.h"
00043 #include "ros/connection_manager.h"
00044 #include "ros/file_log.h"
00045 #include "ros/poll_manager.h"
00046 #include "ros/transport/transport_tcp.h"
00047 #include "ros/timer_manager.h"
00048 #include "ros/callback_queue.h"
00049 #include "ros/internal_timer_manager.h"
00050
00051 #include <boost/bind.hpp>
00052
00053 #include <sstream>
00054
00055 namespace ros
00056 {
00057
00058 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00059 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00060 , retry_timer_handle_(-1)
00061 , needs_retry_(false)
00062 , dropping_(false)
00063 {
00064 }
00065
00066 TransportPublisherLink::~TransportPublisherLink()
00067 {
00068 dropping_ = true;
00069
00070 if (retry_timer_handle_ != -1)
00071 {
00072 getInternalTimerManager()->remove(retry_timer_handle_);
00073 }
00074
00075 connection_->drop(Connection::Destructing);
00076 }
00077
00078 bool TransportPublisherLink::initialize(const ConnectionPtr& connection)
00079 {
00080 connection_ = connection;
00081 connection_->addDropListener(boost::bind(&TransportPublisherLink::onConnectionDropped, this, _1, _2));
00082
00083 if (connection_->getTransport()->requiresHeader())
00084 {
00085 connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
00086
00087 SubscriptionPtr parent = parent_.lock();
00088
00089 M_string header;
00090 header["topic"] = parent->getName();
00091 header["md5sum"] = parent->md5sum();
00092 header["callerid"] = this_node::getName();
00093 header["type"] = parent->datatype();
00094 header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
00095 connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
00096 }
00097 else
00098 {
00099 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00100 }
00101
00102 return true;
00103 }
00104
00105 void TransportPublisherLink::drop()
00106 {
00107 dropping_ = true;
00108 connection_->drop(Connection::Destructing);
00109
00110 if (SubscriptionPtr parent = parent_.lock())
00111 {
00112 parent->removePublisherLink(shared_from_this());
00113 }
00114 }
00115
00116 void TransportPublisherLink::onHeaderWritten(const ConnectionPtr& conn)
00117 {
00118
00119 }
00120
00121 bool TransportPublisherLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00122 {
00123 ROS_ASSERT(conn == connection_);
00124
00125 if (!setHeader(header))
00126 {
00127 drop();
00128 return false;
00129 }
00130
00131 if (retry_timer_handle_ != -1)
00132 {
00133 getInternalTimerManager()->remove(retry_timer_handle_);
00134 retry_timer_handle_ = -1;
00135 }
00136
00137 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00138
00139 return true;
00140 }
00141
00142 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00143 {
00144 if (retry_timer_handle_ != -1)
00145 {
00146 getInternalTimerManager()->remove(retry_timer_handle_);
00147 retry_timer_handle_ = -1;
00148 }
00149
00150 if (!success)
00151 {
00152 if (connection_)
00153 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00154 return;
00155 }
00156
00157 ROS_ASSERT(conn == connection_);
00158 ROS_ASSERT(size == 4);
00159
00160 uint32_t len = *((uint32_t*)buffer.get());
00161
00162 if (len > 1000000000)
00163 {
00164 ROS_ERROR("a message of over a gigabyte was " \
00165 "predicted in tcpros. that seems highly " \
00166 "unlikely, so I'll assume protocol " \
00167 "synchronization is lost.");
00168 drop();
00169 return;
00170 }
00171
00172 connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
00173 }
00174
00175 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00176 {
00177 if (!success && !conn)
00178 return;
00179
00180 ROS_ASSERT(conn == connection_);
00181
00182 if (success)
00183 {
00184 handleMessage(SerializedMessage(buffer, size), true, false);
00185 }
00186
00187 if (success || !connection_->getTransport()->requiresHeader())
00188 {
00189 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00190 }
00191 }
00192
00193 void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&)
00194 {
00195 if (dropping_)
00196 {
00197 return;
00198 }
00199
00200 if (needs_retry_ && WallTime::now() > next_retry_)
00201 {
00202 retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
00203 needs_retry_ = false;
00204 SubscriptionPtr parent = parent_.lock();
00205
00206
00207
00208
00209 if (connection_->getTransport()->getType() == std::string("TCPROS"))
00210 {
00211 std::string topic = parent ? parent->getName() : "unknown";
00212
00213 TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
00214 ROS_ASSERT(old_transport);
00215 const std::string& host = old_transport->getConnectedHost();
00216 int port = old_transport->getConnectedPort();
00217
00218 ROSCPP_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00219
00220 TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet()));
00221 if (transport->connect(host, port))
00222 {
00223 ConnectionPtr connection(new Connection);
00224 connection->initialize(transport, false, HeaderReceivedFunc());
00225 initialize(connection);
00226
00227 ConnectionManager::instance()->addConnection(connection);
00228 }
00229 else
00230 {
00231 ROSCPP_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00232 }
00233 }
00234 else if (parent)
00235 {
00236 parent->removePublisherLink(shared_from_this());
00237 }
00238 }
00239 }
00240
00241 CallbackQueuePtr getInternalCallbackQueue();
00242
00243 void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Connection::DropReason reason)
00244 {
00245 if (dropping_)
00246 {
00247 return;
00248 }
00249
00250 ROS_ASSERT(conn == connection_);
00251
00252 SubscriptionPtr parent = parent_.lock();
00253
00254 if (reason == Connection::TransportDisconnect)
00255 {
00256 std::string topic = parent ? parent->getName() : "unknown";
00257
00258 ROSCPP_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
00259
00260 ROS_ASSERT(!needs_retry_);
00261 needs_retry_ = true;
00262 next_retry_ = WallTime::now() + retry_period_;
00263
00264 if (retry_timer_handle_ == -1)
00265 {
00266 retry_period_ = WallDuration(0.1);
00267 next_retry_ = WallTime::now() + retry_period_;
00268 retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_),
00269 boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
00270 VoidConstPtr(), false);
00271 }
00272 else
00273 {
00274 getInternalTimerManager()->setPeriod(retry_timer_handle_, retry_period_);
00275 }
00276 }
00277 else
00278 {
00279 drop();
00280 }
00281 }
00282
00283 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00284 {
00285 stats_.bytes_received_ += m.num_bytes;
00286 stats_.messages_received_++;
00287
00288 SubscriptionPtr parent = parent_.lock();
00289
00290 if (parent)
00291 {
00292 stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
00293 }
00294 }
00295
00296 std::string TransportPublisherLink::getTransportType()
00297 {
00298 return connection_->getTransport()->getType();
00299 }
00300
00301 }
00302