00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/transport_publisher_link.h"
00036 #include "ros/subscription.h"
00037 #include "ros/header.h"
00038 #include "ros/connection.h"
00039 #include "ros/transport/transport.h"
00040 #include "ros/this_node.h"
00041 #include "ros/connection_manager.h"
00042 #include "ros/file_log.h"
00043 #include "ros/poll_manager.h"
00044 #include "ros/transport/transport_tcp.h"
00045 #include "ros/timer_manager.h"
00046 #include "ros/callback_queue.h"
00047 #include "ros/internal_timer_manager.h"
00048
00049 #include <boost/bind.hpp>
00050
00051 #include <sstream>
00052
00053 namespace ros
00054 {
00055
00056 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
00057 : PublisherLink(parent, xmlrpc_uri, transport_hints)
00058 , retry_timer_handle_(-1)
00059 , needs_retry_(false)
00060 , dropping_(false)
00061 {
00062 }
00063
00064 TransportPublisherLink::~TransportPublisherLink()
00065 {
00066 dropping_ = true;
00067
00068 if (retry_timer_handle_ != -1)
00069 {
00070 getInternalTimerManager()->remove(retry_timer_handle_);
00071 }
00072
00073 connection_->drop(Connection::Destructing);
00074 }
00075
00076 bool TransportPublisherLink::initialize(const ConnectionPtr& connection)
00077 {
00078 connection_ = connection;
00079 connection_->addDropListener(boost::bind(&TransportPublisherLink::onConnectionDropped, this, _1, _2));
00080
00081 if (connection_->getTransport()->requiresHeader())
00082 {
00083 connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
00084
00085 SubscriptionPtr parent = parent_.lock();
00086
00087 M_string header;
00088 header["topic"] = parent->getName();
00089 header["md5sum"] = parent->md5sum();
00090 header["callerid"] = this_node::getName();
00091 header["type"] = parent->datatype();
00092 header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
00093 connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
00094 }
00095 else
00096 {
00097 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00098 }
00099
00100 return true;
00101 }
00102
00103 void TransportPublisherLink::drop()
00104 {
00105 dropping_ = true;
00106 connection_->drop(Connection::Destructing);
00107
00108 if (SubscriptionPtr parent = parent_.lock())
00109 {
00110 parent->removePublisherLink(shared_from_this());
00111 }
00112 }
00113
00114 void TransportPublisherLink::onHeaderWritten(const ConnectionPtr& conn)
00115 {
00116
00117 }
00118
00119 bool TransportPublisherLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header)
00120 {
00121 ROS_ASSERT(conn == connection_);
00122
00123 if (!setHeader(header))
00124 {
00125 drop();
00126 return false;
00127 }
00128
00129 if (retry_timer_handle_ != -1)
00130 {
00131 getInternalTimerManager()->remove(retry_timer_handle_);
00132 retry_timer_handle_ = -1;
00133 }
00134
00135 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00136
00137 return true;
00138 }
00139
00140 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00141 {
00142 if (retry_timer_handle_ != -1)
00143 {
00144 getInternalTimerManager()->remove(retry_timer_handle_);
00145 retry_timer_handle_ = -1;
00146 }
00147
00148 if (!success)
00149 {
00150 if (connection_)
00151 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00152 return;
00153 }
00154
00155 ROS_ASSERT(conn == connection_);
00156 ROS_ASSERT(size == 4);
00157
00158 uint32_t len = *((uint32_t*)buffer.get());
00159
00160 if (len > 1000000000)
00161 {
00162 ROS_ERROR("a message of over a gigabyte was " \
00163 "predicted in tcpros. that seems highly " \
00164 "unlikely, so I'll assume protocol " \
00165 "synchronization is lost.");
00166 drop();
00167 return;
00168 }
00169
00170 connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
00171 }
00172
00173 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
00174 {
00175 if (!success && !conn)
00176 return;
00177
00178 ROS_ASSERT(conn == connection_);
00179
00180 if (success)
00181 {
00182 handleMessage(SerializedMessage(buffer, size), true, false);
00183 }
00184
00185 if (success || !connection_->getTransport()->requiresHeader())
00186 {
00187 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
00188 }
00189 }
00190
00191 void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&)
00192 {
00193 if (dropping_)
00194 {
00195 return;
00196 }
00197
00198 if (needs_retry_ && WallTime::now() > next_retry_)
00199 {
00200 retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
00201 needs_retry_ = false;
00202 SubscriptionPtr parent = parent_.lock();
00203
00204
00205
00206
00207 if (connection_->getTransport()->getType() == std::string("TCPROS"))
00208 {
00209 std::string topic = parent ? parent->getName() : "unknown";
00210
00211 TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
00212 ROS_ASSERT(old_transport);
00213 const std::string& host = old_transport->getConnectedHost();
00214 int port = old_transport->getConnectedPort();
00215
00216 ROSCPP_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00217
00218 TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet()));
00219 if (transport->connect(host, port))
00220 {
00221 ConnectionPtr connection(new Connection);
00222 connection->initialize(transport, false, HeaderReceivedFunc());
00223 initialize(connection);
00224
00225 ConnectionManager::instance()->addConnection(connection);
00226 }
00227 else
00228 {
00229 ROSCPP_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
00230 }
00231 }
00232 else if (parent)
00233 {
00234 parent->removePublisherLink(shared_from_this());
00235 }
00236 }
00237 }
00238
00239 CallbackQueuePtr getInternalCallbackQueue();
00240
00241 void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Connection::DropReason reason)
00242 {
00243 if (dropping_)
00244 {
00245 return;
00246 }
00247
00248 ROS_ASSERT(conn == connection_);
00249
00250 SubscriptionPtr parent = parent_.lock();
00251
00252 if (reason == Connection::TransportDisconnect)
00253 {
00254 std::string topic = parent ? parent->getName() : "unknown";
00255
00256 ROSCPP_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
00257
00258 ROS_ASSERT(!needs_retry_);
00259 needs_retry_ = true;
00260 next_retry_ = WallTime::now() + retry_period_;
00261
00262 if (retry_timer_handle_ == -1)
00263 {
00264 retry_period_ = WallDuration(0.1);
00265 next_retry_ = WallTime::now() + retry_period_;
00266 retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_),
00267 boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
00268 VoidConstPtr(), false);
00269 }
00270 else
00271 {
00272 getInternalTimerManager()->setPeriod(retry_timer_handle_, retry_period_);
00273 }
00274 }
00275 else
00276 {
00277 drop();
00278 }
00279 }
00280
00281 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
00282 {
00283 stats_.bytes_received_ += m.num_bytes;
00284 stats_.messages_received_++;
00285
00286 SubscriptionPtr parent = parent_.lock();
00287
00288 if (parent)
00289 {
00290 stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
00291 }
00292 }
00293
00294 std::string TransportPublisherLink::getTransportType()
00295 {
00296 return connection_->getTransport()->getType();
00297 }
00298
00299 }
00300