topic_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/topic_manager.h"
29 #include "ros/xmlrpc_manager.h"
30 #include "ros/connection_manager.h"
31 #include "ros/poll_manager.h"
32 #include "ros/publication.h"
33 #include "ros/subscription.h"
34 #include "ros/this_node.h"
35 #include "ros/network.h"
36 #include "ros/master.h"
39 #include "ros/rosout_appender.h"
40 #include "ros/init.h"
41 #include "ros/file_log.h"
42 #include "ros/subscribe_options.h"
43 
44 #include "xmlrpcpp/XmlRpc.h"
45 
46 #include <boost/bind/bind.hpp>
47 #include <ros/console.h>
48 
49 using namespace XmlRpc; // A battle to be fought later
50 using namespace std; // sigh
51 
53 
54 namespace ros
55 {
56 
57 const TopicManagerPtr& TopicManager::instance()
58 {
59  static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
60  return topic_manager;
61 }
62 
63 TopicManager::TopicManager()
64 : shutting_down_(false)
65 {
66 }
67 
69 {
70  shutdown();
71 }
72 
74 {
75  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
76  shutting_down_ = false;
77 
81 
82  xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, boost::placeholders::_1, boost::placeholders::_2));
83  xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, boost::placeholders::_1, boost::placeholders::_2));
84  xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
85  xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, boost::placeholders::_1, boost::placeholders::_2));
86  xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
87  xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
88 
89  poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
90 }
91 
93 {
94  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
95  if (shutting_down_)
96  {
97  return;
98  }
99 
100  {
102  shutting_down_ = true;
103  subs_mutex_.unlock();
104  advertised_topics_mutex_.unlock();
105  }
106 
107  // actually one should call poll_manager_->removePollThreadListener(), but the connection is not stored above
108  poll_manager_->shutdown();
109 
110  xmlrpc_manager_->unbind("publisherUpdate");
111  xmlrpc_manager_->unbind("requestTopic");
112  xmlrpc_manager_->unbind("getBusStats");
113  xmlrpc_manager_->unbind("getBusInfo");
114  xmlrpc_manager_->unbind("getSubscriptions");
115  xmlrpc_manager_->unbind("getPublications");
116 
117  ROSCPP_LOG_DEBUG("Shutting down topics...");
118  ROSCPP_LOG_DEBUG(" shutting down publishers");
119  {
120  boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
121 
122  for (V_Publication::iterator i = advertised_topics_.begin();
123  i != advertised_topics_.end(); ++i)
124  {
125  if(!(*i)->isDropped())
126  {
127  unregisterPublisher((*i)->getName());
128  }
129  (*i)->drop();
130  }
131  advertised_topics_.clear();
132  }
133 
134  // unregister all of our subscriptions
135  ROSCPP_LOG_DEBUG(" shutting down subscribers");
136  {
137  boost::mutex::scoped_lock subs_lock(subs_mutex_);
138 
139  for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
140  {
141  // Remove us as a subscriber from the master
142  unregisterSubscriber((*s)->getName());
143  // now, drop our side of the connection
144  (*s)->shutdown();
145  }
146  subscriptions_.clear();
147  }
148 }
149 
151 {
152  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
153 
154  V_Publication::iterator it = advertised_topics_.begin();
155  V_Publication::iterator end = advertised_topics_.end();
156  for (; it != end; ++it)
157  {
158  const PublicationPtr& pub = *it;
159  pub->processPublishQueue();
160  }
161 }
162 
164 {
165  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
166 
167  topics.resize(advertised_topic_names_.size());
168  std::copy(advertised_topic_names_.begin(),
170  topics.begin());
171 }
172 
174 {
175  boost::mutex::scoped_lock lock(subs_mutex_);
176 
177  topics.reserve(subscriptions_.size());
178  L_Subscription::const_iterator it = subscriptions_.begin();
179  L_Subscription::const_iterator end = subscriptions_.end();
180  for (; it != end; ++it)
181  {
182  const SubscriptionPtr& sub = *it;
183  topics.push_back(sub->getName());
184  }
185 }
186 
188 {
189  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
190 
191  return lookupPublicationWithoutLock(topic);
192 }
193 
194 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
195 {
196  return lhs == "*" || rhs == "*" || lhs == rhs;
197 }
198 
200 {
201  // spin through the subscriptions and see if we find a match. if so, use it.
202  bool found = false;
203  bool found_topic = false;
204 
205  SubscriptionPtr sub;
206 
207  {
208  if (isShuttingDown())
209  {
210  return false;
211  }
212 
213  for (L_Subscription::iterator s = subscriptions_.begin();
214  s != subscriptions_.end() && !found; ++s)
215  {
216  sub = *s;
217  if (!sub->isDropped() && sub->getName() == ops.topic)
218  {
219  found_topic = true;
220  if (md5sumsMatch(ops.md5sum, sub->md5sum()))
221  {
222  found = true;
223  }
224  break;
225  }
226  }
227  }
228 
229  if (found_topic && !found)
230  {
231  std::stringstream ss;
232  ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
233  throw ConflictingSubscriptionException(ss.str());
234  }
235  else if (found)
236  {
237  if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
238  {
239  return false;
240  }
241  }
242 
243  return found;
244 }
245 
246 // this function has the subscription code that doesn't need to be templated.
248 {
249  boost::mutex::scoped_lock lock(subs_mutex_);
250 
251  if (addSubCallback(ops))
252  {
253  return true;
254  }
255 
256  if (isShuttingDown())
257  {
258  return false;
259  }
260 
261  if (ops.md5sum.empty())
262  {
263  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
264  }
265 
266  if (ops.datatype.empty())
267  {
268  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
269  }
270 
271  if (!ops.helper)
272  {
273  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
274  }
275 
276  const std::string& md5sum = ops.md5sum;
277  std::string datatype = ops.datatype;
278 
279  SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
280  s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
281 
282  if (!registerSubscriber(s, ops.datatype))
283  {
284  ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
285  s->shutdown();
286  return false;
287  }
288 
289  subscriptions_.push_back(s);
290 
291  return true;
292 }
293 
295 {
296  if (ops.datatype == "*")
297  {
298  std::stringstream ss;
299  ss << "Advertising with * as the datatype is not allowed. Topic [" << ops.topic << "]";
300  throw InvalidParameterException(ss.str());
301  }
302 
303  if (ops.md5sum == "*")
304  {
305  std::stringstream ss;
306  ss << "Advertising with * as the md5sum is not allowed. Topic [" << ops.topic << "]";
307  throw InvalidParameterException(ss.str());
308  }
309 
310  if (ops.md5sum.empty())
311  {
312  throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
313  }
314 
315  if (ops.datatype.empty())
316  {
317  throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
318  }
319 
320  if (ops.message_definition.empty())
321  {
322  ROS_WARN("Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
323  }
324 
325  PublicationPtr pub;
326 
327  {
328  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
329 
330  if (isShuttingDown())
331  {
332  return false;
333  }
334 
336  if (pub && pub->getNumCallbacks() == 0)
337  {
338  pub.reset();
339  }
340 
341  if (pub)
342  {
343  if (pub->getMD5Sum() != ops.md5sum)
344  {
345  ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
346  ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
347  return false;
348  }
349 
350  pub->addCallbacks(callbacks);
351 
352  return true;
353  }
354 
355  pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, false, ops.has_header));
356  pub->addCallbacks(callbacks);
357  advertised_topics_.push_back(pub);
358  }
359 
360 
361  {
362  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
363  advertised_topic_names_.push_back(ops.topic);
364  }
365 
366  // Check whether we've already subscribed to this topic. If so, we'll do
367  // the self-subscription here, to avoid the deadlock that would happen if
368  // the ROS thread later got the publisherUpdate with its own XMLRPC URI.
369  // The assumption is that advertise() is called from somewhere other
370  // than the ROS thread.
371  bool found = false;
372  SubscriptionPtr sub;
373  {
374  boost::mutex::scoped_lock lock(subs_mutex_);
375 
376  for (L_Subscription::iterator s = subscriptions_.begin();
377  s != subscriptions_.end() && !found; ++s)
378  {
379  if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
380  {
381  found = true;
382  sub = *s;
383  break;
384  }
385  }
386  }
387 
388  if(found)
389  {
390  sub->addLocalConnection(pub);
391  }
392 
393  XmlRpcValue args, result, payload;
394  args[0] = this_node::getName();
395  args[1] = ops.topic;
396  args[2] = ops.datatype;
397  args[3] = xmlrpc_manager_->getServerURI();
398  master::execute("registerPublisher", args, result, payload, true);
399 
400  return true;
401 }
402 
403 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
404 {
405  PublicationPtr pub;
406  V_Publication::iterator i;
407  {
408  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
409 
410  if (isShuttingDown())
411  {
412  return false;
413  }
414 
415  for (i = advertised_topics_.begin();
416  i != advertised_topics_.end(); ++i)
417  {
418  if(((*i)->getName() == topic) && (!(*i)->isDropped()))
419  {
420  pub = *i;
421  break;
422  }
423  }
424  }
425 
426  if (!pub)
427  {
428  return false;
429  }
430 
431  pub->removeCallbacks(callbacks);
432 
433  {
434  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
435  if (pub->getNumCallbacks() == 0)
436  {
437  unregisterPublisher(pub->getName());
438  pub->drop();
439 
440  advertised_topics_.erase(i);
441 
442  {
443  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
444  advertised_topic_names_.remove(pub->getName());
445  }
446  }
447  }
448 
449  return true;
450 }
451 
452 bool TopicManager::unregisterPublisher(const std::string& topic)
453 {
454  XmlRpcValue args, result, payload;
455  args[0] = this_node::getName();
456  args[1] = topic;
457  args[2] = xmlrpc_manager_->getServerURI();
458  master::execute("unregisterPublisher", args, result, payload, false);
459 
460  return true;
461 }
462 
463 bool TopicManager::isTopicAdvertised(const string &topic)
464 {
465  for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
466  {
467  if (((*t)->getName() == topic) && (!(*t)->isDropped()))
468  {
469  return true;
470  }
471  }
472 
473  return false;
474 }
475 
476 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
477 {
478  XmlRpcValue args, result, payload;
479  args[0] = this_node::getName();
480  args[1] = s->getName();
481  args[2] = datatype;
482  args[3] = xmlrpc_manager_->getServerURI();
483 
484  if (!master::execute("registerSubscriber", args, result, payload, true))
485  {
486  return false;
487  }
488 
489  vector<string> pub_uris;
490  for (int i = 0; i < payload.size(); i++)
491  {
492  if (payload[i] != xmlrpc_manager_->getServerURI())
493  {
494  pub_uris.push_back(string(payload[i]));
495  }
496  }
497 
498  bool self_subscribed = false;
499  PublicationPtr pub;
500  const std::string& sub_md5sum = s->md5sum();
501  // Figure out if we have a local publisher
502  {
503  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
504  V_Publication::const_iterator it = advertised_topics_.begin();
505  V_Publication::const_iterator end = advertised_topics_.end();
506  for (; it != end; ++it)
507  {
508  pub = *it;
509  const std::string& pub_md5sum = pub->getMD5Sum();
510 
511  if (pub->getName() == s->getName() && !pub->isDropped())
512  {
513  if (!md5sumsMatch(pub_md5sum, sub_md5sum))
514  {
515  ROS_ERROR("md5sum mismatch making local subscription to topic %s.",
516  s->getName().c_str());
517  ROS_ERROR("Subscriber expects type %s, md5sum %s",
518  s->datatype().c_str(), s->md5sum().c_str());
519  ROS_ERROR("Publisher provides type %s, md5sum %s",
520  pub->getDataType().c_str(), pub->getMD5Sum().c_str());
521  return false;
522  }
523 
524  self_subscribed = true;
525  break;
526  }
527  }
528  }
529 
530  s->pubUpdate(pub_uris);
531  if (self_subscribed)
532  {
533  s->addLocalConnection(pub);
534  }
535 
536  return true;
537 }
538 
539 bool TopicManager::unregisterSubscriber(const string &topic)
540 {
541  XmlRpcValue args, result, payload;
542  args[0] = this_node::getName();
543  args[1] = topic;
544  args[2] = xmlrpc_manager_->getServerURI();
545 
546  master::execute("unregisterSubscriber", args, result, payload, false);
547 
548  return true;
549 }
550 
551 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
552 {
553  SubscriptionPtr sub;
554  {
555  boost::mutex::scoped_lock lock(subs_mutex_);
556 
557  if (isShuttingDown())
558  {
559  return false;
560  }
561 
562  ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
563  // find the subscription
564  for (L_Subscription::const_iterator s = subscriptions_.begin();
565  s != subscriptions_.end(); ++s)
566  {
567  if ((*s)->getName() != topic || (*s)->isDropped())
568  continue;
569 
570  sub = *s;
571  break;
572  }
573 
574  }
575 
576  if (sub)
577  {
578  return sub->pubUpdate(pubs);
579  }
580  else
581  {
582  ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
583  "don't have any subscribers to that topic.", topic.c_str());
584  }
585 
586  return false;
587 }
588 
589 bool TopicManager::requestTopic(const string &topic,
590  XmlRpcValue &protos,
591  XmlRpcValue &ret)
592 {
593  for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
594  {
595  XmlRpcValue proto = protos[proto_idx]; // save typing
596  if (proto.getType() != XmlRpcValue::TypeArray)
597  {
598  ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
599  return false;
600  }
601 
602  if (proto[0].getType() != XmlRpcValue::TypeString)
603  {
604  ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
605  "did not start with a string");
606  return false;
607  }
608 
609  string proto_name = proto[0];
610  if (proto_name == string("TCPROS"))
611  {
612  XmlRpcValue tcpros_params;
613  tcpros_params[0] = string("TCPROS");
614  tcpros_params[1] = network::getHost();
615  tcpros_params[2] = int(connection_manager_->getTCPPort());
616  ret[0] = int(1);
617  ret[1] = string();
618  ret[2] = tcpros_params;
619  return true;
620  }
621  else if (proto_name == string("UDPROS"))
622  {
623  if (proto.size() != 5 ||
624  proto[1].getType() != XmlRpcValue::TypeBase64 ||
625  proto[2].getType() != XmlRpcValue::TypeString ||
626  proto[3].getType() != XmlRpcValue::TypeInt ||
627  proto[4].getType() != XmlRpcValue::TypeInt)
628  {
629  ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
630  return false;
631  }
632  std::vector<char> header_bytes = proto[1];
633  boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
634  memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
635  Header h;
636  string err;
637  if (!h.parse(buffer, header_bytes.size(), err))
638  {
639  ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
640  return false;
641  }
642 
643  PublicationPtr pub_ptr = lookupPublication(topic);
644  if(!pub_ptr)
645  {
646  ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
647  return false;
648  }
649 
650  std::string host = proto[2];
651  int port = proto[3];
652 
653  M_string m;
654  std::string error_msg;
655  if (!pub_ptr->validateHeader(h, error_msg))
656  {
657  ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
658  return false;
659  }
660 
661  int max_datagram_size = proto[4];
662  int conn_id = connection_manager_->getNewConnectionID();
663  TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
664  if (!transport)
665  {
666  ROSCPP_LOG_DEBUG("Error creating outgoing transport for [%s:%d]", host.c_str(), port);
667  return false;
668  }
669  connection_manager_->udprosIncomingConnection(transport, h);
670 
671  XmlRpcValue udpros_params;
672  udpros_params[0] = string("UDPROS");
673  udpros_params[1] = network::getHost();
674  udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
675  udpros_params[3] = conn_id;
676  udpros_params[4] = max_datagram_size;
677  m["topic"] = topic;
678  m["md5sum"] = pub_ptr->getMD5Sum();
679  m["type"] = pub_ptr->getDataType();
680  m["callerid"] = this_node::getName();
681  m["message_definition"] = pub_ptr->getMessageDefinition();
682  boost::shared_array<uint8_t> msg_def_buffer;
683  uint32_t len;
684  Header::write(m, msg_def_buffer, len);
685  XmlRpcValue v(msg_def_buffer.get(), len);
686  udpros_params[5] = v;
687  ret[0] = int(1);
688  ret[1] = string();
689  ret[2] = udpros_params;
690  return true;
691  }
692  else
693  {
694  ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
695  proto_name.c_str());
696  }
697  }
698 
699  ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
700  "requestTopic did not support TCPROS, so there are no " \
701  "protocols in common.");
702  return false;
703 }
704 
705 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
706 {
707  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
708 
709  if (isShuttingDown())
710  {
711  return;
712  }
713 
715  if (p->hasSubscribers() || p->isLatching())
716  {
717  ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
718 
719  // Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can
720  // do a no-copy publish.
721  bool nocopy = false;
722  bool serialize = false;
723 
724  // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
725  if (m.type_info && m.message)
726  {
727  p->getPublishTypes(serialize, nocopy, *m.type_info);
728  }
729  else
730  {
731  serialize = true;
732  }
733 
734  if (!nocopy)
735  {
736  m.message.reset();
737  m.type_info = 0;
738  }
739 
740  if (serialize || p->isLatching())
741  {
742  SerializedMessage m2 = serfunc();
743  m.buf = m2.buf;
744  m.num_bytes = m2.num_bytes;
746  }
747 
748  p->publish(m);
749 
750  // If we're not doing a serialized publish we don't need to signal the pollset. The write()
751  // call inside signal() is actually relatively expensive when doing a nocopy publish.
752  if (serialize)
753  {
754  poll_manager_->getPollSet().signal();
755  }
756  }
757  else
758  {
759  p->incrementSequence();
760  }
761 }
762 
763 void TopicManager::incrementSequence(const std::string& topic)
764 {
765  PublicationPtr pub = lookupPublication(topic);
766  if (pub)
767  {
768  pub->incrementSequence();
769  }
770 }
771 
772 bool TopicManager::isLatched(const std::string& topic)
773 {
774  PublicationPtr pub = lookupPublication(topic);
775  if (pub)
776  {
777  return pub->isLatched();
778  }
779 
780  return false;
781 }
782 
784 {
785  PublicationPtr t;
786  for (V_Publication::iterator i = advertised_topics_.begin();
787  !t && i != advertised_topics_.end(); ++i)
788  {
789  if (((*i)->getName() == topic) && (!(*i)->isDropped()))
790  {
791  t = *i;
792  break;
793  }
794  }
795 
796  return t;
797 }
798 
799 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
800 {
801  SubscriptionPtr sub;
802 
803  {
804  boost::mutex::scoped_lock lock(subs_mutex_);
805 
806  if (isShuttingDown())
807  {
808  return false;
809  }
810 
811  L_Subscription::iterator it;
812  for (it = subscriptions_.begin();
813  it != subscriptions_.end(); ++it)
814  {
815  if ((*it)->getName() == topic)
816  {
817  sub = *it;
818  break;
819  }
820  }
821  }
822 
823  if (!sub)
824  {
825  return false;
826  }
827 
828  sub->removeCallback(helper);
829 
830  if (sub->getNumCallbacks() == 0)
831  {
832  // nobody is left. blow away the subscription.
833  {
834  boost::mutex::scoped_lock lock(subs_mutex_);
835 
836  L_Subscription::iterator it;
837  for (it = subscriptions_.begin();
838  it != subscriptions_.end(); ++it)
839  {
840  if ((*it)->getName() == topic)
841  {
842  subscriptions_.erase(it);
843  break;
844  }
845  }
846 
847  if (!unregisterSubscriber(topic))
848  {
849  ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
850  }
851  }
852 
853  sub->shutdown();
854  return true;
855  }
856 
857  return true;
858 }
859 
860 size_t TopicManager::getNumSubscribers(const std::string &topic)
861 {
862  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
863 
864  if (isShuttingDown())
865  {
866  return 0;
867  }
868 
870  if (p)
871  {
872  return p->getNumSubscribers();
873  }
874 
875  return 0;
876 }
877 
879 {
880  boost::mutex::scoped_lock lock(subs_mutex_);
881  return subscriptions_.size();
882 }
883 
884 size_t TopicManager::getNumPublishers(const std::string &topic)
885 {
886  boost::mutex::scoped_lock lock(subs_mutex_);
887 
888  if (isShuttingDown())
889  {
890  return 0;
891  }
892 
893  for (L_Subscription::const_iterator t = subscriptions_.begin();
894  t != subscriptions_.end(); ++t)
895  {
896  if (!(*t)->isDropped() && (*t)->getName() == topic)
897  {
898  return (*t)->getNumPublishers();
899  }
900  }
901 
902  return 0;
903 }
904 
906 {
907  XmlRpcValue publish_stats, subscribe_stats, service_stats;
908  // force these guys to be arrays, even if we don't populate them
909  publish_stats.setSize(0);
910  subscribe_stats.setSize(0);
911  service_stats.setSize(0);
912 
913  uint32_t pidx = 0;
914  {
915  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
916  for (V_Publication::iterator t = advertised_topics_.begin();
917  t != advertised_topics_.end(); ++t)
918  {
919  publish_stats[pidx++] = (*t)->getStats();
920  }
921  }
922 
923  {
924  uint32_t sidx = 0;
925 
926  boost::mutex::scoped_lock lock(subs_mutex_);
927  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
928  {
929  subscribe_stats[sidx++] = (*t)->getStats();
930  }
931  }
932 
933  stats[0] = publish_stats;
934  stats[1] = subscribe_stats;
935  stats[2] = service_stats;
936 }
937 
939 {
940  // force these guys to be arrays, even if we don't populate them
941  info.setSize(0);
942 
943  {
944  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
945 
946  for (V_Publication::iterator t = advertised_topics_.begin();
947  t != advertised_topics_.end(); ++t)
948  {
949  (*t)->getInfo(info);
950  }
951  }
952 
953  {
954  boost::mutex::scoped_lock lock(subs_mutex_);
955 
956  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
957  {
958  (*t)->getInfo(info);
959  }
960  }
961 }
962 
964 {
965  // force these guys to be arrays, even if we don't populate them
966  subs.setSize(0);
967 
968  {
969  boost::mutex::scoped_lock lock(subs_mutex_);
970 
971  uint32_t sidx = 0;
972 
973  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
974  {
975  XmlRpcValue sub;
976  sub[0] = (*t)->getName();
977  sub[1] = (*t)->datatype();
978  subs[sidx++] = sub;
979  }
980  }
981 }
982 
984 {
985  // force these guys to be arrays, even if we don't populate them
986  pubs.setSize(0);
987 
988  {
989  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
990 
991  uint32_t sidx = 0;
992 
993  for (V_Publication::iterator t = advertised_topics_.begin();
994  t != advertised_topics_.end(); ++t)
995  {
996  XmlRpcValue pub;
997  pub[0] = (*t)->getName();
998  pub[1] = (*t)->getDataType();
999  pubs[sidx++] = pub;
1000  }
1001 
1002  }
1003 }
1004 
1006 {
1007  std::vector<std::string> pubs;
1008  for (int idx = 0; idx < params[2].size(); idx++)
1009  {
1010  pubs.push_back(params[2][idx]);
1011  }
1012  if (pubUpdate(params[1], pubs))
1013  {
1014  result = xmlrpc::responseInt(1, "", 0);
1015  }
1016  else
1017  {
1019  }
1020 }
1021 
1023 {
1024  if (!requestTopic(params[1], params[2], result))
1025  {
1027  }
1028 }
1029 
1031 {
1032  (void)params;
1033  result[0] = 1;
1034  result[1] = std::string("");
1036  getBusStats(result);
1037  result[2] = response;
1038 }
1039 
1041 {
1042  (void)params;
1043  result[0] = 1;
1044  result[1] = std::string("");
1047  result[2] = response;
1048 }
1049 
1051 {
1052  (void)params;
1053  result[0] = 1;
1054  result[1] = std::string("subscriptions");
1057  result[2] = response;
1058 }
1059 
1061 {
1062  (void)params;
1063  result[0] = 1;
1064  result[1] = std::string("publications");
1067  result[2] = response;
1068 }
1069 
1070 } // namespace ros
XmlRpc::XmlRpcValue::size
int size() const
ros::TopicManager::getBusInfoCallback
void getBusInfoCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1040
connection_manager.h
response
const std::string response
this_node.h
ros::AdvertiseOptions::topic
std::string topic
The topic to publish on.
Definition: advertise_options.h:98
ros::SerializedMessage
ros::TopicManager::isTopicAdvertised
bool isTopicAdvertised(const std::string &topic)
Definition: topic_manager.cpp:463
ros::SerializedMessage::message_start
uint8_t * message_start
ros::TopicManager::getNumSubscriptions
size_t getNumSubscriptions()
Definition: topic_manager.cpp:878
ros::TopicManager::pubUpdateCallback
void pubUpdateCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1005
md5sum
const char * md5sum()
ros::TopicManager::getNumSubscribers
size_t getNumSubscribers(const std::string &_topic)
Return the number of subscribers a node has for a particular topic:
Definition: topic_manager.cpp:860
ros::TopicManager::poll_manager_
PollManagerPtr poll_manager_
Definition: topic_manager.h:233
boost::shared_ptr< TopicManager >
ros::TopicManager::shutdown
void shutdown()
Definition: topic_manager.cpp:92
ros::xmlrpc::responseInt
ROSCPP_DECL XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
Definition: xmlrpc_manager.cpp:52
s
XmlRpcServer s
ros::TopicManager::processPublishQueues
void processPublishQueues()
Definition: topic_manager.cpp:150
ros::md5sumsMatch
bool md5sumsMatch(const std::string &lhs, const std::string &rhs)
Definition: topic_manager.cpp:194
ros
ros::TopicManager::~TopicManager
~TopicManager()
Definition: topic_manager.cpp:68
ros::Header
ros::TopicManager::requestTopic
bool requestTopic(const std::string &topic, XmlRpc::XmlRpcValue &protos, XmlRpc::XmlRpcValue &ret)
Request a topic.
Definition: topic_manager.cpp:589
ros::SubscribeOptions::topic
std::string topic
Topic to subscribe to.
Definition: subscribe_options.h:115
ros::SubscribeOptions::transport_hints
TransportHints transport_hints
Hints for transport type and options.
Definition: subscribe_options.h:141
ros::TopicManager::getNumPublishers
size_t getNumPublishers(const std::string &_topic)
Return the number of publishers connected to this node on a particular topic.
Definition: topic_manager.cpp:884
ros::SubscribeOptions::callback_queue
CallbackQueueInterface * callback_queue
Queue to add callbacks to. If NULL, the global callback queue will be used.
Definition: subscribe_options.h:123
ros::SerializedMessage::message
boost::shared_ptr< void const > message
ros::SubscribeOptions::allow_concurrent_callbacks
bool allow_concurrent_callbacks
Definition: subscribe_options.h:127
ros::TopicManager::subs_mutex_
boost::mutex subs_mutex_
Definition: topic_manager.h:222
XmlRpc
subscription.h
ros::TopicManager::start
void start()
Definition: topic_manager.cpp:73
ros::AdvertiseOptions::datatype
std::string datatype
The datatype of the message published on this topic (eg. "std_msgs/String")
Definition: advertise_options.h:102
ros::TopicManager::getBusStats
void getBusStats(XmlRpc::XmlRpcValue &stats)
Compute the statistics for the node's connectivity.
Definition: topic_manager.cpp:905
ros::AdvertiseOptions::has_header
bool has_header
Tells whether or not the message has a header. If it does, the sequence number will be written direct...
Definition: advertise_options.h:131
ros::TopicManager::getBusStatsCallback
void getBusStatsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1030
ros::SubscribeOptions::tracked_object
VoidConstPtr tracked_object
An object whose destruction will prevent the callback associated with this subscription.
Definition: subscribe_options.h:139
ros::Header::write
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
ros::TopicManager::advertised_topic_names_
std::list< std::string > advertised_topic_names_
Definition: topic_manager.h:227
ros::AdvertiseOptions
Encapsulates all options available for creating a Publisher.
Definition: advertise_options.h:41
ros::console::g_last_error_message
ROSCONSOLE_DECL std::string g_last_error_message
ros::TopicManager::addSubCallback
bool addSubCallback(const SubscribeOptions &ops)
Definition: topic_manager.cpp:199
ros::network::getHost
const ROSCPP_DECL std::string & getHost()
Definition: network.cpp:51
ros::InvalidParameterException
Thrown when an invalid parameter is passed to a method.
Definition: exceptions.h:74
topic_manager.h
ros::TopicManager::pubUpdate
bool pubUpdate(const std::string &topic, const std::vector< std::string > &pubs)
Update local publisher lists.
Definition: topic_manager.cpp:551
console.h
ros::SubscribeOptions::md5sum
std::string md5sum
MD5 of the message datatype.
Definition: subscribe_options.h:118
ros::ConflictingSubscriptionException
Thrown when a second (third,...) subscription is attempted with conflicting arguments.
Definition: exceptions.h:63
init.h
ros::TopicManager::connection_manager_
ConnectionManagerPtr connection_manager_
Definition: topic_manager.h:234
ROS_DEBUG
#define ROS_DEBUG(...)
ROS_DEBUG_NAMED
#define ROS_DEBUG_NAMED(name,...)
ros::AdvertiseOptions::queue_size
uint32_t queue_size
The maximum number of outgoing messages to be queued for delivery to subscribers.
Definition: advertise_options.h:99
ros::TopicManager::subscriptions_
L_Subscription subscriptions_
Definition: topic_manager.h:223
ros::TopicManager::requestTopicCallback
void requestTopicCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1022
ros::TopicManager::unsubscribe
bool unsubscribe(const std::string &_topic, const SubscriptionCallbackHelperPtr &helper)
Definition: topic_manager.cpp:799
transport_udp.h
ros::TopicManager::advertised_topics_
V_Publication advertised_topics_
Definition: topic_manager.h:226
subscribe_options.h
ros::TopicManager::lookupPublication
PublicationPtr lookupPublication(const std::string &topic)
Lookup an advertised topic.
Definition: topic_manager.cpp:187
ros::PublicationPtr
boost::shared_ptr< Publication > PublicationPtr
Definition: forwards.h:66
transport_tcp.h
ros::Header::parse
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
boost::shared_array< uint8_t >
XmlRpc.h
ROS_WARN
#define ROS_WARN(...)
ros::TopicManager::shutting_down_mutex_
boost::mutex shutting_down_mutex_
Definition: topic_manager.h:231
ros::XMLRPCManager::instance
static const XMLRPCManagerPtr & instance()
Definition: xmlrpc_manager.cpp:98
ros::PollManager::instance
static const PollManagerPtr & instance()
Definition: poll_manager.cpp:36
xmlrpc_manager.h
ros::SerializedMessage::num_bytes
size_t num_bytes
ros::TopicManager::subscribe
bool subscribe(const SubscribeOptions &ops)
Definition: topic_manager.cpp:247
XmlRpc::XmlRpcValue::getType
const Type & getType() const
ros::TopicManager::getSubscribedTopics
void getSubscribedTopics(V_string &topics)
Get the list of topics subscribed to by this node.
Definition: topic_manager.cpp:173
ros::SerializedMessage::type_info
const std::type_info * type_info
ros::master::execute
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
Execute an XMLRPC call on the master.
Definition: master.cpp:175
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
ros::TopicManager::isLatched
bool isLatched(const std::string &topic)
Definition: topic_manager.cpp:772
ros::TopicManager::incrementSequence
void incrementSequence(const std::string &_topic)
Definition: topic_manager.cpp:763
ros::TopicManager::publish
void publish(const std::string &topic, const M &message)
Definition: topic_manager.h:123
ros::TopicManager::unadvertise
bool unadvertise(const std::string &topic, const SubscriberCallbacksPtr &callbacks)
Definition: topic_manager.cpp:403
ros::TopicManager::advertised_topic_names_mutex_
boost::mutex advertised_topic_names_mutex_
Definition: topic_manager.h:228
ros::TopicManager::unregisterSubscriber
bool unregisterSubscriber(const std::string &topic)
Definition: topic_manager.cpp:539
XmlRpc::XmlRpcServer::shutdown
void shutdown()
ros::SubscribeOptions
Encapsulates all options available for creating a Subscriber.
Definition: subscribe_options.h:43
master.h
ros::TopicManager::registerSubscriber
bool registerSubscriber(const SubscriptionPtr &s, const std::string &datatype)
Definition: topic_manager.cpp:476
ros::TopicManager::unregisterPublisher
bool unregisterPublisher(const std::string &topic)
Definition: topic_manager.cpp:452
poll_manager.h
ros::TopicManager::getPublications
void getPublications(XmlRpc::XmlRpcValue &publications)
Return the list of advertised topics for the node.
Definition: topic_manager.cpp:983
ros::SubscribeOptions::queue_size
uint32_t queue_size
Number of incoming messages to queue up for processing (messages in excess of this queue capacity wil...
Definition: subscribe_options.h:116
std
publication.h
ros::TopicManager::getSubscriptionsCallback
void getSubscriptionsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1050
ros::TopicManager::getBusInfo
void getBusInfo(XmlRpc::XmlRpcValue &info)
Compute the info for the node's connectivity.
Definition: topic_manager.cpp:938
ROS_ERROR
#define ROS_ERROR(...)
network.h
ros::ConnectionManager::instance
static const ConnectionManagerPtr & instance()
Definition: connection_manager.cpp:44
XmlRpc::XmlRpcValue::setSize
void setSize(int size)
datatype
const char * datatype()
ros::TopicManager::getSubscriptions
void getSubscriptions(XmlRpc::XmlRpcValue &subscriptions)
Return the list of subcriptions for the node.
Definition: topic_manager.cpp:963
ros::AdvertiseOptions::md5sum
std::string md5sum
The md5sum of the message datatype published on this topic.
Definition: advertise_options.h:101
rosout_appender.h
ros::V_string
std::vector< std::string > V_string
ros::SubscribeOptions::datatype
std::string datatype
Datatype of the message we'd like to subscribe as.
Definition: subscribe_options.h:119
ros::AdvertiseOptions::message_definition
std::string message_definition
The full definition of the message published on this topic.
Definition: advertise_options.h:103
ros::TopicManager::xmlrpc_manager_
XMLRPCManagerPtr xmlrpc_manager_
Definition: topic_manager.h:235
ros::SubscribeOptions::helper
SubscriptionCallbackHelperPtr helper
Helper object used to get create messages and call callbacks.
Definition: subscribe_options.h:121
ros::TopicManager::lookupPublicationWithoutLock
PublicationPtr lookupPublicationWithoutLock(const std::string &topic)
Definition: topic_manager.cpp:783
ros::TopicManager::shutting_down_
volatile bool shutting_down_
Definition: topic_manager.h:230
serialize
void serialize(Stream &stream, const boost::array< T, N > &t)
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::TopicManager::advertised_topics_mutex_
boost::recursive_mutex advertised_topics_mutex_
Definition: topic_manager.h:225
ros::TopicManager::getPublicationsCallback
void getPublicationsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1060
ros::SerializedMessage::buf
boost::shared_array< uint8_t > buf
XmlRpc::XmlRpcValue
file_log.h
ros::TopicManager::isShuttingDown
bool isShuttingDown()
Definition: topic_manager.h:220
ros::M_string
std::map< std::string, std::string > M_string
ros::TopicManager::getAdvertisedTopics
void getAdvertisedTopics(V_string &topics)
Get the list of topics advertised by this node.
Definition: topic_manager.cpp:163
ros::TopicManager::advertise
bool advertise(const AdvertiseOptions &ops, const SubscriberCallbacksPtr &callbacks)
Definition: topic_manager.cpp:294


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:11