publication.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/publication.h"
29 #include "ros/subscriber_link.h"
30 #include "ros/connection.h"
33 #include "ros/serialization.h"
34 #include <std_msgs/Header.h>
35 
36 namespace ros
37 {
38 
40 {
41 public:
42  PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
43  : callback_(callback)
44  , sub_link_(sub_link)
45  , use_tracked_object_(use_tracked_object)
46  , tracked_object_(tracked_object)
47  {
48  }
49 
50  virtual CallResult call()
51  {
52  VoidConstPtr tracker;
54  {
55  tracker = tracked_object_.lock();
56 
57  if (!tracker)
58  {
59  return Invalid;
60  }
61  }
62 
64  callback_(pub);
65 
66  return Success;
67  }
68 
69 private:
74 };
75 
76 Publication::Publication(const std::string &name,
77  const std::string &datatype,
78  const std::string &_md5sum,
79  const std::string& message_definition,
80  size_t max_queue,
81  bool latch,
82  bool has_header)
83 : name_(name),
84  datatype_(datatype),
85  md5sum_(_md5sum),
86  message_definition_(message_definition),
87  max_queue_(max_queue),
88  seq_(0),
89  dropped_(false),
90  latch_(latch),
91  has_header_(has_header),
92  intraprocess_subscriber_count_(0)
93 {
94 }
95 
97 {
98  drop();
99 }
100 
102 {
103  boost::mutex::scoped_lock lock(callbacks_mutex_);
104 
105  callbacks_.push_back(callbacks);
106 
107  // Add connect callbacks for all current subscriptions if this publisher wants them
108  if (callbacks->connect_ && callbacks->callback_queue_)
109  {
110  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
111  V_SubscriberLink::iterator it = subscriber_links_.begin();
112  V_SubscriberLink::iterator end = subscriber_links_.end();
113  for (; it != end; ++it)
114  {
115  const SubscriberLinkPtr& sub_link = *it;
116  CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
117  callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
118  }
119  }
120 }
121 
123 {
124  boost::mutex::scoped_lock lock(callbacks_mutex_);
125 
126  V_Callback::iterator it = std::find(callbacks_.begin(), callbacks_.end(), callbacks);
127  if (it != callbacks_.end())
128  {
129  const SubscriberCallbacksPtr& cb = *it;
130  if (cb->callback_queue_)
131  {
132  cb->callback_queue_->removeByID((uint64_t)cb.get());
133  }
134  callbacks_.erase(it);
135  }
136 }
137 
139 {
140  // grab a lock here, to ensure that no subscription callback will
141  // be invoked after we return
142  {
143  boost::mutex::scoped_lock lock(publish_queue_mutex_);
144  boost::mutex::scoped_lock lock2(subscriber_links_mutex_);
145 
146  if (dropped_)
147  {
148  return;
149  }
150 
151  dropped_ = true;
152  }
153 
155 }
156 
158 {
159  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
160  if (dropped_)
161  {
162  return false;
163  }
164 
165  ROS_ASSERT(m.buf);
166 
167  uint32_t seq = incrementSequence();
168  if (has_header_)
169  {
170  // If we have a header, we know it's immediately after the message length
171  // Deserialize it, write the sequence, and then serialize it again.
172  namespace ser = ros::serialization;
173  std_msgs::Header header;
174  ser::IStream istream(m.buf.get() + 4, m.num_bytes - 4);
175  ser::deserialize(istream, header);
176  header.seq = seq;
177  ser::OStream ostream(m.buf.get() + 4, m.num_bytes - 4);
178  ser::serialize(ostream, header);
179  }
180 
181  for(V_SubscriberLink::iterator i = subscriber_links_.begin();
182  i != subscriber_links_.end(); ++i)
183  {
184  const SubscriberLinkPtr& sub_link = (*i);
185  sub_link->enqueueMessage(m, true, false);
186  }
187 
188  if (latch_)
189  {
190  last_message_ = m;
191  }
192 
193  return true;
194 }
195 
197 {
198  {
199  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
200 
201  if (dropped_)
202  {
203  return;
204  }
205 
206  subscriber_links_.push_back(sub_link);
207 
208  if (sub_link->isIntraprocess())
209  {
211  }
212  }
213 
214  if (latch_ && last_message_.buf)
215  {
216  sub_link->enqueueMessage(last_message_, true, true);
217  }
218 
219  // This call invokes the subscribe callback if there is one.
220  // This must happen *after* the push_back above, in case the
221  // callback uses publish().
222  peerConnect(sub_link);
223 }
224 
226 {
227  SubscriberLinkPtr link;
228  {
229  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
230 
231  if (dropped_)
232  {
233  return;
234  }
235 
236  if (sub_link->isIntraprocess())
237  {
239  }
240 
241  V_SubscriberLink::iterator it = std::find(subscriber_links_.begin(), subscriber_links_.end(), sub_link);
242  if (it != subscriber_links_.end())
243  {
244  link = *it;
245  subscriber_links_.erase(it);
246  }
247  }
248 
249  if (link)
250  {
251  peerDisconnect(link);
252  }
253 }
254 
256 {
257  XmlRpc::XmlRpcValue stats;
258  stats[0] = name_;
259  XmlRpc::XmlRpcValue conn_data;
260  conn_data.setSize(0); // force to be an array, even if it's empty
261 
262  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
263 
264  uint32_t cidx = 0;
265  for (V_SubscriberLink::iterator c = subscriber_links_.begin();
266  c != subscriber_links_.end(); ++c, cidx++)
267  {
268  const SubscriberLink::Stats& s = (*c)->getStats();
269  conn_data[cidx][0] = (*c)->getConnectionID();
270  // todo: figure out what to do here... the bytes_sent will wrap around
271  // on some flows within a reasonable amount of time. xmlrpc++ doesn't
272  // seem to give me a nice way to do 64-bit ints, perhaps that's a
273  // limitation of xml-rpc, not sure. alternatively we could send the number
274  // of KB transmitted to gain a few order of magnitude.
275  conn_data[cidx][1] = (int)s.bytes_sent_;
276  conn_data[cidx][2] = (int)s.message_data_sent_;
277  conn_data[cidx][3] = (int)s.messages_sent_;
278  conn_data[cidx][4] = 0; // not sure what is meant by connected
279  }
280 
281  stats[1] = conn_data;
282  return stats;
283 }
284 
285 // Publisher : [(connection_id, destination_caller_id, direction, transport, topic_name, connected, connection_info_string)*]
286 // e.g. [(2, '/listener', 'o', 'TCPROS', '/chatter', 1, 'TCPROS connection on port 55878 to [127.0.0.1:44273 on socket 7]')]
288 {
289  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
290 
291  for (V_SubscriberLink::iterator c = subscriber_links_.begin();
292  c != subscriber_links_.end(); ++c)
293  {
294  XmlRpc::XmlRpcValue curr_info;
295  curr_info[0] = (int)(*c)->getConnectionID();
296  curr_info[1] = (*c)->getDestinationCallerID();
297  curr_info[2] = "o";
298  curr_info[3] = (*c)->getTransportType();
299  curr_info[4] = name_;
300  curr_info[5] = true; // For length compatibility with rospy
301  curr_info[6] = (*c)->getTransportInfo();
302  info[info.size()] = curr_info;
303  }
304 }
305 
307 {
308  // Swap our publishers list with a local one so we can only lock for a short period of time, because a
309  // side effect of our calling drop() on connections can be re-locking the publishers mutex
310  V_SubscriberLink local_publishers;
311 
312  {
313  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
314 
315  local_publishers.swap(subscriber_links_);
316  }
317 
318  for (V_SubscriberLink::iterator i = local_publishers.begin();
319  i != local_publishers.end(); ++i)
320  {
321  (*i)->drop();
322  }
323 }
324 
326 {
327  boost::mutex::scoped_lock lock(callbacks_mutex_);
328 
329  V_Callback::iterator it = callbacks_.begin();
330  V_Callback::iterator end = callbacks_.end();
331  for (; it != end; ++it)
332  {
333  const SubscriberCallbacksPtr& cbs = *it;
334  if (cbs->connect_ && cbs->callback_queue_)
335  {
336  CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->connect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
337  cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
338  }
339  }
340 }
341 
343 {
344  boost::mutex::scoped_lock lock(callbacks_mutex_);
345 
346  V_Callback::iterator it = callbacks_.begin();
347  V_Callback::iterator end = callbacks_.end();
348  for (; it != end; ++it)
349  {
350  const SubscriberCallbacksPtr& cbs = *it;
351  if (cbs->disconnect_ && cbs->callback_queue_)
352  {
353  CallbackInterfacePtr cb(boost::make_shared<PeerConnDisconnCallback>(cbs->disconnect_, sub_link, cbs->has_tracked_object_, cbs->tracked_object_));
354  cbs->callback_queue_->addCallback(cb, (uint64_t)cbs.get());
355  }
356  }
357 }
358 
360 {
361  boost::mutex::scoped_lock lock(callbacks_mutex_);
362  return callbacks_.size();
363 }
364 
366 {
367  boost::mutex::scoped_lock lock(seq_mutex_);
368  uint32_t old_seq = seq_;
369  ++seq_;
370 
371  return old_seq;
372 }
373 
375 {
376  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
377  return (uint32_t)subscriber_links_.size();
378 }
379 
380 void Publication::getPublishTypes(bool& serialize, bool& nocopy, const std::type_info& ti)
381 {
382  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
383  V_SubscriberLink::const_iterator it = subscriber_links_.begin();
384  V_SubscriberLink::const_iterator end = subscriber_links_.end();
385  for (; it != end; ++it)
386  {
387  const SubscriberLinkPtr& sub = *it;
388  bool s = false;
389  bool n = false;
390  sub->getPublishTypes(s, n, ti);
391  serialize = serialize || s;
392  nocopy = nocopy || n;
393 
394  if (serialize && nocopy)
395  {
396  break;
397  }
398  }
399 }
400 
402 {
403  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
404  return !subscriber_links_.empty();
405 }
406 
408 {
409  if (m.message)
410  {
411  boost::mutex::scoped_lock lock(subscriber_links_mutex_);
412  V_SubscriberLink::const_iterator it = subscriber_links_.begin();
413  V_SubscriberLink::const_iterator end = subscriber_links_.end();
414  for (; it != end; ++it)
415  {
416  const SubscriberLinkPtr& sub = *it;
417  if (sub->isIntraprocess())
418  {
419  sub->enqueueMessage(m, false, true);
420  }
421  }
422 
423  m.message.reset();
424  }
425 
426  if (m.buf)
427  {
428  boost::mutex::scoped_lock lock(publish_queue_mutex_);
429  publish_queue_.push_back(m);
430  }
431 }
432 
434 {
435  V_SerializedMessage queue;
436  {
437  boost::mutex::scoped_lock lock(publish_queue_mutex_);
438 
439  if (dropped_)
440  {
441  return;
442  }
443 
444  queue.insert(queue.end(), publish_queue_.begin(), publish_queue_.end());
445  publish_queue_.clear();
446  }
447 
448  if (queue.empty())
449  {
450  return;
451  }
452 
453  V_SerializedMessage::iterator it = queue.begin();
454  V_SerializedMessage::iterator end = queue.end();
455  for (; it != end; ++it)
456  {
457  enqueueMessage(*it);
458  }
459 }
460 
461 bool Publication::validateHeader(const Header& header, std::string& error_msg)
462 {
463  std::string md5sum, topic, client_callerid;
464  if (!header.getValue("md5sum", md5sum)
465  || !header.getValue("topic", topic)
466  || !header.getValue("callerid", client_callerid))
467  {
468  std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
469 
470  ROS_ERROR("%s", msg.c_str());
471  error_msg = msg;
472 
473  return false;
474  }
475 
476  // Check whether the topic has been deleted from
477  // advertised_topics through a call to unadvertise(), which could
478  // have happened while we were waiting for the subscriber to
479  // provide the md5sum.
480  if(isDropped())
481  {
482  std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
483  topic + std::string("] from [" + client_callerid +"].");
484 
485  ROS_ERROR("%s", msg.c_str());
486  error_msg = msg;
487 
488  return false;
489  }
490 
491  if (getMD5Sum() != md5sum &&
492  (md5sum != std::string("*") && getMD5Sum() != std::string("*")))
493  {
494  std::string datatype;
495  header.getValue("type", datatype);
496 
497  std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
498  std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
499  std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() +
500  std::string("]. Dropping connection.");
501 
502  ROS_ERROR("%s", msg.c_str());
503  error_msg = msg;
504 
505  return false;
506  }
507 
508  return true;
509 }
510 
511 } // namespace ros
Publication(const std::string &name, const std::string &datatype, const std::string &_md5sum, const std::string &message_definition, size_t max_queue, bool latch, bool has_header)
Definition: publication.cpp:76
CallResult
Possible results for the call() method.
boost::mutex seq_mutex_
Definition: publication.h:167
uint32_t incrementSequence()
void processPublishQueue()
boost::function< void(const SingleSubscriberPublisher &)> SubscriberStatusCallback
Definition: forwards.h:94
SerializedMessage last_message_
Definition: publication.h:181
XmlRpc::XmlRpcValue getStats()
Get the accumulated stats for this publication.
void dropAllConnections()
XmlRpcServer s
uint32_t getNumSubscribers()
Returns the number of subscribers this publication has.
bool enqueueMessage(const SerializedMessage &m)
queues an outgoing message into each of the publishers, so that it gets sent to every subscriber ...
boost::mutex callbacks_mutex_
Definition: publication.h:171
PeerConnDisconnCallback(const SubscriberStatusCallback &callback, const SubscriberLinkPtr &sub_link, bool use_tracked_object, const VoidConstWPtr &tracked_object)
Definition: publication.cpp:42
Abstract interface for items which can be added to a CallbackQueueInterface.
void addSubscriberLink(const SubscriberLinkPtr &sub_link)
Adds a publisher to our list.
const std::string & getDataType() const
Returns the data type of the message published by this publication.
Definition: publication.h:104
void getPublishTypes(bool &serialize, bool &nocopy, const std::type_info &ti)
V_SerializedMessage publish_queue_
Definition: publication.h:186
Allows publication of a message to a single subscriber. Only available inside subscriber connection c...
void publish(SerializedMessage &m)
V_SubscriberLink subscriber_links_
Definition: publication.h:173
void removeSubscriberLink(const SubscriberLinkPtr &sub_link)
Removes a publisher from our list (deleting it if it&#39;s the last reference)
SubscriberLinkPtr sub_link_
Definition: publication.cpp:71
bool hasSubscribers()
Returns whether or not this publication has any subscribers.
void addCallbacks(const SubscriberCallbacksPtr &callbacks)
uint32_t intraprocess_subscriber_count_
Definition: publication.h:183
const char * datatype()
SubscriberStatusCallback callback_
Definition: publication.cpp:70
void setSize(int size)
bool isDropped()
Returns if this publication is valid or not.
Definition: publication.h:136
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
boost::mutex publish_queue_mutex_
Definition: publication.h:187
bool validateHeader(const Header &h, std::string &error_msg)
void removeCallbacks(const SubscriberCallbacksPtr &callbacks)
void peerConnect(const SubscriberLinkPtr &sub_link)
Called when a new peer has connected. Calls the connection callback.
std::vector< SerializedMessage > V_SerializedMessage
Definition: publication.h:185
boost::mutex subscriber_links_mutex_
Definition: publication.h:175
boost::shared_array< uint8_t > buf
std::vector< SubscriberLinkPtr > V_SubscriberLink
Definition: forwards.h:71
std::string name_
Definition: publication.h:161
bool getValue(const std::string &key, std::string &value) const
const std::string & getMD5Sum() const
Returns the md5sum of the message published by this publication.
Definition: publication.h:108
const char * md5sum()
void getInfo(XmlRpc::XmlRpcValue &info)
Get the accumulated info for this publication.
const std::string header
size_t getNumCallbacks()
void peerDisconnect(const SubscriberLinkPtr &sub_link)
Called when a peer has disconnected. Calls the disconnection callback.
#define ROS_ASSERT(cond)
boost::shared_ptr< void const > message
virtual CallResult call()
Call this callback.
Definition: publication.cpp:50
#define ROS_ERROR(...)
void drop()
Drop this publication. Disconnects all publishers.
V_Callback callbacks_
Definition: publication.h:170


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27