transport_publisher_link.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include <ros/platform.h> // platform dependendant requirements
36 
38 #include "ros/subscription.h"
39 #include "ros/header.h"
40 #include "ros/connection.h"
42 #include "ros/this_node.h"
43 #include "ros/connection_manager.h"
44 #include "ros/file_log.h"
45 #include "ros/poll_manager.h"
47 #include "ros/timer_manager.h"
48 #include "ros/callback_queue.h"
50 
51 #include <boost/bind.hpp>
52 
53 #include <sstream>
54 
55 namespace ros
56 {
57 
58 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
59 : PublisherLink(parent, xmlrpc_uri, transport_hints)
60 , retry_timer_handle_(-1)
61 , needs_retry_(false)
62 , dropping_(false)
63 {
64 }
65 
67 {
68  dropping_ = true;
69 
70  if (retry_timer_handle_ != -1)
71  {
73  }
74 
76  connection_->removeDropListener(dropped_conn_);
77 }
78 
80 {
81  connection_ = connection;
82  // slot_type is used to automatically track the TransporPublisherLink class' existence
83  // and disconnect when this class' reference count is decremented to 0. It increments
84  // then decrements the shared_from_this reference count around calls to the
85  // onConnectionDropped function, preventing a coredump in the middle of execution.
86  dropped_conn_ = connection_->addDropListener(Connection::DropSignal::slot_type(&TransportPublisherLink::onConnectionDropped, this, boost::placeholders::_1, boost::placeholders::_2).track(shared_from_this()));
87 
88  if (connection_->getTransport()->requiresHeader())
89  {
90  connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, boost::placeholders::_1, boost::placeholders::_2));
91 
92  SubscriptionPtr parent = parent_.lock();
93  if (!parent)
94  {
95  return false;
96  }
97 
99  header["topic"] = parent->getName();
100  header["md5sum"] = parent->md5sum();
101  header["callerid"] = this_node::getName();
102  header["type"] = parent->datatype();
103  header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
104  connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, boost::placeholders::_1));
105  }
106  else
107  {
108  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
109  }
110 
111  return true;
112 }
113 
115 {
116  dropping_ = true;
118 
119  if (SubscriptionPtr parent = parent_.lock())
120  {
121  parent->removePublisherLink(shared_from_this());
122  }
123 }
124 
126 {
127  (void)conn;
128  // Do nothing
129 }
130 
132 {
133  (void)conn;
134  ROS_ASSERT(conn == connection_);
135 
136  if (!setHeader(header))
137  {
138  drop();
139  return false;
140  }
141 
142  if (retry_timer_handle_ != -1)
143  {
145  retry_timer_handle_ = -1;
146  }
147 
148  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
149 
150  return true;
151 }
152 
153 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
154 {
155  (void)conn;
156  (void)size;
157  if (retry_timer_handle_ != -1)
158  {
160  retry_timer_handle_ = -1;
161  }
162 
163  if (!success)
164  {
165  if (connection_)
166  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
167  return;
168  }
169 
170  ROS_ASSERT(conn == connection_);
171  ROS_ASSERT(size == 4);
172 
173  uint32_t len = *((uint32_t*)buffer.get());
174 
175  if (len > 1000000000)
176  {
177  ROS_ERROR("a message of over a gigabyte was " \
178  "predicted in tcpros. that seems highly " \
179  "unlikely, so I'll assume protocol " \
180  "synchronization is lost.");
181  drop();
182  return;
183  }
184 
185  connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
186 }
187 
188 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
189 {
190  if (!success && !conn)
191  return;
192 
193  ROS_ASSERT(conn == connection_);
194 
195  if (success)
196  {
197  handleMessage(SerializedMessage(buffer, size), true, false);
198  }
199 
200  if (success || !connection_->getTransport()->requiresHeader())
201  {
202  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
203  }
204 }
205 
207 {
208  if (dropping_)
209  {
210  return;
211  }
212 
214  {
215  retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
216  needs_retry_ = false;
217  SubscriptionPtr parent = parent_.lock();
218  // TODO: support retry on more than just TCP
219  // For now, since UDP does not have a heartbeat, we do not attempt to retry
220  // UDP connections since an error there likely means some invalid operation has
221  // happened.
222  if (connection_->getTransport()->getType() == std::string("TCPROS"))
223  {
224  std::string topic = parent ? parent->getName() : "unknown";
225 
226  TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
227  ROS_ASSERT(old_transport);
228  const std::string& host = old_transport->getConnectedHost();
229  int port = old_transport->getConnectedPort();
230 
231  ROSCPP_CONN_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
232 
233  TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
234  if (transport->connect(host, port))
235  {
236  ConnectionPtr connection(boost::make_shared<Connection>());
237  connection->initialize(transport, false, HeaderReceivedFunc());
238  if (initialize(connection))
239  {
240  ConnectionManager::instance()->addConnection(connection);
241  }
242  }
243  else
244  {
245  ROSCPP_CONN_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
246  }
247  }
248  else if (parent)
249  {
250  parent->removePublisherLink(shared_from_this());
251  }
252  }
253 }
254 
256 
258 {
259  (void)conn;
260  if (dropping_)
261  {
262  return;
263  }
264 
265  ROS_ASSERT(conn == connection_);
266 
267  SubscriptionPtr parent = parent_.lock();
268 
269  if (reason == Connection::TransportDisconnect)
270  {
271  std::string topic = parent ? parent->getName() : "unknown";
272 
273  ROSCPP_CONN_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
274 
276  needs_retry_ = true;
278 
279  if (retry_timer_handle_ == -1)
280  {
283  // shared_from_this() shared_ptr is used to ensure TransportPublisherLink is not
284  // destroyed in the middle of onRetryTimer execution
286  boost::bind(&TransportPublisherLink::onRetryTimer, this, boost::placeholders::_1), getInternalCallbackQueue().get(),
287  shared_from_this(), false);
288  }
289  else
290  {
292  }
293  }
294  else
295  {
296  drop();
297  }
298 }
299 
300 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
301 {
304 
305  SubscriptionPtr parent = parent_.lock();
306 
307  if (parent)
308  {
309  stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
310  }
311 }
312 
314 {
315  return connection_->getTransport()->getType();
316 }
317 
319 {
320  return connection_->getTransport()->getTransportInfo();
321 }
322 
323 } // namespace ros
324 
connection_manager.h
this_node.h
ros::SerializedMessage
ROSCPP_CONN_LOG_DEBUG
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
boost::shared_ptr
ros
ros::Header
ros::TransportHints
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
Definition: transport_hints.h:54
ros::Connection::TransportDisconnect
@ TransportDisconnect
Definition: connection.h:75
subscription.h
ros::SteadyTime::now
static SteadyTime now()
timer_manager.h
transport_tcp.h
boost::shared_array< uint8_t >
ros::SteadyTimerEvent
Structure passed as a parameter to the callback invoked by a ros::SteadyTimer.
Definition: forwards.h:172
ros::PollManager::instance
static const PollManagerPtr & instance()
Definition: poll_manager.cpp:36
ros::SerializedMessage::num_bytes
size_t num_bytes
ros::Connection::DropReason
DropReason
Definition: connection.h:73
ros::Connection::Destructing
@ Destructing
Definition: connection.h:77
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
connection.h
poll_manager.h
ros::param::get
ROSCPP_DECL bool get(const std::string &key, std::string &s)
Get a string value from the parameter server.
Definition: param.cpp:409
internal_timer_manager.h
ros::HeaderReceivedFunc
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
Definition: connection.h:62
ros::getInternalTimerManager
ROSCPP_DECL InternalTimerManagerPtr getInternalTimerManager()
Definition: internal_timer_manager.cpp:36
ROS_ERROR
#define ROS_ERROR(...)
ros::ConnectionManager::instance
static const ConnectionManagerPtr & instance()
Definition: connection_manager.cpp:43
header.h
ros::getInternalCallbackQueue
CallbackQueuePtr getInternalCallbackQueue()
Definition: init.cpp:261
transport.h
platform.h
callback_queue.h
ros::WallDuration
header
const std::string header
ROS_ASSERT
#define ROS_ASSERT(cond)
ros::TransportHints::getTCPNoDelay
bool getTCPNoDelay()
Returns whether or not this TransportHints has specified TCP_NODELAY.
Definition: transport_hints.h:91
file_log.h
ros::M_string
std::map< std::string, std::string > M_string


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:44