publication.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/publication.h"
29 #include "ros/subscriber_link.h"
30 #include "ros/connection.h"
33 #include "ros/serialization.h"
34 #include <std_msgs/Header.h>
35 
36 namespace ros
37 {
38 
40 {
41 public:
42  PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
43  : callback_(callback)
44  , sub_link_(sub_link)
45  , use_tracked_object_(use_tracked_object)
46  , tracked_object_(tracked_object)
47  {
48  }
49 
50  virtual CallResult call()
51  {
52  VoidConstPtr tracker;
54  {
55  tracker = tracked_object_.lock();
56 
57  if (!tracker)
58  {
59  return Invalid;
60  }
61  }
62 
64  callback_(pub);
65 
66  return Success;
67  }
68 
69 private:
74 };
75 
76 Publication::Publication(const std::string &name,
77  const std::string &datatype,
78  const std::string &_md5sum,
79  const std::string& message_definition,
80  size_t max_queue,
81  bool /* unused */,
82  bool has_header)
83 : name_(name),
84  datatype_(datatype),
85  md5sum_(_md5sum),
86  message_definition_(message_definition),
87  max_queue_(max_queue),
88  seq_(0),
89  dropped_(false),
90  latch_(false),
91  has_header_(has_header),
92  intraprocess_subscriber_count_(0)
93 {
94 }
95 
97 {
98  drop();
99 }
100 
102 {
103  boost::mutex::scoped_lock lock(callbacks_mutex_);
104 
105  callbacks_.push_back(callbacks);
106 
107  // Add connect callbacks for all current subscriptions if this publisher wants them
108  if (callbacks->connect_ && callbacks->callback_queue_)
109  {
110  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
111  V_SubscriberLink::iterator it = subscriber_links_.begin();
112  V_SubscriberLink::iterator end = subscriber_links_.end();
113  for (; it != end; ++it)
114  {
115  const SubscriberLinkPtr& sub_link = *it;
116  CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
117  callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
118  }
119  }
120 
121  // Publication singleton is latched if any of its callbacks have a latched message handler.
122  if (callbacks->push_latched_message_)
123  {
124  latch_ = true;
125  }
126 }
127 
129 {
130  boost::mutex::scoped_lock lock(callbacks_mutex_);
131 
132  V_Callback::iterator it = std::find(callbacks_.begin(), callbacks_.end(), callbacks);
133  if (it != callbacks_.end())
134  {
135  const SubscriberCallbacksPtr& cb = *it;
136  if (cb->callback_queue_)
137  {
138  cb->callback_queue_->removeByID((uint64_t)cb.get());
139  }
140  callbacks_.erase(it);
141  }
142 
143  // IF the removed callbacks was latched, check for remaining latched callbacks,
144  // and if none remain, clear the latch status on the publication singleton.
145  if (callbacks->push_latched_message_)
146  {
147  V_Callback::iterator it = callbacks_.begin();
148  V_Callback::iterator end = callbacks_.end();
149  for (; it != end; ++it)
150  {
151  if ((*it)->push_latched_message_)
152  {
153  return;
154  }
155  }
156  latch_ = false;
157  }
158 }
159 
161 {
162  // grab a lock here, to ensure that no subscription callback will
163  // be invoked after we return
164  {
165  boost::mutex::scoped_lock lock(publish_queue_mutex_);
166  boost::mutex::scoped_lock lock2(subscriber_links_mutex_);
167 
168  if (dropped_)
169  {
170  return;
171  }
172 
173  dropped_ = true;
174  }
175 
177 }
178 
180 {
181  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
182  if (dropped_)
183  {
184  return false;
185  }
186 
187  ROS_ASSERT(m.buf);
188 
189  uint32_t seq = incrementSequence();
190  if (has_header_)
191  {
192  // If we have a header, we know it's immediately after the message length
193  // Deserialize it, write the sequence, and then serialize it again.
194  namespace ser = ros::serialization;
195  std_msgs::Header header;
196  ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4);
197  ser::deserialize(istream, header);
198  header.seq = seq;
199  ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4);
200  ser::serialize(ostream, header);
201  }
202 
203  for(V_SubscriberLink::iterator i = subscriber_links_.begin();
204  i != subscriber_links_.end(); ++i)
205  {
206  const SubscriberLinkPtr& sub_link = (*i);
207  sub_link->enqueueMessage(m, true, false);
208  }
209 
210  return true;
211 }
212 
214 {
215  {
216  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
217 
218  if (dropped_)
219  {
220  return;
221  }
222 
223  subscriber_links_.push_back(sub_link);
224 
225  if (sub_link->isIntraprocess())
226  {
228  }
229  }
230 
231  // This call invokes the subscribe callback if there is one.
232  // This must happen *after* the push_back above, in case the
233  // callback uses publish().
234  peerConnect(sub_link);
235 }
236 
238 {
239  SubscriberLinkPtr link;
240  {
241  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
242 
243  if (dropped_)
244  {
245  return;
246  }
247 
248  if (sub_link->isIntraprocess())
249  {
251  }
252 
253  V_SubscriberLink::iterator it = std::find(subscriber_links_.begin(), subscriber_links_.end(), sub_link);
254  if (it != subscriber_links_.end())
255  {
256  link = *it;
257  subscriber_links_.erase(it);
258  }
259  }
260 
261  if (link)
262  {
263  peerDisconnect(link);
264  }
265 }
266 
268 {
269  XmlRpc::XmlRpcValue stats;
270  stats[0] = name_;
271  XmlRpc::XmlRpcValue conn_data;
272  conn_data.setSize(0); // force to be an array, even if it's empty
273 
274  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
275 
276  uint32_t cidx = 0;
277  for (V_SubscriberLink::iterator c = subscriber_links_.begin();
278  c != subscriber_links_.end(); ++c, cidx++)
279  {
280  const SubscriberLink::Stats& s = (*c)->getStats();
281  conn_data[cidx][0] = (*c)->getConnectionID();
282  // todo: figure out what to do here... the bytes_sent will wrap around
283  // on some flows within a reasonable amount of time. xmlrpc++ doesn't
284  // seem to give me a nice way to do 64-bit ints, perhaps that's a
285  // limitation of xml-rpc, not sure. alternatively we could send the number
286  // of KB transmitted to gain a few order of magnitude.
287  conn_data[cidx][1] = (int)s.bytes_sent_;
288  conn_data[cidx][2] = (int)s.message_data_sent_;
289  conn_data[cidx][3] = (int)s.messages_sent_;
290  conn_data[cidx][4] = 0; // not sure what is meant by connected
291  }
292 
293  stats[1] = conn_data;
294  return stats;
295 }
296 
297 // Publisher : [(connection_id, destination_caller_id, direction, transport, topic_name, connected, connection_info_string)*]
298 // e.g. [(2, '/listener', 'o', 'TCPROS', '/chatter', 1, 'TCPROS connection on port 55878 to [127.0.0.1:44273 on socket 7]')]
300 {
301  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
302 
303  for (V_SubscriberLink::iterator c = subscriber_links_.begin();
304  c != subscriber_links_.end(); ++c)
305  {
306  XmlRpc::XmlRpcValue curr_info;
307  curr_info[0] = (int)(*c)->getConnectionID();
308  curr_info[1] = (*c)->getDestinationCallerID();
309  curr_info[2] = "o";
310  curr_info[3] = (*c)->getTransportType();
311  curr_info[4] = name_;
312  curr_info[5] = true; // For length compatibility with rospy
313  curr_info[6] = (*c)->getTransportInfo();
314  info[info.size()] = curr_info;
315  }
316 }
317 
319 {
320  // Swap our publishers list with a local one so we can only lock for a short period of time, because a
321  // side effect of our calling drop() on connections can be re-locking the publishers mutex
322  V_SubscriberLink local_publishers;
323 
324  {
325  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
326 
327  local_publishers.swap(subscriber_links_);
328  }
329 
330  for (V_SubscriberLink::iterator i = local_publishers.begin();
331  i != local_publishers.end(); ++i)
332  {
333  (*i)->drop();
334  }
335 }
336 
338 {
339  boost::mutex::scoped_lock lock(callbacks_mutex_);
340 
341  V_Callback::iterator it = callbacks_.begin();
342  V_Callback::iterator end = callbacks_.end();
343  for (; it != end; ++it)
344  {
345  const SubscriberCallbacksPtr& cbs = *it;
346  if (cbs->push_latched_message_)
347  {
348  cbs->push_latched_message_(sub_link);
349  }
350  if (cbs->connect_ && cbs->callback_queue_)
351  {
352  CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
353  cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
354  }
355  }
356 }
357 
359 {
360  boost::mutex::scoped_lock lock(callbacks_mutex_);
361 
362  V_Callback::iterator it = callbacks_.begin();
363  V_Callback::iterator end = callbacks_.end();
364  for (; it != end; ++it)
365  {
366  const SubscriberCallbacksPtr& cbs = *it;
367  if (cbs->disconnect_ && cbs->callback_queue_)
368  {
369  CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
370  cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
371  }
372  }
373 }
374 
376 {
377  boost::mutex::scoped_lock lock(callbacks_mutex_);
378  return callbacks_.size();
379 }
380 
382 {
383  boost::mutex::scoped_lock lock(seq_mutex_);
384  uint32_t old_seq = seq_;
385  ++seq_;
386 
387  return old_seq;
388 }
389 
391 {
392  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
393  return (uint32_t)subscriber_links_.size();
394 }
395 
397 {
398  boost::mutex::scoped_lock lock(callbacks_mutex_);
399  return latch_;
400 }
401 
402 void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti)
403 {
404  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
405  V_SubscriberLink::const_iterator it = subscriber_links_.begin();
406  V_SubscriberLink::const_iterator end = subscriber_links_.end();
407  for (; it != end; ++it)
408  {
409  const SubscriberLinkPtr& sub = *it;
410  bool s = false;
411  bool n = false;
412  sub->getPublishTypes(s, n, ti);
413  serialize = serialize || s;
414  nocopy = nocopy || n;
415 
416  if (serialize && nocopy)
417  {
418  break;
419  }
420  }
421 }
422 
424 {
425  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
426  return !subscriber_links_.empty();
427 }
428 
430 {
431  if (m.message)
432  {
433  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
434  V_SubscriberLink::const_iterator it = subscriber_links_.begin();
435  V_SubscriberLink::const_iterator end = subscriber_links_.end();
436  for (; it != end; ++it)
437  {
438  const SubscriberLinkPtr& sub = *it;
439  if (sub->isIntraprocess())
440  {
441  sub->enqueueMessage(m, false, true);
442  }
443  }
444 
445  m.message.reset();
446  }
447 
448  if (m.buf)
449  {
450  boost::mutex::scoped_lock lock(publish_queue_mutex_);
451  publish_queue_.push_back(m);
452  }
453 }
454 
456 {
457  V_SerializedMessage queue;
458  {
459  boost::mutex::scoped_lock lock(publish_queue_mutex_);
460 
461  if (dropped_)
462  {
463  return;
464  }
465 
466  queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
467  publish_queue_.clear();
468  }
469 
470  if (queue.empty())
471  {
472  return;
473  }
474 
475  V_SerializedMessage::iterator it = queue.begin();
476  V_SerializedMessage::iterator end = queue.end();
477  for (; it != end; ++it)
478  {
479  enqueueMessage(*it);
480  }
481 }
482 
483 bool Publication::validateHeader(const Header& header, std::string& error_msg)
484 {
485  std::string md5sum, topic, client_callerid;
486  if (!header.getValue("md5sum", md5sum)
487  || !header.getValue("topic", topic)
488  || !header.getValue("callerid", client_callerid))
489  {
490  std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
491 
492  ROS_ERROR("%s", msg.c_str());
493  error_msg = msg;
494 
495  return false;
496  }
497 
498  // Check whether the topic has been deleted from
499  // advertised_topics through a call to unadvertise(), which could
500  // have happened while we were waiting for the subscriber to
501  // provide the md5sum.
502  if(isDropped())
503  {
504  std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
505  topic + std::string("] from [" + client_callerid +"].");
506 
507  ROS_ERROR("%s", msg.c_str());
508  error_msg = msg;
509 
510  return false;
511  }
512 
513  if (getMD5Sum() != md5sum &&
514  (md5sum != std::string("*") && getMD5Sum() != std::string("*")))
515  {
516  std::string datatype;
517  header.getValue("type", datatype);
518 
519  std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
520  std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
521  std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() +
522  std::string("]. Dropping connection.");
523 
524  ROS_ERROR("%s", msg.c_str());
525  error_msg = msg;
526 
527  return false;
528  }
529 
530  return true;
531 }
532 
533 } // namespace ros
ros::CallbackInterface::Invalid
@ Invalid
Call no longer valid.
Definition: callback_queue_interface.h:58
XmlRpc::XmlRpcValue::size
int size() const
ros::Publication::dropped_
bool dropped_
Definition: publication.h:177
ros::SerializedMessage
ros::Publication::V_SerializedMessage
std::vector< SerializedMessage > V_SerializedMessage
Definition: publication.h:184
callback_queue_interface.h
ros::Publication::isDropped
bool isDropped()
Returns if this publication is valid or not.
Definition: publication.h:136
md5sum
const char * md5sum()
ros::Publication::subscriber_links_mutex_
boost::mutex subscriber_links_mutex_
Definition: publication.h:175
single_subscriber_publisher.h
boost::shared_ptr< SubscriberLink >
ros::PeerConnDisconnCallback::use_tracked_object_
bool use_tracked_object_
Definition: publication.cpp:72
ros::Publication::callbacks_
V_Callback callbacks_
Definition: publication.h:170
ros::Publication::addCallbacks
void addCallbacks(const SubscriberCallbacksPtr &callbacks)
Definition: publication.cpp:101
ros::Publication::processPublishQueue
void processPublishQueue()
Definition: publication.cpp:455
ros::Publication::peerConnect
void peerConnect(const SubscriberLinkPtr &sub_link)
Called when a new peer has connected. Calls the connection callback.
Definition: publication.cpp:337
ros::V_SubscriberLink
std::vector< SubscriberLinkPtr > V_SubscriberLink
Definition: forwards.h:71
s
XmlRpcServer s
ros
ros::Publication::callbacks_mutex_
boost::mutex callbacks_mutex_
Definition: publication.h:171
ros::Publication::getNumSubscribers
uint32_t getNumSubscribers()
Returns the number of subscribers this publication has.
Definition: publication.cpp:390
ros::Header
ros::serialization
ros::Publication::removeCallbacks
void removeCallbacks(const SubscriberCallbacksPtr &callbacks)
Definition: publication.cpp:128
ros::Publication::getNumCallbacks
size_t getNumCallbacks()
Definition: publication.cpp:375
ros::PeerConnDisconnCallback
Definition: publication.cpp:39
ros::SerializedMessage::message
boost::shared_ptr< void const > message
ros::Publication::getMD5Sum
const std::string & getMD5Sum() const
Returns the md5sum of the message published by this publication.
Definition: publication.h:108
ros::Publication::Publication
Publication(const std::string &name, const std::string &datatype, const std::string &_md5sum, const std::string &message_definition, size_t max_queue, bool, bool has_header)
Definition: publication.cpp:76
ros::PeerConnDisconnCallback::sub_link_
SubscriberLinkPtr sub_link_
Definition: publication.cpp:71
ros::Publication::publish_queue_
V_SerializedMessage publish_queue_
Definition: publication.h:185
ros::SubscriberStatusCallback
boost::function< void(const SingleSubscriberPublisher &)> SubscriberStatusCallback
Definition: forwards.h:94
ros::Publication::publish_queue_mutex_
boost::mutex publish_queue_mutex_
Definition: publication.h:186
ros::Publication::publish
void publish(SerializedMessage &m)
Definition: publication.cpp:429
ros::Publication::drop
void drop()
Drop this publication. Disconnects all publishers.
Definition: publication.cpp:160
ros::CallbackInterface::CallResult
CallResult
Possible results for the call() method.
Definition: callback_queue_interface.h:54
ros::Publication::getInfo
void getInfo(XmlRpc::XmlRpcValue &info)
Get the accumulated info for this publication.
Definition: publication.cpp:299
ros::SingleSubscriberPublisher
Allows publication of a message to a single subscriber. Only available inside subscriber connection c...
Definition: single_subscriber_publisher.h:43
ros::Publication::getPublishTypes
void getPublishTypes(bool &serialize, bool &nocopy, const std::type_info &ti)
Definition: publication.cpp:402
ros::Publication::isLatched
bool isLatched()
Definition: publication.cpp:396
ros::Publication::latch_
bool latch_
Definition: publication.h:179
ros::Publication::seq_mutex_
boost::mutex seq_mutex_
Definition: publication.h:167
serialization.h
ros::Publication::enqueueMessage
bool enqueueMessage(const SerializedMessage &m)
queues an outgoing message into each of the publishers, so that it gets sent to every subscriber
Definition: publication.cpp:179
ros::CallbackInterface
Abstract interface for items which can be added to a CallbackQueueInterface.
Definition: callback_queue_interface.h:48
ros::SerializedMessage::num_bytes
size_t num_bytes
ros::CallbackInterface::Success
@ Success
Call succeeded.
Definition: callback_queue_interface.h:56
ros::PeerConnDisconnCallback::tracked_object_
VoidConstWPtr tracked_object_
Definition: publication.cpp:73
ros::Publication::removeSubscriberLink
void removeSubscriberLink(const SubscriberLinkPtr &sub_link)
Removes a publisher from our list (deleting it if it's the last reference)
Definition: publication.cpp:237
ros::VoidConstWPtr
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
ros::Publication::getStats
XmlRpc::XmlRpcValue getStats()
Get the accumulated stats for this publication.
Definition: publication.cpp:267
connection.h
ros::Publication::intraprocess_subscriber_count_
uint32_t intraprocess_subscriber_count_
Definition: publication.h:182
ros::Publication::incrementSequence
uint32_t incrementSequence()
Definition: publication.cpp:381
ros::Publication::addSubscriberLink
void addSubscriberLink(const SubscriberLinkPtr &sub_link)
Adds a publisher to our list.
Definition: publication.cpp:213
ros::PeerConnDisconnCallback::call
virtual CallResult call()
Call this callback.
Definition: publication.cpp:50
publication.h
ros::Publication::has_header_
bool has_header_
Definition: publication.h:180
ROS_ERROR
#define ROS_ERROR(...)
ros::PeerConnDisconnCallback::callback_
SubscriberStatusCallback callback_
Definition: publication.cpp:70
XmlRpc::XmlRpcValue::setSize
void setSize(int size)
datatype
const char * datatype()
ros::Publication::subscriber_links_
V_SubscriberLink subscriber_links_
Definition: publication.h:173
ros::Publication::~Publication
~Publication()
Definition: publication.cpp:96
header
const std::string header
ROS_ASSERT
#define ROS_ASSERT(cond)
ros::PeerConnDisconnCallback::PeerConnDisconnCallback
PeerConnDisconnCallback(const SubscriberStatusCallback &callback, const SubscriberLinkPtr &sub_link, bool use_tracked_object, const VoidConstWPtr &tracked_object)
Definition: publication.cpp:42
ros::Publication::dropAllConnections
void dropAllConnections()
Definition: publication.cpp:318
ros::Publication::name_
std::string name_
Definition: publication.h:161
serialize
void serialize(Stream &stream, const boost::array< T, N > &t)
ros::Publication::peerDisconnect
void peerDisconnect(const SubscriberLinkPtr &sub_link)
Called when a peer has disconnected. Calls the disconnection callback.
Definition: publication.cpp:358
ros::SerializedMessage::buf
boost::shared_array< uint8_t > buf
XmlRpc::XmlRpcValue
ros::Publication::seq_
uint32_t seq_
Definition: publication.h:166
ros::Publication::validateHeader
bool validateHeader(const Header &h, std::string &error_msg)
Definition: publication.cpp:483
ros::Publication::getDataType
const std::string & getDataType() const
Returns the data type of the message published by this publication.
Definition: publication.h:104
ros::Publication::hasSubscribers
bool hasSubscribers()
Returns whether or not this publication has any subscribers.
Definition: publication.cpp:423


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:44