topic_manager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/topic_manager.h"
29 #include "ros/xmlrpc_manager.h"
30 #include "ros/connection_manager.h"
31 #include "ros/poll_manager.h"
32 #include "ros/publication.h"
33 #include "ros/subscription.h"
34 #include "ros/this_node.h"
35 #include "ros/network.h"
36 #include "ros/master.h"
39 #include "ros/rosout_appender.h"
40 #include "ros/init.h"
41 #include "ros/file_log.h"
42 #include "ros/subscribe_options.h"
43 
44 #include "xmlrpcpp/XmlRpc.h"
45 
46 #include <ros/console.h>
47 
48 using namespace XmlRpc; // A battle to be fought later
49 using namespace std; // sigh
50 
52 
53 namespace ros
54 {
55 
56 const TopicManagerPtr& TopicManager::instance()
57 {
58  static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
59  return topic_manager;
60 }
61 
62 TopicManager::TopicManager()
63 : shutting_down_(false)
64 {
65 }
66 
68 {
69  shutdown();
70 }
71 
73 {
74  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
75  shutting_down_ = false;
76 
80 
81  xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
82  xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
83  xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
84  xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
85  xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
86  xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));
87 
88  poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
89 }
90 
92 {
93  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
94  if (shutting_down_)
95  {
96  return;
97  }
98 
99  {
100  boost::recursive_mutex::scoped_lock lock1(advertised_topics_mutex_);
101  boost::mutex::scoped_lock lock2(subs_mutex_);
102  shutting_down_ = true;
103  }
104 
105  // actually one should call poll_manager_->removePollThreadListener(), but the connection is not stored above
106  poll_manager_->shutdown();
107 
108  xmlrpc_manager_->unbind("publisherUpdate");
109  xmlrpc_manager_->unbind("requestTopic");
110  xmlrpc_manager_->unbind("getBusStats");
111  xmlrpc_manager_->unbind("getBusInfo");
112  xmlrpc_manager_->unbind("getSubscriptions");
113  xmlrpc_manager_->unbind("getPublications");
114 
115  ROSCPP_LOG_DEBUG("Shutting down topics...");
116  ROSCPP_LOG_DEBUG(" shutting down publishers");
117  {
118  boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
119 
120  for (V_Publication::iterator i = advertised_topics_.begin();
121  i != advertised_topics_.end(); ++i)
122  {
123  if(!(*i)->isDropped())
124  {
125  unregisterPublisher((*i)->getName());
126  }
127  (*i)->drop();
128  }
129  advertised_topics_.clear();
130  }
131 
132  // unregister all of our subscriptions
133  ROSCPP_LOG_DEBUG(" shutting down subscribers");
134  {
135  boost::mutex::scoped_lock subs_lock(subs_mutex_);
136 
137  for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
138  {
139  // Remove us as a subscriber from the master
140  unregisterSubscriber((*s)->getName());
141  // now, drop our side of the connection
142  (*s)->shutdown();
143  }
144  subscriptions_.clear();
145  }
146 }
147 
149 {
150  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
151 
152  V_Publication::iterator it = advertised_topics_.begin();
153  V_Publication::iterator end = advertised_topics_.end();
154  for (; it != end; ++it)
155  {
156  const PublicationPtr& pub = *it;
157  pub->processPublishQueue();
158  }
159 }
160 
162 {
163  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
164 
165  topics.resize(advertised_topic_names_.size());
166  std::copy(advertised_topic_names_.begin(),
168  topics.begin());
169 }
170 
172 {
173  boost::mutex::scoped_lock lock(subs_mutex_);
174 
175  topics.reserve(subscriptions_.size());
176  L_Subscription::const_iterator it = subscriptions_.begin();
177  L_Subscription::const_iterator end = subscriptions_.end();
178  for (; it != end; ++it)
179  {
180  const SubscriptionPtr& sub = *it;
181  topics.push_back(sub->getName());
182  }
183 }
184 
186 {
187  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
188 
189  return lookupPublicationWithoutLock(topic);
190 }
191 
192 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
193 {
194  return lhs == "*" || rhs == "*" || lhs == rhs;
195 }
196 
198 {
199  // spin through the subscriptions and see if we find a match. if so, use it.
200  bool found = false;
201  bool found_topic = false;
202 
203  SubscriptionPtr sub;
204 
205  {
206  if (isShuttingDown())
207  {
208  return false;
209  }
210 
211  for (L_Subscription::iterator s = subscriptions_.begin();
212  s != subscriptions_.end() && !found; ++s)
213  {
214  sub = *s;
215  if (!sub->isDropped() && sub->getName() == ops.topic)
216  {
217  found_topic = true;
218  if (md5sumsMatch(ops.md5sum, sub->md5sum()))
219  {
220  found = true;
221  }
222  break;
223  }
224  }
225  }
226 
227  if (found_topic && !found)
228  {
229  std::stringstream ss;
230  ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
231  throw ConflictingSubscriptionException(ss.str());
232  }
233  else if (found)
234  {
235  if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
236  {
237  return false;
238  }
239  }
240 
241  return found;
242 }
243 
244 // this function has the subscription code that doesn't need to be templated.
246 {
247  boost::mutex::scoped_lock lock(subs_mutex_);
248 
249  if (addSubCallback(ops))
250  {
251  return true;
252  }
253 
254  if (isShuttingDown())
255  {
256  return false;
257  }
258 
259  if (ops.md5sum.empty())
260  {
261  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
262  }
263 
264  if (ops.datatype.empty())
265  {
266  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
267  }
268 
269  if (!ops.helper)
270  {
271  throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
272  }
273 
274  const std::string& md5sum = ops.md5sum;
275  std::string datatype = ops.datatype;
276 
277  SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
278  s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
279 
280  if (!registerSubscriber(s, ops.datatype))
281  {
282  ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
283  s->shutdown();
284  return false;
285  }
286 
287  subscriptions_.push_back(s);
288 
289  return true;
290 }
291 
293 {
294  if (ops.datatype == "*")
295  {
296  std::stringstream ss;
297  ss << "Advertising with * as the datatype is not allowed. Topic [" << ops.topic << "]";
298  throw InvalidParameterException(ss.str());
299  }
300 
301  if (ops.md5sum == "*")
302  {
303  std::stringstream ss;
304  ss << "Advertising with * as the md5sum is not allowed. Topic [" << ops.topic << "]";
305  throw InvalidParameterException(ss.str());
306  }
307 
308  if (ops.md5sum.empty())
309  {
310  throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
311  }
312 
313  if (ops.datatype.empty())
314  {
315  throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
316  }
317 
318  if (ops.message_definition.empty())
319  {
320  ROS_WARN("Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
321  }
322 
323  PublicationPtr pub;
324 
325  {
326  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
327 
328  if (isShuttingDown())
329  {
330  return false;
331  }
332 
334  if (pub && pub->getNumCallbacks() == 0)
335  {
336  pub.reset();
337  }
338 
339  if (pub)
340  {
341  if (pub->getMD5Sum() != ops.md5sum)
342  {
343  ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
344  ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
345  return false;
346  }
347 
348  pub->addCallbacks(callbacks);
349 
350  return true;
351  }
352 
353  pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header));
354  pub->addCallbacks(callbacks);
355  advertised_topics_.push_back(pub);
356  }
357 
358 
359  {
360  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
361  advertised_topic_names_.push_back(ops.topic);
362  }
363 
364  // Check whether we've already subscribed to this topic. If so, we'll do
365  // the self-subscription here, to avoid the deadlock that would happen if
366  // the ROS thread later got the publisherUpdate with its own XMLRPC URI.
367  // The assumption is that advertise() is called from somewhere other
368  // than the ROS thread.
369  bool found = false;
370  SubscriptionPtr sub;
371  {
372  boost::mutex::scoped_lock lock(subs_mutex_);
373 
374  for (L_Subscription::iterator s = subscriptions_.begin();
375  s != subscriptions_.end() && !found; ++s)
376  {
377  if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
378  {
379  found = true;
380  sub = *s;
381  break;
382  }
383  }
384  }
385 
386  if(found)
387  {
388  sub->addLocalConnection(pub);
389  }
390 
391  XmlRpcValue args, result, payload;
392  args[0] = this_node::getName();
393  args[1] = ops.topic;
394  args[2] = ops.datatype;
395  args[3] = xmlrpc_manager_->getServerURI();
396  master::execute("registerPublisher", args, result, payload, true);
397 
398  return true;
399 }
400 
401 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
402 {
403  PublicationPtr pub;
404  V_Publication::iterator i;
405  {
406  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
407 
408  if (isShuttingDown())
409  {
410  return false;
411  }
412 
413  for (i = advertised_topics_.begin();
414  i != advertised_topics_.end(); ++i)
415  {
416  if(((*i)->getName() == topic) && (!(*i)->isDropped()))
417  {
418  pub = *i;
419  break;
420  }
421  }
422  }
423 
424  if (!pub)
425  {
426  return false;
427  }
428 
429  pub->removeCallbacks(callbacks);
430 
431  {
432  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
433  if (pub->getNumCallbacks() == 0)
434  {
435  unregisterPublisher(pub->getName());
436  pub->drop();
437 
438  advertised_topics_.erase(i);
439 
440  {
441  boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
442  advertised_topic_names_.remove(pub->getName());
443  }
444  }
445  }
446 
447  return true;
448 }
449 
450 bool TopicManager::unregisterPublisher(const std::string& topic)
451 {
452  XmlRpcValue args, result, payload;
453  args[0] = this_node::getName();
454  args[1] = topic;
455  args[2] = xmlrpc_manager_->getServerURI();
456  master::execute("unregisterPublisher", args, result, payload, false);
457 
458  return true;
459 }
460 
461 bool TopicManager::isTopicAdvertised(const string &topic)
462 {
463  for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
464  {
465  if (((*t)->getName() == topic) && (!(*t)->isDropped()))
466  {
467  return true;
468  }
469  }
470 
471  return false;
472 }
473 
474 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
475 {
476  XmlRpcValue args, result, payload;
477  args[0] = this_node::getName();
478  args[1] = s->getName();
479  args[2] = datatype;
480  args[3] = xmlrpc_manager_->getServerURI();
481 
482  if (!master::execute("registerSubscriber", args, result, payload, true))
483  {
484  return false;
485  }
486 
487  vector<string> pub_uris;
488  for (int i = 0; i < payload.size(); i++)
489  {
490  if (payload[i] != xmlrpc_manager_->getServerURI())
491  {
492  pub_uris.push_back(string(payload[i]));
493  }
494  }
495 
496  bool self_subscribed = false;
497  PublicationPtr pub;
498  const std::string& sub_md5sum = s->md5sum();
499  // Figure out if we have a local publisher
500  {
501  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
502  V_Publication::const_iterator it = advertised_topics_.begin();
503  V_Publication::const_iterator end = advertised_topics_.end();
504  for (; it != end; ++it)
505  {
506  pub = *it;
507  const std::string& pub_md5sum = pub->getMD5Sum();
508 
509  if (pub->getName() == s->getName() && !pub->isDropped())
510  {
511  if (!md5sumsMatch(pub_md5sum, sub_md5sum))
512  {
513  ROS_ERROR("md5sum mismatch making local subscription to topic %s.",
514  s->getName().c_str());
515  ROS_ERROR("Subscriber expects type %s, md5sum %s",
516  s->datatype().c_str(), s->md5sum().c_str());
517  ROS_ERROR("Publisher provides type %s, md5sum %s",
518  pub->getDataType().c_str(), pub->getMD5Sum().c_str());
519  return false;
520  }
521 
522  self_subscribed = true;
523  break;
524  }
525  }
526  }
527 
528  s->pubUpdate(pub_uris);
529  if (self_subscribed)
530  {
531  s->addLocalConnection(pub);
532  }
533 
534  return true;
535 }
536 
537 bool TopicManager::unregisterSubscriber(const string &topic)
538 {
539  XmlRpcValue args, result, payload;
540  args[0] = this_node::getName();
541  args[1] = topic;
542  args[2] = xmlrpc_manager_->getServerURI();
543 
544  master::execute("unregisterSubscriber", args, result, payload, false);
545 
546  return true;
547 }
548 
549 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
550 {
551  SubscriptionPtr sub;
552  {
553  boost::mutex::scoped_lock lock(subs_mutex_);
554 
555  if (isShuttingDown())
556  {
557  return false;
558  }
559 
560  ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
561  // find the subscription
562  for (L_Subscription::const_iterator s = subscriptions_.begin();
563  s != subscriptions_.end(); ++s)
564  {
565  if ((*s)->getName() != topic || (*s)->isDropped())
566  continue;
567 
568  sub = *s;
569  break;
570  }
571 
572  }
573 
574  if (sub)
575  {
576  return sub->pubUpdate(pubs);
577  }
578  else
579  {
580  ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
581  "don't have any subscribers to that topic.", topic.c_str());
582  }
583 
584  return false;
585 }
586 
587 bool TopicManager::requestTopic(const string &topic,
588  XmlRpcValue &protos,
589  XmlRpcValue &ret)
590 {
591  for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
592  {
593  XmlRpcValue proto = protos[proto_idx]; // save typing
594  if (proto.getType() != XmlRpcValue::TypeArray)
595  {
596  ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
597  return false;
598  }
599 
600  if (proto[0].getType() != XmlRpcValue::TypeString)
601  {
602  ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
603  "did not start with a string");
604  return false;
605  }
606 
607  string proto_name = proto[0];
608  if (proto_name == string("TCPROS"))
609  {
610  XmlRpcValue tcpros_params;
611  tcpros_params[0] = string("TCPROS");
612  tcpros_params[1] = network::getHost();
613  tcpros_params[2] = int(connection_manager_->getTCPPort());
614  ret[0] = int(1);
615  ret[1] = string();
616  ret[2] = tcpros_params;
617  return true;
618  }
619  else if (proto_name == string("UDPROS"))
620  {
621  if (proto.size() != 5 ||
622  proto[1].getType() != XmlRpcValue::TypeBase64 ||
623  proto[2].getType() != XmlRpcValue::TypeString ||
624  proto[3].getType() != XmlRpcValue::TypeInt ||
625  proto[4].getType() != XmlRpcValue::TypeInt)
626  {
627  ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
628  return false;
629  }
630  std::vector<char> header_bytes = proto[1];
631  boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
632  memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
633  Header h;
634  string err;
635  if (!h.parse(buffer, header_bytes.size(), err))
636  {
637  ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
638  return false;
639  }
640 
641  PublicationPtr pub_ptr = lookupPublication(topic);
642  if(!pub_ptr)
643  {
644  ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
645  return false;
646  }
647 
648  std::string host = proto[2];
649  int port = proto[3];
650 
651  M_string m;
652  std::string error_msg;
653  if (!pub_ptr->validateHeader(h, error_msg))
654  {
655  ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
656  return false;
657  }
658 
659  int max_datagram_size = proto[4];
660  int conn_id = connection_manager_->getNewConnectionID();
661  TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
662  if (!transport)
663  {
664  ROSCPP_LOG_DEBUG("Error creating outgoing transport for [%s:%d]", host.c_str(), port);
665  return false;
666  }
667  connection_manager_->udprosIncomingConnection(transport, h);
668 
669  XmlRpcValue udpros_params;
670  udpros_params[0] = string("UDPROS");
671  udpros_params[1] = network::getHost();
672  udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
673  udpros_params[3] = conn_id;
674  udpros_params[4] = max_datagram_size;
675  m["topic"] = topic;
676  m["md5sum"] = pub_ptr->getMD5Sum();
677  m["type"] = pub_ptr->getDataType();
678  m["callerid"] = this_node::getName();
679  m["message_definition"] = pub_ptr->getMessageDefinition();
680  boost::shared_array<uint8_t> msg_def_buffer;
681  uint32_t len;
682  Header::write(m, msg_def_buffer, len);
683  XmlRpcValue v(msg_def_buffer.get(), len);
684  udpros_params[5] = v;
685  ret[0] = int(1);
686  ret[1] = string();
687  ret[2] = udpros_params;
688  return true;
689  }
690  else
691  {
692  ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
693  proto_name.c_str());
694  }
695  }
696 
697  ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
698  "requestTopic did not support TCPROS, so there are no " \
699  "protocols in common.");
700  return false;
701 }
702 
703 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
704 {
705  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
706 
707  if (isShuttingDown())
708  {
709  return;
710  }
711 
713  if (p->hasSubscribers() || p->isLatching())
714  {
715  ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
716 
717  // Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can
718  // do a no-copy publish.
719  bool nocopy = false;
720  bool serialize = false;
721 
722  // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
723  if (m.type_info && m.message)
724  {
725  p->getPublishTypes(serialize, nocopy, *m.type_info);
726  }
727  else
728  {
729  serialize = true;
730  }
731 
732  if (!nocopy)
733  {
734  m.message.reset();
735  m.type_info = 0;
736  }
737 
738  if (serialize || p->isLatching())
739  {
740  SerializedMessage m2 = serfunc();
741  m.buf = m2.buf;
742  m.num_bytes = m2.num_bytes;
744  }
745 
746  p->publish(m);
747 
748  // If we're not doing a serialized publish we don't need to signal the pollset. The write()
749  // call inside signal() is actually relatively expensive when doing a nocopy publish.
750  if (serialize)
751  {
752  poll_manager_->getPollSet().signal();
753  }
754  }
755  else
756  {
757  p->incrementSequence();
758  }
759 }
760 
761 void TopicManager::incrementSequence(const std::string& topic)
762 {
763  PublicationPtr pub = lookupPublication(topic);
764  if (pub)
765  {
766  pub->incrementSequence();
767  }
768 }
769 
770 bool TopicManager::isLatched(const std::string& topic)
771 {
772  PublicationPtr pub = lookupPublication(topic);
773  if (pub)
774  {
775  return pub->isLatched();
776  }
777 
778  return false;
779 }
780 
782 {
783  PublicationPtr t;
784  for (V_Publication::iterator i = advertised_topics_.begin();
785  !t && i != advertised_topics_.end(); ++i)
786  {
787  if (((*i)->getName() == topic) && (!(*i)->isDropped()))
788  {
789  t = *i;
790  break;
791  }
792  }
793 
794  return t;
795 }
796 
797 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
798 {
799  SubscriptionPtr sub;
800 
801  {
802  boost::mutex::scoped_lock lock(subs_mutex_);
803 
804  if (isShuttingDown())
805  {
806  return false;
807  }
808 
809  L_Subscription::iterator it;
810  for (it = subscriptions_.begin();
811  it != subscriptions_.end(); ++it)
812  {
813  if ((*it)->getName() == topic)
814  {
815  sub = *it;
816  break;
817  }
818  }
819  }
820 
821  if (!sub)
822  {
823  return false;
824  }
825 
826  sub->removeCallback(helper);
827 
828  if (sub->getNumCallbacks() == 0)
829  {
830  // nobody is left. blow away the subscription.
831  {
832  boost::mutex::scoped_lock lock(subs_mutex_);
833 
834  L_Subscription::iterator it;
835  for (it = subscriptions_.begin();
836  it != subscriptions_.end(); ++it)
837  {
838  if ((*it)->getName() == topic)
839  {
840  subscriptions_.erase(it);
841  break;
842  }
843  }
844 
845  if (!unregisterSubscriber(topic))
846  {
847  ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
848  }
849  }
850 
851  sub->shutdown();
852  return true;
853  }
854 
855  return true;
856 }
857 
858 size_t TopicManager::getNumSubscribers(const std::string &topic)
859 {
860  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
861 
862  if (isShuttingDown())
863  {
864  return 0;
865  }
866 
868  if (p)
869  {
870  return p->getNumSubscribers();
871  }
872 
873  return 0;
874 }
875 
877 {
878  boost::mutex::scoped_lock lock(subs_mutex_);
879  return subscriptions_.size();
880 }
881 
882 size_t TopicManager::getNumPublishers(const std::string &topic)
883 {
884  boost::mutex::scoped_lock lock(subs_mutex_);
885 
886  if (isShuttingDown())
887  {
888  return 0;
889  }
890 
891  for (L_Subscription::const_iterator t = subscriptions_.begin();
892  t != subscriptions_.end(); ++t)
893  {
894  if (!(*t)->isDropped() && (*t)->getName() == topic)
895  {
896  return (*t)->getNumPublishers();
897  }
898  }
899 
900  return 0;
901 }
902 
904 {
905  XmlRpcValue publish_stats, subscribe_stats, service_stats;
906  // force these guys to be arrays, even if we don't populate them
907  publish_stats.setSize(0);
908  subscribe_stats.setSize(0);
909  service_stats.setSize(0);
910 
911  uint32_t pidx = 0;
912  {
913  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
914  for (V_Publication::iterator t = advertised_topics_.begin();
915  t != advertised_topics_.end(); ++t)
916  {
917  publish_stats[pidx++] = (*t)->getStats();
918  }
919  }
920 
921  {
922  uint32_t sidx = 0;
923 
924  boost::mutex::scoped_lock lock(subs_mutex_);
925  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
926  {
927  subscribe_stats[sidx++] = (*t)->getStats();
928  }
929  }
930 
931  stats[0] = publish_stats;
932  stats[1] = subscribe_stats;
933  stats[2] = service_stats;
934 }
935 
937 {
938  // force these guys to be arrays, even if we don't populate them
939  info.setSize(0);
940 
941  {
942  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
943 
944  for (V_Publication::iterator t = advertised_topics_.begin();
945  t != advertised_topics_.end(); ++t)
946  {
947  (*t)->getInfo(info);
948  }
949  }
950 
951  {
952  boost::mutex::scoped_lock lock(subs_mutex_);
953 
954  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
955  {
956  (*t)->getInfo(info);
957  }
958  }
959 }
960 
962 {
963  // force these guys to be arrays, even if we don't populate them
964  subs.setSize(0);
965 
966  {
967  boost::mutex::scoped_lock lock(subs_mutex_);
968 
969  uint32_t sidx = 0;
970 
971  for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
972  {
973  XmlRpcValue sub;
974  sub[0] = (*t)->getName();
975  sub[1] = (*t)->datatype();
976  subs[sidx++] = sub;
977  }
978  }
979 }
980 
982 {
983  // force these guys to be arrays, even if we don't populate them
984  pubs.setSize(0);
985 
986  {
987  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
988 
989  uint32_t sidx = 0;
990 
991  for (V_Publication::iterator t = advertised_topics_.begin();
992  t != advertised_topics_.end(); ++t)
993  {
994  XmlRpcValue pub;
995  pub[0] = (*t)->getName();
996  pub[1] = (*t)->getDataType();
997  pubs[sidx++] = pub;
998  }
999 
1000  }
1001 }
1002 
1003 extern std::string console::g_last_error_message;
1004 
1006 {
1007  std::vector<std::string> pubs;
1008  for (int idx = 0; idx < params[2].size(); idx++)
1009  {
1010  pubs.push_back(params[2][idx]);
1011  }
1012  if (pubUpdate(params[1], pubs))
1013  {
1014  result = xmlrpc::responseInt(1, "", 0);
1015  }
1016  else
1017  {
1018  result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
1019  }
1020 }
1021 
1023 {
1024  if (!requestTopic(params[1], params[2], result))
1025  {
1026  result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
1027  }
1028 }
1029 
1031 {
1032  (void)params;
1033  result[0] = 1;
1034  result[1] = std::string("");
1035  XmlRpcValue response;
1036  getBusStats(result);
1037  result[2] = response;
1038 }
1039 
1041 {
1042  (void)params;
1043  result[0] = 1;
1044  result[1] = std::string("");
1045  XmlRpcValue response;
1046  getBusInfo(response);
1047  result[2] = response;
1048 }
1049 
1051 {
1052  (void)params;
1053  result[0] = 1;
1054  result[1] = std::string("subscriptions");
1055  XmlRpcValue response;
1056  getSubscriptions(response);
1057  result[2] = response;
1058 }
1059 
1061 {
1062  (void)params;
1063  result[0] = 1;
1064  result[1] = std::string("publications");
1065  XmlRpcValue response;
1066  getPublications(response);
1067  result[2] = response;
1068 }
1069 
1070 } // namespace ros
bool unregisterPublisher(const std::string &topic)
XMLRPCManagerPtr xmlrpc_manager_
volatile bool shutting_down_
void pubUpdateCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
std::list< std::string > advertised_topic_names_
void incrementSequence(const std::string &_topic)
std::string message_definition
The full definition of the message published on this topic.
bool isTopicAdvertised(const std::string &topic)
bool has_header
Tells whether or not the message has a header. If it does, the sequence number will be written direct...
boost::mutex subs_mutex_
L_Subscription subscriptions_
void getSubscriptions(XmlRpc::XmlRpcValue &subscriptions)
Return the list of subcriptions for the node.
XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
int size() const
size_t getNumPublishers(const std::string &_topic)
Return the number of publishers connected to this node on a particular topic.
XmlRpcServer s
bool latch
Whether or not this publication should "latch". A latching publication will automatically send out th...
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:81
bool subscribe(const SubscribeOptions &ops)
PollManagerPtr poll_manager_
size_t getNumSubscribers(const std::string &_topic)
Return the number of subscribers a node has for a particular topic:
Encapsulates all options available for creating a Publisher.
void getBusInfo(XmlRpc::XmlRpcValue &info)
Compute the info for the node&#39;s connectivity.
Type const & getType() const
#define ROS_WARN(...)
bool requestTopic(const std::string &topic, XmlRpc::XmlRpcValue &protos, XmlRpc::XmlRpcValue &ret)
Request a topic.
Thrown when an invalid parameter is passed to a method.
Definition: exceptions.h:74
PublicationPtr lookupPublicationWithoutLock(const std::string &topic)
std::map< std::string, std::string > M_string
PublicationPtr lookupPublication(const std::string &topic)
Lookup an advertised topic.
#define ROS_DEBUG_NAMED(name,...)
boost::mutex shutting_down_mutex_
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
std::vector< std::string > V_string
std::string md5sum
MD5 of the message datatype.
SubscriptionCallbackHelperPtr helper
Helper object used to get create messages and call callbacks.
void getPublications(XmlRpc::XmlRpcValue &publications)
Return the list of advertised topics for the node.
const char * datatype()
bool advertise(const AdvertiseOptions &ops, const SubscriberCallbacksPtr &callbacks)
ROSCPP_DECL const std::string & getHost()
Definition: network.cpp:50
boost::mutex advertised_topic_names_mutex_
bool isLatched(const std::string &topic)
void serialize(Stream &stream, const T &t)
void setSize(int size)
static const ConnectionManagerPtr & instance()
Encapsulates all options available for creating a Subscriber.
V_Publication advertised_topics_
size_t getNumSubscriptions()
bool addSubCallback(const SubscribeOptions &ops)
const std::type_info * type_info
uint32_t queue_size
Number of incoming messages to queue up for processing (messages in excess of this queue capacity wil...
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
void requestTopicCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
void getBusStatsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
void getSubscriptionsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
Thrown when a second (third,...) subscription is attempted with conflicting arguments.
Definition: exceptions.h:63
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
Execute an XMLRPC call on the master.
Definition: master.cpp:179
std::string datatype
The datatype of the message published on this topic (eg. "std_msgs/String")
bool pubUpdate(const std::string &topic, const std::vector< std::string > &pubs)
Update local publisher lists.
void getPublicationsCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
void getAdvertisedTopics(V_string &topics)
Get the list of topics advertised by this node.
TransportHints transport_hints
Hints for transport type and options.
void getBusStats(XmlRpc::XmlRpcValue &stats)
Compute the statistics for the node&#39;s connectivity.
bool unregisterSubscriber(const std::string &topic)
std::string datatype
Datatype of the message we&#39;d like to subscribe as.
bool unsubscribe(const std::string &_topic, const SubscriptionCallbackHelperPtr &helper)
boost::shared_ptr< Publication > PublicationPtr
Definition: forwards.h:66
std::string md5sum
The md5sum of the message datatype published on this topic.
bool unadvertise(const std::string &topic, const SubscriberCallbacksPtr &callbacks)
static const PollManagerPtr & instance()
boost::shared_array< uint8_t > buf
VoidConstPtr tracked_object
An object whose destruction will prevent the callback associated with this subscription.
void getSubscribedTopics(V_string &topics)
Get the list of topics subscribed to by this node.
std::string topic
The topic to publish on.
void getBusInfoCallback(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
static const XMLRPCManagerPtr & instance()
const char * md5sum()
ROSCONSOLE_DECL std::string g_last_error_message
std::string topic
Topic to subscribe to.
boost::shared_ptr< void const > message
#define ROS_ERROR(...)
bool md5sumsMatch(const std::string &lhs, const std::string &rhs)
CallbackQueueInterface * callback_queue
Queue to add callbacks to. If NULL, the global callback queue will be used.
uint32_t queue_size
The maximum number of outgoing messages to be queued for delivery to subscribers. ...
void publish(const std::string &topic, const M &message)
bool registerSubscriber(const SubscriptionPtr &s, const std::string &datatype)
#define ROS_DEBUG(...)
boost::recursive_mutex advertised_topics_mutex_
ConnectionManagerPtr connection_manager_


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Dec 20 2017 03:58:41