topic_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/topic_manager.h"
29 #include "ros/xmlrpc_manager.h"
30 #include "ros/connection_manager.h"
31 #include "ros/poll_manager.h"
32 #include "ros/publication.h"
33 #include "ros/subscription.h"
34 #include "ros/this_node.h"
35 #include "ros/network.h"
36 #include "ros/master.h"
39 #include "ros/rosout_appender.h"
40 #include "ros/init.h"
41 #include "ros/file_log.h"
42 #include "ros/subscribe_options.h"
43 
44 #include "xmlrpcpp/XmlRpc.h"
45 
46 #include <ros/console.h>
47 
48 using namespace XmlRpc; // A battle to be fought later
49 using namespace std; // sigh
50 
52 
53 namespace ros
54 {
55 
56 const TopicManagerPtr& TopicManager::instance()
57 {
58  static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
59  return topic_manager;
60 }
61 
62 TopicManager::TopicManager()
63 : shutting_down_(false)
64 {
65 }
66 
68 {
69  shutdown();
70 }
71 
73 {
74  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
75  shutting_down_ = false;
76 
80 
81  xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, boost::placeholders::_1, boost::placeholders::_2));
82  xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, boost::placeholders::_1, boost::placeholders::_2));
83  xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
84  xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, boost::placeholders::_1, boost::placeholders::_2));
85  xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
86  xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
87 
88  poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
89 }
90 
92 {
93  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
94  if (shutting_down_)
95  {
96  return;
97  }
98 
99  {
101  shutting_down_ = true;
102  subs_mutex_.unlock();
103  advertised_topics_mutex_.unlock();
104  }
105 
106  // actually one should call poll_manager_->removePollThreadListener(), but the connection is not stored above
107  poll_manager_->shutdown();
108 
109  xmlrpc_manager_->unbind("publisherUpdate");
110  xmlrpc_manager_->unbind("requestTopic");
111  xmlrpc_manager_->unbind("getBusStats");
112  xmlrpc_manager_->unbind("getBusInfo");
113  xmlrpc_manager_->unbind("getSubscriptions");
114  xmlrpc_manager_->unbind("getPublications");
115 
116  ROSCPP_LOG_DEBUG("Shutting down topics...");
117  ROSCPP_LOG_DEBUG(" shutting down publishers");
118  {
119  boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
120 
121  for (V_Publication::iterator i = advertised_topics_.begin();
122  i != advertised_topics_.end(); ++i)
123  {
124  if(!(*i)->isDropped())
125  {
126  unregisterPublisher((*i)->getName());
127  }
128  (*i)->drop();
129  }
130  advertised_topics_.clear();
131  }
132 
133  // unregister all of our subscriptions
134  ROSCPP_LOG_DEBUG(" shutting down subscribers");
135  {
136  boost::mutex::scoped_lock subs_lock(subs_mutex_);
137 
138  for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
139  {
140  // Remove us as a subscriber from the master
141  unregisterSubscriber((*s)->getName());
142  // now, drop our side of the connection
143  (*s)->shutdown();
144  }
145  subscriptions_.clear();
146  }
147 }
148 
150 {
151  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
152 
153  V_Publication::iterator it = advertised_topics_.begin();
154  V_Publication::iterator end = advertised_topics_.end();
155  for (; it != end; ++it)
156  {
157  const PublicationPtr& pub = *it;
158  pub->processPublishQueue();
159  }
160 }
161 
163 {
164  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
165 
166  topics.resize(advertised_topic_names_.size());
167  std::copy(advertised_topic_names_.begin(),
169  topics.begin());
170 }
171 
173 {
174  boost::mutex::scoped_lock lock(subs_mutex_);
175 
176  topics.reserve(subscriptions_.size());
177  L_Subscription::const_iterator it = subscriptions_.begin();
178  L_Subscription::const_iterator end = subscriptions_.end();
179  for (; it != end; ++it)
180  {
181  const SubscriptionPtr& sub = *it;
182  topics.push_back(sub->getName());
183  }
184 }
185 
187 {
188  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
189 
190  return lookupPublicationWithoutLock(topic);
191 }
192 
193 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
194 {
195  return lhs == "*" || rhs == "*" || lhs == rhs;
196 }
197 
199 {
200  // spin through the subscriptions and see if we find a match. if so, use it.
201  bool found = false;
202  bool found_topic = false;
203 
204  SubscriptionPtr sub;
205 
206  {
207  if (isShuttingDown())
208  {
209  return false;
210  }
211 
212  for (L_Subscription::iterator s = subscriptions_.begin();
213  s != subscriptions_.end() && !found; ++s)
214  {
215  sub = *s;
216  if (!sub->isDropped() && sub->getName() == ops.topic)
217  {
218  found_topic = true;
219  if (md5sumsMatch(ops.md5sum, sub->md5sum()))
220  {
221  found = true;
222  }
223  break;
224  }
225  }
226  }
227 
228  if (found_topic && !found)
229  {
230  std::stringstream ss;
231  ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
232  throw ConflictingSubscriptionException(ss.str());
233  }
234  else if (found)
235  {
236  if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
237  {
238  return false;
239  }
240  }
241 
242  return found;
243 }
244 
245 // this function has the subscription code that doesn't need to be templated.
247 {
248  boost::mutex::scoped_lock lock(subs_mutex_);
249 
250  if (addSubCallback(ops))
251  {
252  return true;
253  }
254 
255  if (isShuttingDown())
256  {
257  return false;
258  }
259 
260  if (ops.md5sum.empty())
261  {
262  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
263  }
264 
265  if (ops.datatype.empty())
266  {
267  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
268  }
269 
270  if (!ops.helper)
271  {
272  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
273  }
274 
275  const std::string& md5sum = ops.md5sum;
276  std::string datatype = ops.datatype;
277 
278  SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
279  s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
280 
281  if (!registerSubscriber(s, ops.datatype))
282  {
283  ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
284  s->shutdown();
285  return false;
286  }
287 
288  subscriptions_.push_back(s);
289 
290  return true;
291 }
292 
294 {
295  if (ops.datatype == "*")
296  {
297  std::stringstream ss;
298  ss << "Advertising with * as the datatype is not allowed. Topic [" << ops.topic << "]";
299  throw InvalidParameterException(ss.str());
300  }
301 
302  if (ops.md5sum == "*")
303  {
304  std::stringstream ss;
305  ss << "Advertising with * as the md5sum is not allowed. Topic [" << ops.topic << "]";
306  throw InvalidParameterException(ss.str());
307  }
308 
309  if (ops.md5sum.empty())
310  {
311  throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
312  }
313 
314  if (ops.datatype.empty())
315  {
316  throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
317  }
318 
319  if (ops.message_definition.empty())
320  {
321  ROS_WARN("Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
322  }
323 
324  PublicationPtr pub;
325 
326  {
327  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
328 
329  if (isShuttingDown())
330  {
331  return false;
332  }
333 
335  if (pub && pub->getNumCallbacks() == 0)
336  {
337  pub.reset();
338  }
339 
340  if (pub)
341  {
342  if (pub->getMD5Sum() != ops.md5sum)
343  {
344  ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
345  ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
346  return false;
347  }
348 
349  pub->addCallbacks(callbacks);
350 
351  return true;
352  }
353 
354  pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, false, ops.has_header));
355  pub->addCallbacks(callbacks);
356  advertised_topics_.push_back(pub);
357  }
358 
359 
360  {
361  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
362  advertised_topic_names_.push_back(ops.topic);
363  }
364 
365  // Check whether we've already subscribed to this topic. If so, we'll do
366  // the self-subscription here, to avoid the deadlock that would happen if
367  // the ROS thread later got the publisherUpdate with its own XMLRPC URI.
368  // The assumption is that advertise() is called from somewhere other
369  // than the ROS thread.
370  bool found = false;
371  SubscriptionPtr sub;
372  {
373  boost::mutex::scoped_lock lock(subs_mutex_);
374 
375  for (L_Subscription::iterator s = subscriptions_.begin();
376  s != subscriptions_.end() && !found; ++s)
377  {
378  if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
379  {
380  found = true;
381  sub = *s;
382  break;
383  }
384  }
385  }
386 
387  if(found)
388  {
389  sub->addLocalConnection(pub);
390  }
391 
392  XmlRpcValue args, result, payload;
393  args[0] = this_node::getName();
394  args[1] = ops.topic;
395  args[2] = ops.datatype;
396  args[3] = xmlrpc_manager_->getServerURI();
397  master::execute("registerPublisher", args, result, payload, true);
398 
399  return true;
400 }
401 
402 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
403 {
404  PublicationPtr pub;
405  V_Publication::iterator i;
406  {
407  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
408 
409  if (isShuttingDown())
410  {
411  return false;
412  }
413 
414  for (i = advertised_topics_.begin();
415  i != advertised_topics_.end(); ++i)
416  {
417  if(((*i)->getName() == topic) && (!(*i)->isDropped()))
418  {
419  pub = *i;
420  break;
421  }
422  }
423  }
424 
425  if (!pub)
426  {
427  return false;
428  }
429 
430  pub->removeCallbacks(callbacks);
431 
432  {
433  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
434  if (pub->getNumCallbacks() == 0)
435  {
436  unregisterPublisher(pub->getName());
437  pub->drop();
438 
439  advertised_topics_.erase(i);
440 
441  {
442  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
443  advertised_topic_names_.remove(pub->getName());
444  }
445  }
446  }
447 
448  return true;
449 }
450 
451 bool TopicManager::unregisterPublisher(const std::string& topic)
452 {
453  XmlRpcValue args, result, payload;
454  args[0] = this_node::getName();
455  args[1] = topic;
456  args[2] = xmlrpc_manager_->getServerURI();
457  master::execute("unregisterPublisher", args, result, payload, false);
458 
459  return true;
460 }
461 
462 bool TopicManager::isTopicAdvertised(const string &topic)
463 {
464  for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
465  {
466  if (((*t)->getName() == topic) && (!(*t)->isDropped()))
467  {
468  return true;
469  }
470  }
471 
472  return false;
473 }
474 
475 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
476 {
477  XmlRpcValue args, result, payload;
478  args[0] = this_node::getName();
479  args[1] = s->getName();
480  args[2] = datatype;
481  args[3] = xmlrpc_manager_->getServerURI();
482 
483  if (!master::execute("registerSubscriber", args, result, payload, true))
484  {
485  return false;
486  }
487 
488  vector<string> pub_uris;
489  for (int i = 0; i < payload.size(); i++)
490  {
491  if (payload[i] != xmlrpc_manager_->getServerURI())
492  {
493  pub_uris.push_back(string(payload[i]));
494  }
495  }
496 
497  bool self_subscribed = false;
498  PublicationPtr pub;
499  const std::string& sub_md5sum = s->md5sum();
500  // Figure out if we have a local publisher
501  {
502  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
503  V_Publication::const_iterator it = advertised_topics_.begin();
504  V_Publication::const_iterator end = advertised_topics_.end();
505  for (; it != end; ++it)
506  {
507  pub = *it;
508  const std::string& pub_md5sum = pub->getMD5Sum();
509 
510  if (pub->getName() == s->getName() && !pub->isDropped())
511  {
512  if (!md5sumsMatch(pub_md5sum, sub_md5sum))
513  {
514  ROS_ERROR("md5sum mismatch making local subscription to topic %s.",
515  s->getName().c_str());
516  ROS_ERROR("Subscriber expects type %s, md5sum %s",
517  s->datatype().c_str(), s->md5sum().c_str());
518  ROS_ERROR("Publisher provides type %s, md5sum %s",
519  pub->getDataType().c_str(), pub->getMD5Sum().c_str());
520  return false;
521  }
522 
523  self_subscribed = true;
524  break;
525  }
526  }
527  }
528 
529  s->pubUpdate(pub_uris);
530  if (self_subscribed)
531  {
532  s->addLocalConnection(pub);
533  }
534 
535  return true;
536 }
537 
538 bool TopicManager::unregisterSubscriber(const string &topic)
539 {
540  XmlRpcValue args, result, payload;
541  args[0] = this_node::getName();
542  args[1] = topic;
543  args[2] = xmlrpc_manager_->getServerURI();
544 
545  master::execute("unregisterSubscriber", args, result, payload, false);
546 
547  return true;
548 }
549 
550 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
551 {
552  SubscriptionPtr sub;
553  {
554  boost::mutex::scoped_lock lock(subs_mutex_);
555 
556  if (isShuttingDown())
557  {
558  return false;
559  }
560 
561  ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
562  // find the subscription
563  for (L_Subscription::const_iterator s = subscriptions_.begin();
564  s != subscriptions_.end(); ++s)
565  {
566  if ((*s)->getName() != topic || (*s)->isDropped())
567  continue;
568 
569  sub = *s;
570  break;
571  }
572 
573  }
574 
575  if (sub)
576  {
577  return sub->pubUpdate(pubs);
578  }
579  else
580  {
581  ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
582  "don't have any subscribers to that topic.", topic.c_str());
583  }
584 
585  return false;
586 }
587 
588 bool TopicManager::requestTopic(const string &topic,
589  XmlRpcValue &protos,
590  XmlRpcValue &ret)
591 {
592  for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
593  {
594  XmlRpcValue proto = protos[proto_idx]; // save typing
595  if (proto.getType() != XmlRpcValue::TypeArray)
596  {
597  ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
598  return false;
599  }
600 
601  if (proto[0].getType() != XmlRpcValue::TypeString)
602  {
603  ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
604  "did not start with a string");
605  return false;
606  }
607 
608  string proto_name = proto[0];
609  if (proto_name == string("TCPROS"))
610  {
611  XmlRpcValue tcpros_params;
612  tcpros_params[0] = string("TCPROS");
613  tcpros_params[1] = network::getHost();
614  tcpros_params[2] = int(connection_manager_->getTCPPort());
615  ret[0] = int(1);
616  ret[1] = string();
617  ret[2] = tcpros_params;
618  return true;
619  }
620  else if (proto_name == string("UDPROS"))
621  {
622  if (proto.size() != 5 ||
623  proto[1].getType() != XmlRpcValue::TypeBase64 ||
624  proto[2].getType() != XmlRpcValue::TypeString ||
625  proto[3].getType() != XmlRpcValue::TypeInt ||
626  proto[4].getType() != XmlRpcValue::TypeInt)
627  {
628  ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
629  return false;
630  }
631  std::vector<char> header_bytes = proto[1];
632  boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
633  memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
634  Header h;
635  string err;
636  if (!h.parse(buffer, header_bytes.size(), err))
637  {
638  ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
639  return false;
640  }
641 
642  PublicationPtr pub_ptr = lookupPublication(topic);
643  if(!pub_ptr)
644  {
645  ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
646  return false;
647  }
648 
649  std::string host = proto[2];
650  int port = proto[3];
651 
652  M_string m;
653  std::string error_msg;
654  if (!pub_ptr->validateHeader(h, error_msg))
655  {
656  ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
657  return false;
658  }
659 
660  int max_datagram_size = proto[4];
661  int conn_id = connection_manager_->getNewConnectionID();
662  TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
663  if (!transport)
664  {
665  ROSCPP_LOG_DEBUG("Error creating outgoing transport for [%s:%d]", host.c_str(), port);
666  return false;
667  }
668  connection_manager_->udprosIncomingConnection(transport, h);
669 
670  XmlRpcValue udpros_params;
671  udpros_params[0] = string("UDPROS");
672  udpros_params[1] = network::getHost();
673  udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
674  udpros_params[3] = conn_id;
675  udpros_params[4] = max_datagram_size;
676  m["topic"] = topic;
677  m["md5sum"] = pub_ptr->getMD5Sum();
678  m["type"] = pub_ptr->getDataType();
679  m["callerid"] = this_node::getName();
680  m["message_definition"] = pub_ptr->getMessageDefinition();
681  boost::shared_array<uint8_t> msg_def_buffer;
682  uint32_t len;
683  Header::write(m, msg_def_buffer, len);
684  XmlRpcValue v(msg_def_buffer.get(), len);
685  udpros_params[5] = v;
686  ret[0] = int(1);
687  ret[1] = string();
688  ret[2] = udpros_params;
689  return true;
690  }
691  else
692  {
693  ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
694  proto_name.c_str());
695  }
696  }
697 
698  ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
699  "requestTopic did not support TCPROS, so there are no " \
700  "protocols in common.");
701  return false;
702 }
703 
704 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
705 {
706  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
707 
708  if (isShuttingDown())
709  {
710  return;
711  }
712 
714  if (p->hasSubscribers() || p->isLatching())
715  {
716  ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
717 
718  // Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can
719  // do a no-copy publish.
720  bool nocopy = false;
721  bool serialize = false;
722 
723  // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
724  if (m.type_info && m.message)
725  {
726  p->getPublishTypes(serialize, nocopy, *m.type_info);
727  }
728  else
729  {
730  serialize = true;
731  }
732 
733  if (!nocopy)
734  {
735  m.message.reset();
736  m.type_info = 0;
737  }
738 
739  if (serialize || p->isLatching())
740  {
741  SerializedMessage m2 = serfunc();
742  m.buf = m2.buf;
743  m.num_bytes = m2.num_bytes;
745  }
746 
747  p->publish(m);
748 
749  // If we're not doing a serialized publish we don't need to signal the pollset. The write()
750  // call inside signal() is actually relatively expensive when doing a nocopy publish.
751  if (serialize)
752  {
753  poll_manager_->getPollSet().signal();
754  }
755  }
756  else
757  {
758  p->incrementSequence();
759  }
760 }
761 
762 void TopicManager::incrementSequence(const std::string& topic)
763 {
764  PublicationPtr pub = lookupPublication(topic);
765  if (pub)
766  {
767  pub->incrementSequence();
768  }
769 }
770 
771 bool TopicManager::isLatched(const std::string& topic)
772 {
773  PublicationPtr pub = lookupPublication(topic);
774  if (pub)
775  {
776  return pub->isLatched();
777  }
778 
779  return false;
780 }
781 
783 {
784  PublicationPtr t;
785  for (V_Publication::iterator i = advertised_topics_.begin();
786  !t && i != advertised_topics_.end(); ++i)
787  {
788  if (((*i)->getName() == topic) && (!(*i)->isDropped()))
789  {
790  t = *i;
791  break;
792  }
793  }
794 
795  return t;
796 }
797 
798 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
799 {
800  SubscriptionPtr sub;
801 
802  {
803  boost::mutex::scoped_lock lock(subs_mutex_);
804 
805  if (isShuttingDown())
806  {
807  return false;
808  }
809 
810  L_Subscription::iterator it;
811  for (it = subscriptions_.begin();
812  it != subscriptions_.end(); ++it)
813  {
814  if ((*it)->getName() == topic)
815  {
816  sub = *it;
817  break;
818  }
819  }
820  }
821 
822  if (!sub)
823  {
824  return false;
825  }
826 
827  sub->removeCallback(helper);
828 
829  if (sub->getNumCallbacks() == 0)
830  {
831  // nobody is left. blow away the subscription.
832  {
833  boost::mutex::scoped_lock lock(subs_mutex_);
834 
835  L_Subscription::iterator it;
836  for (it = subscriptions_.begin();
837  it != subscriptions_.end(); ++it)
838  {
839  if ((*it)->getName() == topic)
840  {
841  subscriptions_.erase(it);
842  break;
843  }
844  }
845 
846  if (!unregisterSubscriber(topic))
847  {
848  ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
849  }
850  }
851 
852  sub->shutdown();
853  return true;
854  }
855 
856  return true;
857 }
858 
859 size_t TopicManager::getNumSubscribers(const std::string &topic)
860 {
861  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
862 
863  if (isShuttingDown())
864  {
865  return 0;
866  }
867 
869  if (p)
870  {
871  return p->getNumSubscribers();
872  }
873 
874  return 0;
875 }
876 
878 {
879  boost::mutex::scoped_lock lock(subs_mutex_);
880  return subscriptions_.size();
881 }
882 
883 size_t TopicManager::getNumPublishers(const std::string &topic)
884 {
885  boost::mutex::scoped_lock lock(subs_mutex_);
886 
887  if (isShuttingDown())
888  {
889  return 0;
890  }
891 
892  for (L_Subscription::const_iterator t = subscriptions_.begin();
893  t != subscriptions_.end(); ++t)
894  {
895  if (!(*t)->isDropped() && (*t)->getName() == topic)
896  {
897  return (*t)->getNumPublishers();
898  }
899  }
900 
901  return 0;
902 }
903 
905 {
906  XmlRpcValue publish_stats, subscribe_stats, service_stats;
907  // force these guys to be arrays, even if we don't populate them
908  publish_stats.setSize(0);
909  subscribe_stats.setSize(0);
910  service_stats.setSize(0);
911 
912  uint32_t pidx = 0;
913  {
914  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
915  for (V_Publication::iterator t = advertised_topics_.begin();
916  t != advertised_topics_.end(); ++t)
917  {
918  publish_stats[pidx++] = (*t)->getStats();
919  }
920  }
921 
922  {
923  uint32_t sidx = 0;
924 
925  boost::mutex::scoped_lock lock(subs_mutex_);
926  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
927  {
928  subscribe_stats[sidx++] = (*t)->getStats();
929  }
930  }
931 
932  stats[0] = publish_stats;
933  stats[1] = subscribe_stats;
934  stats[2] = service_stats;
935 }
936 
938 {
939  // force these guys to be arrays, even if we don't populate them
940  info.setSize(0);
941 
942  {
943  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
944 
945  for (V_Publication::iterator t = advertised_topics_.begin();
946  t != advertised_topics_.end(); ++t)
947  {
948  (*t)->getInfo(info);
949  }
950  }
951 
952  {
953  boost::mutex::scoped_lock lock(subs_mutex_);
954 
955  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
956  {
957  (*t)->getInfo(info);
958  }
959  }
960 }
961 
963 {
964  // force these guys to be arrays, even if we don't populate them
965  subs.setSize(0);
966 
967  {
968  boost::mutex::scoped_lock lock(subs_mutex_);
969 
970  uint32_t sidx = 0;
971 
972  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
973  {
974  XmlRpcValue sub;
975  sub[0] = (*t)->getName();
976  sub[1] = (*t)->datatype();
977  subs[sidx++] = sub;
978  }
979  }
980 }
981 
983 {
984  // force these guys to be arrays, even if we don't populate them
985  pubs.setSize(0);
986 
987  {
988  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
989 
990  uint32_t sidx = 0;
991 
992  for (V_Publication::iterator t = advertised_topics_.begin();
993  t != advertised_topics_.end(); ++t)
994  {
995  XmlRpcValue pub;
996  pub[0] = (*t)->getName();
997  pub[1] = (*t)->getDataType();
998  pubs[sidx++] = pub;
999  }
1000 
1001  }
1002 }
1003 
1005 {
1006  std::vector<std::string> pubs;
1007  for (int idx = 0; idx < params[2].size(); idx++)
1008  {
1009  pubs.push_back(params[2][idx]);
1010  }
1011  if (pubUpdate(params[1], pubs))
1012  {
1013  result = xmlrpc::responseInt(1, "", 0);
1014  }
1015  else
1016  {
1018  }
1019 }
1020 
1022 {
1023  if (!requestTopic(params[1], params[2], result))
1024  {
1026  }
1027 }
1028 
1030 {
1031  (void)params;
1032  result[0] = 1;
1033  result[1] = std::string("");
1035  getBusStats(result);
1036  result[2] = response;
1037 }
1038 
1040 {
1041  (void)params;
1042  result[0] = 1;
1043  result[1] = std::string("");
1046  result[2] = response;
1047 }
1048 
1050 {
1051  (void)params;
1052  result[0] = 1;
1053  result[1] = std::string("subscriptions");
1056  result[2] = response;
1057 }
1058 
1060 {
1061  (void)params;
1062  result[0] = 1;
1063  result[1] = std::string("publications");
1066  result[2] = response;
1067 }
1068 
1069 } // namespace ros
XmlRpc::XmlRpcValue::size
int size() const
ros::TopicManager::getBusInfoCallback
void getBusInfoCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1039
connection_manager.h
response
const std::string response
this_node.h
ros::AdvertiseOptions::topic
std::string topic
The topic to publish on.
Definition: advertise_options.h:98
ros::SerializedMessage
ros::TopicManager::isTopicAdvertised
bool isTopicAdvertised(const std::string &topic)
Definition: topic_manager.cpp:462
ros::SerializedMessage::message_start
uint8_t * message_start
ros::TopicManager::getNumSubscriptions
size_t getNumSubscriptions()
Definition: topic_manager.cpp:877
ros::TopicManager::pubUpdateCallback
void pubUpdateCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1004
md5sum
const char * md5sum()
ros::TopicManager::getNumSubscribers
size_t getNumSubscribers(const std::string &_topic)
Return the number of subscribers a node has for a particular topic:
Definition: topic_manager.cpp:859
ros::TopicManager::poll_manager_
PollManagerPtr poll_manager_
Definition: topic_manager.h:232
boost::shared_ptr< TopicManager >
ros::TopicManager::shutdown
void shutdown()
Definition: topic_manager.cpp:91
ros::xmlrpc::responseInt
ROSCPP_DECL XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
Definition: xmlrpc_manager.cpp:52
s
XmlRpcServer s
ros::TopicManager::processPublishQueues
void processPublishQueues()
Definition: topic_manager.cpp:149
ros::md5sumsMatch
bool md5sumsMatch(const std::string &lhs, const std::string &rhs)
Definition: topic_manager.cpp:193
ros
ros::TopicManager::~TopicManager
~TopicManager()
Definition: topic_manager.cpp:67
ros::Header
ros::TopicManager::requestTopic
bool requestTopic(const std::string &topic, XmlRpc::XmlRpcValue &protos, XmlRpc::XmlRpcValue &ret)
Request a topic.
Definition: topic_manager.cpp:588
ros::SubscribeOptions::topic
std::string topic
Topic to subscribe to.
Definition: subscribe_options.h:115
ros::SubscribeOptions::transport_hints
TransportHints transport_hints
Hints for transport type and options.
Definition: subscribe_options.h:141
ros::TopicManager::getNumPublishers
size_t getNumPublishers(const std::string &_topic)
Return the number of publishers connected to this node on a particular topic.
Definition: topic_manager.cpp:883
ros::SubscribeOptions::callback_queue
CallbackQueueInterface * callback_queue
Queue to add callbacks to. If NULL, the global callback queue will be used.
Definition: subscribe_options.h:123
ros::SerializedMessage::message
boost::shared_ptr< void const > message
ros::SubscribeOptions::allow_concurrent_callbacks
bool allow_concurrent_callbacks
Definition: subscribe_options.h:127
ros::TopicManager::subs_mutex_
boost::mutex subs_mutex_
Definition: topic_manager.h:221
XmlRpc
subscription.h
ros::TopicManager::start
void start()
Definition: topic_manager.cpp:72
ros::AdvertiseOptions::datatype
std::string datatype
The datatype of the message published on this topic (eg. "std_msgs/String")
Definition: advertise_options.h:102
ros::TopicManager::getBusStats
void getBusStats(XmlRpc::XmlRpcValue &stats)
Compute the statistics for the node's connectivity.
Definition: topic_manager.cpp:904
ros::AdvertiseOptions::has_header
bool has_header
Tells whether or not the message has a header. If it does, the sequence number will be written direct...
Definition: advertise_options.h:131
ros::TopicManager::getBusStatsCallback
void getBusStatsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1029
ros::SubscribeOptions::tracked_object
VoidConstPtr tracked_object
An object whose destruction will prevent the callback associated with this subscription.
Definition: subscribe_options.h:139
ros::Header::write
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
ros::TopicManager::advertised_topic_names_
std::list< std::string > advertised_topic_names_
Definition: topic_manager.h:226
ros::AdvertiseOptions
Encapsulates all options available for creating a Publisher.
Definition: advertise_options.h:41
ros::console::g_last_error_message
ROSCONSOLE_DECL std::string g_last_error_message
ros::TopicManager::addSubCallback
bool addSubCallback(const SubscribeOptions &ops)
Definition: topic_manager.cpp:198
ros::network::getHost
const ROSCPP_DECL std::string & getHost()
Definition: network.cpp:51
ros::InvalidParameterException
Thrown when an invalid parameter is passed to a method.
Definition: exceptions.h:74
topic_manager.h
ros::TopicManager::pubUpdate
bool pubUpdate(const std::string &topic, const std::vector< std::string > &pubs)
Update local publisher lists.
Definition: topic_manager.cpp:550
console.h
ros::SubscribeOptions::md5sum
std::string md5sum
MD5 of the message datatype.
Definition: subscribe_options.h:118
ros::ConflictingSubscriptionException
Thrown when a second (third,...) subscription is attempted with conflicting arguments.
Definition: exceptions.h:63
init.h
ros::TopicManager::connection_manager_
ConnectionManagerPtr connection_manager_
Definition: topic_manager.h:233
ROS_DEBUG
#define ROS_DEBUG(...)
ROS_DEBUG_NAMED
#define ROS_DEBUG_NAMED(name,...)
ros::AdvertiseOptions::queue_size
uint32_t queue_size
The maximum number of outgoing messages to be queued for delivery to subscribers.
Definition: advertise_options.h:99
ros::TopicManager::subscriptions_
L_Subscription subscriptions_
Definition: topic_manager.h:222
ros::TopicManager::requestTopicCallback
void requestTopicCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1021
ros::TopicManager::unsubscribe
bool unsubscribe(const std::string &_topic, const SubscriptionCallbackHelperPtr &helper)
Definition: topic_manager.cpp:798
transport_udp.h
ros::TopicManager::advertised_topics_
V_Publication advertised_topics_
Definition: topic_manager.h:225
subscribe_options.h
ros::TopicManager::lookupPublication
PublicationPtr lookupPublication(const std::string &topic)
Lookup an advertised topic.
Definition: topic_manager.cpp:186
ros::PublicationPtr
boost::shared_ptr< Publication > PublicationPtr
Definition: forwards.h:66
transport_tcp.h
ros::Header::parse
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
boost::shared_array< uint8_t >
XmlRpc.h
ROS_WARN
#define ROS_WARN(...)
ros::TopicManager::shutting_down_mutex_
boost::mutex shutting_down_mutex_
Definition: topic_manager.h:230
ros::XMLRPCManager::instance
static const XMLRPCManagerPtr & instance()
Definition: xmlrpc_manager.cpp:98
ros::PollManager::instance
static const PollManagerPtr & instance()
Definition: poll_manager.cpp:36
xmlrpc_manager.h
ros::SerializedMessage::num_bytes
size_t num_bytes
ros::TopicManager::subscribe
bool subscribe(const SubscribeOptions &ops)
Definition: topic_manager.cpp:246
XmlRpc::XmlRpcValue::getType
const Type & getType() const
ros::TopicManager::getSubscribedTopics
void getSubscribedTopics(V_string &topics)
Get the list of topics subscribed to by this node.
Definition: topic_manager.cpp:172
ros::SerializedMessage::type_info
const std::type_info * type_info
ros::master::execute
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
Execute an XMLRPC call on the master.
Definition: master.cpp:175
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
ros::TopicManager::isLatched
bool isLatched(const std::string &topic)
Definition: topic_manager.cpp:771
ros::TopicManager::incrementSequence
void incrementSequence(const std::string &_topic)
Definition: topic_manager.cpp:762
ros::TopicManager::publish
void publish(const std::string &topic, const M &message)
Definition: topic_manager.h:122
ros::TopicManager::unadvertise
bool unadvertise(const std::string &topic, const SubscriberCallbacksPtr &callbacks)
Definition: topic_manager.cpp:402
ros::TopicManager::advertised_topic_names_mutex_
boost::mutex advertised_topic_names_mutex_
Definition: topic_manager.h:227
ros::TopicManager::unregisterSubscriber
bool unregisterSubscriber(const std::string &topic)
Definition: topic_manager.cpp:538
XmlRpc::XmlRpcServer::shutdown
void shutdown()
ros::SubscribeOptions
Encapsulates all options available for creating a Subscriber.
Definition: subscribe_options.h:43
master.h
ros::TopicManager::registerSubscriber
bool registerSubscriber(const SubscriptionPtr &s, const std::string &datatype)
Definition: topic_manager.cpp:475
ros::TopicManager::unregisterPublisher
bool unregisterPublisher(const std::string &topic)
Definition: topic_manager.cpp:451
poll_manager.h
ros::TopicManager::getPublications
void getPublications(XmlRpc::XmlRpcValue &publications)
Return the list of advertised topics for the node.
Definition: topic_manager.cpp:982
ros::SubscribeOptions::queue_size
uint32_t queue_size
Number of incoming messages to queue up for processing (messages in excess of this queue capacity wil...
Definition: subscribe_options.h:116
std
publication.h
ros::TopicManager::getSubscriptionsCallback
void getSubscriptionsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1049
ros::TopicManager::getBusInfo
void getBusInfo(XmlRpc::XmlRpcValue &info)
Compute the info for the node's connectivity.
Definition: topic_manager.cpp:937
ROS_ERROR
#define ROS_ERROR(...)
network.h
ros::ConnectionManager::instance
static const ConnectionManagerPtr & instance()
Definition: connection_manager.cpp:43
XmlRpc::XmlRpcValue::setSize
void setSize(int size)
datatype
const char * datatype()
ros::TopicManager::getSubscriptions
void getSubscriptions(XmlRpc::XmlRpcValue &subscriptions)
Return the list of subcriptions for the node.
Definition: topic_manager.cpp:962
ros::AdvertiseOptions::md5sum
std::string md5sum
The md5sum of the message datatype published on this topic.
Definition: advertise_options.h:101
rosout_appender.h
ros::V_string
std::vector< std::string > V_string
ros::SubscribeOptions::datatype
std::string datatype
Datatype of the message we'd like to subscribe as.
Definition: subscribe_options.h:119
ros::AdvertiseOptions::message_definition
std::string message_definition
The full definition of the message published on this topic.
Definition: advertise_options.h:103
ros::TopicManager::xmlrpc_manager_
XMLRPCManagerPtr xmlrpc_manager_
Definition: topic_manager.h:234
ros::SubscribeOptions::helper
SubscriptionCallbackHelperPtr helper
Helper object used to get create messages and call callbacks.
Definition: subscribe_options.h:121
ros::TopicManager::lookupPublicationWithoutLock
PublicationPtr lookupPublicationWithoutLock(const std::string &topic)
Definition: topic_manager.cpp:782
ros::TopicManager::shutting_down_
volatile bool shutting_down_
Definition: topic_manager.h:229
serialize
void serialize(Stream &stream, const boost::array< T, N > &t)
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::TopicManager::advertised_topics_mutex_
boost::recursive_mutex advertised_topics_mutex_
Definition: topic_manager.h:224
ros::TopicManager::getPublicationsCallback
void getPublicationsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Definition: topic_manager.cpp:1059
ros::SerializedMessage::buf
boost::shared_array< uint8_t > buf
XmlRpc::XmlRpcValue
file_log.h
ros::TopicManager::isShuttingDown
bool isShuttingDown()
Definition: topic_manager.h:219
ros::M_string
std::map< std::string, std::string > M_string
ros::TopicManager::getAdvertisedTopics
void getAdvertisedTopics(V_string &topics)
Get the list of topics advertised by this node.
Definition: topic_manager.cpp:162
ros::TopicManager::advertise
bool advertise(const AdvertiseOptions &ops, const SubscriberCallbacksPtr &callbacks)
Definition: topic_manager.cpp:293


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:44