transport_publisher_link.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include <ros/platform.h> // platform dependendant requirements
36 
38 #include "ros/subscription.h"
39 #include "ros/header.h"
40 #include "ros/connection.h"
42 #include "ros/this_node.h"
43 #include "ros/connection_manager.h"
44 #include "ros/file_log.h"
45 #include "ros/poll_manager.h"
47 #include "ros/timer_manager.h"
48 #include "ros/callback_queue.h"
50 
51 #include <boost/bind.hpp>
52 
53 #include <sstream>
54 
55 namespace ros
56 {
57 
58 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
59 : PublisherLink(parent, xmlrpc_uri, transport_hints)
60 , retry_timer_handle_(-1)
61 , needs_retry_(false)
62 , dropping_(false)
63 {
64 }
65 
67 {
68  dropping_ = true;
69 
70  if (retry_timer_handle_ != -1)
71  {
73  }
74 
76 }
77 
79 {
80  connection_ = connection;
81  // slot_type is used to automatically track the TransporPublisherLink class' existence
82  // and disconnect when this class' reference count is decremented to 0. It increments
83  // then decrements the shared_from_this reference count around calls to the
84  // onConnectionDropped function, preventing a coredump in the middle of execution.
85  connection_->addDropListener(Connection::DropSignal::slot_type(&TransportPublisherLink::onConnectionDropped, this, _1, _2).track(shared_from_this()));
86 
87  if (connection_->getTransport()->requiresHeader())
88  {
89  connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
90 
91  SubscriptionPtr parent = parent_.lock();
92  if (!parent)
93  {
94  return false;
95  }
96 
98  header["topic"] = parent->getName();
99  header["md5sum"] = parent->md5sum();
100  header["callerid"] = this_node::getName();
101  header["type"] = parent->datatype();
102  header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
103  connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
104  }
105  else
106  {
107  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
108  }
109 
110  return true;
111 }
112 
114 {
115  dropping_ = true;
117 
118  if (SubscriptionPtr parent = parent_.lock())
119  {
120  parent->removePublisherLink(shared_from_this());
121  }
122 }
123 
125 {
126  (void)conn;
127  // Do nothing
128 }
129 
131 {
132  (void)conn;
133  ROS_ASSERT(conn == connection_);
134 
135  if (!setHeader(header))
136  {
137  drop();
138  return false;
139  }
140 
141  if (retry_timer_handle_ != -1)
142  {
144  retry_timer_handle_ = -1;
145  }
146 
147  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
148 
149  return true;
150 }
151 
152 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
153 {
154  (void)conn;
155  (void)size;
156  if (retry_timer_handle_ != -1)
157  {
159  retry_timer_handle_ = -1;
160  }
161 
162  if (!success)
163  {
164  if (connection_)
165  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
166  return;
167  }
168 
169  ROS_ASSERT(conn == connection_);
170  ROS_ASSERT(size == 4);
171 
172  uint32_t len = *((uint32_t*)buffer.get());
173 
174  if (len > 1000000000)
175  {
176  ROS_ERROR("a message of over a gigabyte was " \
177  "predicted in tcpros. that seems highly " \
178  "unlikely, so I'll assume protocol " \
179  "synchronization is lost.");
180  drop();
181  return;
182  }
183 
184  connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
185 }
186 
187 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
188 {
189  if (!success && !conn)
190  return;
191 
192  ROS_ASSERT(conn == connection_);
193 
194  if (success)
195  {
196  handleMessage(SerializedMessage(buffer, size), true, false);
197  }
198 
199  if (success || !connection_->getTransport()->requiresHeader())
200  {
201  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
202  }
203 }
204 
206 {
207  if (dropping_)
208  {
209  return;
210  }
211 
213  {
214  retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
215  needs_retry_ = false;
216  SubscriptionPtr parent = parent_.lock();
217  // TODO: support retry on more than just TCP
218  // For now, since UDP does not have a heartbeat, we do not attempt to retry
219  // UDP connections since an error there likely means some invalid operation has
220  // happened.
221  if (connection_->getTransport()->getType() == std::string("TCPROS"))
222  {
223  std::string topic = parent ? parent->getName() : "unknown";
224 
225  TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
226  ROS_ASSERT(old_transport);
227  const std::string& host = old_transport->getConnectedHost();
228  int port = old_transport->getConnectedPort();
229 
230  ROSCPP_CONN_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
231 
232  TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
233  if (transport->connect(host, port))
234  {
235  ConnectionPtr connection(boost::make_shared<Connection>());
236  connection->initialize(transport, false, HeaderReceivedFunc());
237  if (initialize(connection))
238  {
239  ConnectionManager::instance()->addConnection(connection);
240  }
241  }
242  else
243  {
244  ROSCPP_CONN_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
245  }
246  }
247  else if (parent)
248  {
249  parent->removePublisherLink(shared_from_this());
250  }
251  }
252 }
253 
255 
257 {
258  (void)conn;
259  if (dropping_)
260  {
261  return;
262  }
263 
264  ROS_ASSERT(conn == connection_);
265 
266  SubscriptionPtr parent = parent_.lock();
267 
268  if (reason == Connection::TransportDisconnect)
269  {
270  std::string topic = parent ? parent->getName() : "unknown";
271 
272  ROSCPP_CONN_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
273 
275  needs_retry_ = true;
277 
278  if (retry_timer_handle_ == -1)
279  {
282  // shared_from_this() shared_ptr is used to ensure TransportPublisherLink is not
283  // destroyed in the middle of onRetryTimer execution
285  boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
286  shared_from_this(), false);
287  }
288  else
289  {
291  }
292  }
293  else
294  {
295  drop();
296  }
297 }
298 
299 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
300 {
303 
304  SubscriptionPtr parent = parent_.lock();
305 
306  if (parent)
307  {
308  stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
309  }
310 }
311 
313 {
314  return connection_->getTransport()->getType();
315 }
316 
318 {
319  return connection_->getTransport()->getTransportInfo();
320 }
321 
322 } // namespace ros
323 
bool getTCPNoDelay()
Returns whether or not this TransportHints has specified TCP_NODELAY.
ROSCPP_DECL InternalTimerManagerPtr getInternalTimerManager()
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
TCPROS transport.
Definition: transport_tcp.h:56
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
Definition: connection.h:62
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
std::map< std::string, std::string > M_string
static SteadyTime now()
static const ConnectionManagerPtr & instance()
CallbackQueuePtr getInternalCallbackQueue()
Definition: init.cpp:260
Structure passed as a parameter to the callback invoked by a ros::SteadyTimer.
Definition: forwards.h:171
static const PollManagerPtr & instance()
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
const std::string header
#define ROS_ASSERT(cond)
#define ROS_ERROR(...)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27