transport_publisher_link.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include <ros/platform.h> // platform dependendant requirements
36 
38 #include "ros/subscription.h"
39 #include "ros/header.h"
40 #include "ros/connection.h"
42 #include "ros/this_node.h"
43 #include "ros/connection_manager.h"
44 #include "ros/file_log.h"
45 #include "ros/poll_manager.h"
47 #include "ros/timer_manager.h"
48 #include "ros/callback_queue.h"
50 
51 #include <boost/bind.hpp>
52 
53 #include <sstream>
54 
55 namespace ros
56 {
57 
58 TransportPublisherLink::TransportPublisherLink(const SubscriptionPtr& parent, const std::string& xmlrpc_uri, const TransportHints& transport_hints)
59 : PublisherLink(parent, xmlrpc_uri, transport_hints)
60 , retry_timer_handle_(-1)
61 , needs_retry_(false)
62 , dropping_(false)
63 {
64 }
65 
67 {
68  dropping_ = true;
69 
70  if (retry_timer_handle_ != -1)
71  {
73  }
74 
76 }
77 
79 {
80  connection_ = connection;
81  // slot_type is used to automatically track the TransporPublisherLink class' existence
82  // and disconnect when this class' reference count is decremented to 0. It increments
83  // then decrements the shared_from_this reference count around calls to the
84  // onConnectionDropped function, preventing a coredump in the middle of execution.
85  connection_->addDropListener(Connection::DropSignal::slot_type(&TransportPublisherLink::onConnectionDropped, this, _1, _2).track(shared_from_this()));
86 
87  if (connection_->getTransport()->requiresHeader())
88  {
89  connection_->setHeaderReceivedCallback(boost::bind(&TransportPublisherLink::onHeaderReceived, this, _1, _2));
90 
91  SubscriptionPtr parent = parent_.lock();
92 
94  header["topic"] = parent->getName();
95  header["md5sum"] = parent->md5sum();
96  header["callerid"] = this_node::getName();
97  header["type"] = parent->datatype();
98  header["tcp_nodelay"] = transport_hints_.getTCPNoDelay() ? "1" : "0";
99  connection_->writeHeader(header, boost::bind(&TransportPublisherLink::onHeaderWritten, this, _1));
100  }
101  else
102  {
103  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
104  }
105 
106  return true;
107 }
108 
110 {
111  dropping_ = true;
113 
114  if (SubscriptionPtr parent = parent_.lock())
115  {
116  parent->removePublisherLink(shared_from_this());
117  }
118 }
119 
121 {
122  (void)conn;
123  // Do nothing
124 }
125 
127 {
128  (void)conn;
129  ROS_ASSERT(conn == connection_);
130 
131  if (!setHeader(header))
132  {
133  drop();
134  return false;
135  }
136 
137  if (retry_timer_handle_ != -1)
138  {
140  retry_timer_handle_ = -1;
141  }
142 
143  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
144 
145  return true;
146 }
147 
148 void TransportPublisherLink::onMessageLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
149 {
150  (void)conn;
151  (void)size;
152  if (retry_timer_handle_ != -1)
153  {
155  retry_timer_handle_ = -1;
156  }
157 
158  if (!success)
159  {
160  if (connection_)
161  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
162  return;
163  }
164 
165  ROS_ASSERT(conn == connection_);
166  ROS_ASSERT(size == 4);
167 
168  uint32_t len = *((uint32_t*)buffer.get());
169 
170  if (len > 1000000000)
171  {
172  ROS_ERROR("a message of over a gigabyte was " \
173  "predicted in tcpros. that seems highly " \
174  "unlikely, so I'll assume protocol " \
175  "synchronization is lost.");
176  drop();
177  return;
178  }
179 
180  connection_->read(len, boost::bind(&TransportPublisherLink::onMessage, this, _1, _2, _3, _4));
181 }
182 
183 void TransportPublisherLink::onMessage(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
184 {
185  if (!success && !conn)
186  return;
187 
188  ROS_ASSERT(conn == connection_);
189 
190  if (success)
191  {
192  handleMessage(SerializedMessage(buffer, size), true, false);
193  }
194 
195  if (success || !connection_->getTransport()->requiresHeader())
196  {
197  connection_->read(4, boost::bind(&TransportPublisherLink::onMessageLength, this, _1, _2, _3, _4));
198  }
199 }
200 
202 {
203  if (dropping_)
204  {
205  return;
206  }
207 
209  {
210  retry_period_ = std::min(retry_period_ * 2, WallDuration(20));
211  needs_retry_ = false;
212  SubscriptionPtr parent = parent_.lock();
213  // TODO: support retry on more than just TCP
214  // For now, since UDP does not have a heartbeat, we do not attempt to retry
215  // UDP connections since an error there likely means some invalid operation has
216  // happened.
217  if (connection_->getTransport()->getType() == std::string("TCPROS"))
218  {
219  std::string topic = parent ? parent->getName() : "unknown";
220 
221  TransportTCPPtr old_transport = boost::dynamic_pointer_cast<TransportTCP>(connection_->getTransport());
222  ROS_ASSERT(old_transport);
223  const std::string& host = old_transport->getConnectedHost();
224  int port = old_transport->getConnectedPort();
225 
226  ROSCPP_CONN_LOG_DEBUG("Retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
227 
228  TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
229  if (transport->connect(host, port))
230  {
231  ConnectionPtr connection(boost::make_shared<Connection>());
232  connection->initialize(transport, false, HeaderReceivedFunc());
233  initialize(connection);
234 
235  ConnectionManager::instance()->addConnection(connection);
236  }
237  else
238  {
239  ROSCPP_CONN_LOG_DEBUG("connect() failed when retrying connection to [%s:%d] for topic [%s]", host.c_str(), port, topic.c_str());
240  }
241  }
242  else if (parent)
243  {
244  parent->removePublisherLink(shared_from_this());
245  }
246  }
247 }
248 
250 
252 {
253  (void)conn;
254  if (dropping_)
255  {
256  return;
257  }
258 
259  ROS_ASSERT(conn == connection_);
260 
261  SubscriptionPtr parent = parent_.lock();
262 
263  if (reason == Connection::TransportDisconnect)
264  {
265  std::string topic = parent ? parent->getName() : "unknown";
266 
267  ROSCPP_CONN_LOG_DEBUG("Connection to publisher [%s] to topic [%s] dropped", connection_->getTransport()->getTransportInfo().c_str(), topic.c_str());
268 
270  needs_retry_ = true;
272 
273  if (retry_timer_handle_ == -1)
274  {
277  // shared_from_this() shared_ptr is used to ensure TransportPublisherLink is not
278  // destroyed in the middle of onRetryTimer execution
280  boost::bind(&TransportPublisherLink::onRetryTimer, this, _1), getInternalCallbackQueue().get(),
281  shared_from_this(), false);
282  }
283  else
284  {
286  }
287  }
288  else
289  {
290  drop();
291  }
292 }
293 
294 void TransportPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
295 {
298 
299  SubscriptionPtr parent = parent_.lock();
300 
301  if (parent)
302  {
303  stats_.drops_ += parent->handleMessage(m, ser, nocopy, getConnection()->getHeader().getValues(), shared_from_this());
304  }
305 }
306 
308 {
309  return connection_->getTransport()->getType();
310 }
311 
313 {
314  return connection_->getTransport()->getTransportInfo();
315 }
316 
317 } // namespace ros
318 
bool getTCPNoDelay()
Returns whether or not this TransportHints has specified TCP_NODELAY.
ROSCPP_DECL InternalTimerManagerPtr getInternalTimerManager()
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:81
std_msgs::Header * header(M &m)
TCPROS transport.
Definition: transport_tcp.h:56
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
Definition: connection.h:62
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
std::map< std::string, std::string > M_string
static SteadyTime now()
static const ConnectionManagerPtr & instance()
CallbackQueuePtr getInternalCallbackQueue()
Definition: init.cpp:260
Structure passed as a parameter to the callback invoked by a ros::SteadyTimer.
Definition: forwards.h:167
static const PollManagerPtr & instance()
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
#define ROS_ASSERT(cond)
#define ROS_ERROR(...)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:26