$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include <ros/platform.h> // platform dependendant requirements 00036 00037 #include "ros/transport_publisher_link.h" 00038 #include "ros/subscription.h" 00039 #include "ros/header.h" 00040 #include "ros/connection.h" 00041 #include "ros/transport/transport.h" 00042 #include "ros/this_node.h" 00043 #include "ros/connection_manager.h" 00044 #include "ros/file_log.h" 00045 #include "ros/poll_manager.h" 00046 #include "ros/transport/transport_tcp.h" 00047 #include "ros/timer_manager.h" 00048 #include "ros/callback_queue.h" 00049 #include "ros/internal_timer_manager.h" 00050 00051 #include <boost/bind.hpp> 00052 00053 #include <sstream> 00054 00055 namespace ros 00056 { 00057 00058 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints) 00059 : PublisherLink(parent, xmlrpc_uri, transport_hints) 00060 , retry_timer_handle_(-1) 00061 , needs_retry_(false) 00062 , dropping_(false) 00063 { 00064 } 00065 00066 TransportPublisherLink::~TransportPublisherLink() 00067 { 00068 dropping_ = true; 00069 00070 if (retry_timer_handle_ != -1) 00071 { 00072 getInternalTimerManager()->remove(retry_timer_handle_); 00073 } 00074 00075 connection_->drop(Connection::Destructing); 00076 } 00077 00078 bool TransportPublisherLink::initialize(const ConnectionPtr& connection) 00079 { 00080 connection_ = connection; 00081 connection_->addDropListener(boost::bind(&TransportPublisherLink::onConnectionDropped, this, _1, _2)); 00082 00083 if (connection_->getTransport()->requiresHeader()) 00084 { 00085 connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2)); 00086 00087 SubscriptionPtr parent = parent_.lock(); 00088 00089 M_string header; 00090 header["topic"] = parent->getName(); 00091 header["md5sum"] = parent->md5sum(); 00092 header["callerid"] = this_node::getName(); 00093 header["type"] = parent->datatype(); 00094 header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0"; 00095 connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1)); 00096 } 00097 else 00098 { 00099 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4)); 00100 } 00101 00102 return true; 00103 } 00104 00105 void TransportPublisherLink::drop() 00106 { 00107 dropping_ = true; 00108 connection_->drop(Connection::Destructing); 00109 00110 if (SubscriptionPtr parent = parent_.lock()) 00111 { 00112 parent->removePublisherLink(shared_from_this()); 00113 } 00114 } 00115 00116 void TransportPublisherLink::onHeaderWritten(const ConnectionPtr& conn) 00117 { 00118 // Do nothing 00119 } 00120 00121 bool TransportPublisherLink::onHeaderReceived(const ConnectionPtr& conn, const Header& header) 00122 { 00123 ROS_ASSERT(conn == connection_); 00124 00125 if (!setHeader(header)) 00126 { 00127 drop(); 00128 return false; 00129 } 00130 00131 if (retry_timer_handle_ != -1) 00132 { 00133 getInternalTimerManager()->remove(retry_timer_handle_); 00134 retry_timer_handle_ = -1; 00135 } 00136 00137 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4)); 00138 00139 return true; 00140 } 00141 00142 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success) 00143 { 00144 if (retry_timer_handle_ != -1) 00145 { 00146 getInternalTimerManager()->remove(retry_timer_handle_); 00147 retry_timer_handle_ = -1; 00148 } 00149 00150 if (!success) 00151 { 00152 if (connection_) 00153 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4)); 00154 return; 00155 } 00156 00157 ROS_ASSERT(conn == connection_); 00158 ROS_ASSERT(size == 4); 00159 00160 uint32_t len = *((uint32_t*)buffer.get()); 00161 00162 if (len > 1000000000) 00163 { 00164 ROS_ERROR("a message of over a gigabyte was " \ 00165 "predicted in tcpros. that seems highly " \ 00166 "unlikely, so I'll assume protocol " \ 00167 "synchronization is lost."); 00168 drop(); 00169 return; 00170 } 00171 00172 connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4)); 00173 } 00174 00175 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success) 00176 { 00177 if (!success && !conn) 00178 return; 00179 00180 ROS_ASSERT(conn == connection_); 00181 00182 if (success) 00183 { 00184 handleMessage(SerializedMessage(buffer, size), true, false); 00185 } 00186 00187 if (success || !connection_->getTransport()->requiresHeader()) 00188 { 00189 connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4)); 00190 } 00191 } 00192 00193 void TransportPublisherLink::onRetryTimer(const ros::WallTimerEvent&) 00194 { 00195 if (dropping_) 00196 { 00197 return; 00198 } 00199 00200 if (needs_retry_ && WallTime::now() > next_retry_) 00201 { 00202 retry_period_ = std::min(retry_period_ * 2, WallDuration(20)); 00203 needs_retry_ = false; 00204 SubscriptionPtr parent = parent_.lock(); 00205 // TODO: support retry on more than just TCP 00206 // For now, since UDP does not have a heartbeat, we do not attempt to retry 00207 // UDP connections since an error there likely means some invalid operation has 00208 // happened. 00209 if (connection_->getTransport()->getType() == std::string("TCPROS")) 00210 { 00211 std::string topic = parent ? parent->getName() : "unknown"; 00212 00213 TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport()); 00214 ROS_ASSERT(old_transport); 00215 const std::string& host = old_transport->getConnectedHost(); 00216 int port = old_transport->getConnectedPort(); 00217 00218 ROSCPP_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str()); 00219 00220 TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet())); 00221 if (transport->connect(host, port)) 00222 { 00223 ConnectionPtr connection(new Connection); 00224 connection->initialize(transport, false, HeaderReceivedFunc()); 00225 initialize(connection); 00226 00227 ConnectionManager::instance()->addConnection(connection); 00228 } 00229 else 00230 { 00231 ROSCPP_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str()); 00232 } 00233 } 00234 else if (parent) 00235 { 00236 parent->removePublisherLink(shared_from_this()); 00237 } 00238 } 00239 } 00240 00241 CallbackQueuePtr getInternalCallbackQueue(); 00242 00243 void TransportPublisherLink::onConnectionDropped(const ConnectionPtr& conn, Connection::DropReason reason) 00244 { 00245 if (dropping_) 00246 { 00247 return; 00248 } 00249 00250 ROS_ASSERT(conn == connection_); 00251 00252 SubscriptionPtr parent = parent_.lock(); 00253 00254 if (reason == Connection::TransportDisconnect) 00255 { 00256 std::string topic = parent ? parent->getName() : "unknown"; 00257 00258 ROSCPP_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str()); 00259 00260 ROS_ASSERT(!needs_retry_); 00261 needs_retry_ = true; 00262 next_retry_ = WallTime::now() + retry_period_; 00263 00264 if (retry_timer_handle_ == -1) 00265 { 00266 retry_period_ = WallDuration(0.1); 00267 next_retry_ = WallTime::now() + retry_period_; 00268 retry_timer_handle_ = getInternalTimerManager()->add(WallDuration(retry_period_), 00269 boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(), 00270 VoidConstPtr(), false); 00271 } 00272 else 00273 { 00274 getInternalTimerManager()->setPeriod(retry_timer_handle_, retry_period_); 00275 } 00276 } 00277 else 00278 { 00279 drop(); 00280 } 00281 } 00282 00283 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy) 00284 { 00285 stats_.bytes_received_ += m.num_bytes; 00286 stats_.messages_received_++; 00287 00288 SubscriptionPtr parent = parent_.lock(); 00289 00290 if (parent) 00291 { 00292 stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this()); 00293 } 00294 } 00295 00296 std::string TransportPublisherLink::getTransportType() 00297 { 00298 return connection_->getTransport()->getType(); 00299 } 00300 00301 } // namespace ros 00302