46 #include <boost/bind/bind.hpp>
59 static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
63 TopicManager::TopicManager()
64 : shutting_down_(false)
125 if(!(*i)->isDropped())
156 for (; it != end; ++it)
159 pub->processPublishQueue();
180 for (; it != end; ++it)
183 topics.push_back(sub->getName());
196 return lhs ==
"*" || rhs ==
"*" || lhs == rhs;
203 bool found_topic =
false;
217 if (!sub->isDropped() && sub->getName() == ops.
topic)
229 if (found_topic && !found)
231 std::stringstream ss;
232 ss <<
"Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.
datatype <<
"/" << ops.
md5sum <<
" vs. " << sub->datatype() <<
"/" << sub->md5sum() <<
"]";
284 ROS_WARN(
"couldn't register subscriber on topic [%s]", ops.
topic.c_str());
298 std::stringstream ss;
299 ss <<
"Advertising with * as the datatype is not allowed. Topic [" << ops.
topic <<
"]";
305 std::stringstream ss;
306 ss <<
"Advertising with * as the md5sum is not allowed. Topic [" << ops.
topic <<
"]";
322 ROS_WARN(
"Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.
topic.c_str());
336 if (pub && pub->getNumCallbacks() == 0)
343 if (pub->getMD5Sum() != ops.
md5sum)
345 ROS_ERROR(
"Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
346 ops.
topic.c_str(), ops.
md5sum.c_str(), ops.
datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
350 pub->addCallbacks(callbacks);
356 pub->addCallbacks(callbacks);
390 sub->addLocalConnection(pub);
406 V_Publication::iterator i;
418 if(((*i)->getName() == topic) && (!(*i)->isDropped()))
431 pub->removeCallbacks(callbacks);
435 if (pub->getNumCallbacks() == 0)
467 if (((*t)->getName() == topic) && (!(*t)->isDropped()))
480 args[1] =
s->getName();
484 if (!
master::execute(
"registerSubscriber", args, result, payload,
true))
489 vector<string> pub_uris;
490 for (
int i = 0; i < payload.
size(); i++)
494 pub_uris.push_back(
string(payload[i]));
498 bool self_subscribed =
false;
500 const std::string& sub_md5sum =
s->md5sum();
506 for (; it != end; ++it)
509 const std::string& pub_md5sum = pub->getMD5Sum();
511 if (pub->getName() ==
s->getName() && !pub->isDropped())
515 ROS_ERROR(
"md5sum mismatch making local subscription to topic %s.",
516 s->getName().c_str());
517 ROS_ERROR(
"Subscriber expects type %s, md5sum %s",
518 s->datatype().c_str(),
s->md5sum().c_str());
519 ROS_ERROR(
"Publisher provides type %s, md5sum %s",
520 pub->getDataType().c_str(), pub->getMD5Sum().c_str());
524 self_subscribed =
true;
530 s->pubUpdate(pub_uris);
533 s->addLocalConnection(pub);
562 ROS_DEBUG(
"Received update for topic [%s] (%d publishers)", topic.c_str(), (
int)pubs.size());
567 if ((*s)->getName() != topic || (*s)->isDropped())
578 return sub->pubUpdate(pubs);
582 ROSCPP_LOG_DEBUG(
"got a request for updating publishers of topic %s, but I " \
583 "don't have any subscribers to that topic.", topic.c_str());
593 for (
int proto_idx = 0; proto_idx < protos.
size(); proto_idx++)
596 if (proto.
getType() != XmlRpcValue::TypeArray)
602 if (proto[0].getType() != XmlRpcValue::TypeString)
604 ROSCPP_LOG_DEBUG(
"requestTopic received a protocol list in which a sublist " \
605 "did not start with a string");
609 string proto_name = proto[0];
610 if (proto_name ==
string(
"TCPROS"))
613 tcpros_params[0] = string(
"TCPROS");
618 ret[2] = tcpros_params;
621 else if (proto_name ==
string(
"UDPROS"))
623 if (proto.
size() != 5 ||
624 proto[1].
getType() != XmlRpcValue::TypeBase64 ||
625 proto[2].
getType() != XmlRpcValue::TypeString ||
626 proto[3].
getType() != XmlRpcValue::TypeInt ||
627 proto[4].
getType() != XmlRpcValue::TypeInt)
632 std::vector<char> header_bytes = proto[1];
634 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
637 if (!h.
parse(buffer, header_bytes.size(), err))
639 ROSCPP_LOG_DEBUG(
"Unable to parse UDPROS connection header: %s", err.c_str());
646 ROSCPP_LOG_DEBUG(
"Unable to find advertised topic %s for UDPROS connection", topic.c_str());
650 std::string host = proto[2];
654 std::string error_msg;
655 if (!pub_ptr->validateHeader(h, error_msg))
657 ROSCPP_LOG_DEBUG(
"Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
661 int max_datagram_size = proto[4];
666 ROSCPP_LOG_DEBUG(
"Error creating outgoing transport for [%s:%d]", host.c_str(), port);
672 udpros_params[0] = string(
"UDPROS");
675 udpros_params[3] = conn_id;
676 udpros_params[4] = max_datagram_size;
678 m[
"md5sum"] = pub_ptr->getMD5Sum();
679 m[
"type"] = pub_ptr->getDataType();
681 m[
"message_definition"] = pub_ptr->getMessageDefinition();
686 udpros_params[5] = v;
689 ret[2] = udpros_params;
699 ROSCPP_LOG_DEBUG(
"Currently, roscpp only supports TCPROS. The caller to " \
700 "requestTopic did not support TCPROS, so there are no " \
701 "protocols in common.");
715 if (p->hasSubscribers() || p->isLatching())
717 ROS_DEBUG_NAMED(
"superdebug",
"Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
759 p->incrementSequence();
768 pub->incrementSequence();
777 return pub->isLatched();
789 if (((*i)->getName() == topic) && (!(*i)->isDropped()))
811 L_Subscription::iterator it;
815 if ((*it)->getName() == topic)
828 sub->removeCallback(helper);
830 if (sub->getNumCallbacks() == 0)
836 L_Subscription::iterator it;
840 if ((*it)->getName() == topic)
849 ROSCPP_LOG_DEBUG(
"Couldn't unregister subscriber for topic [%s]", topic.c_str());
872 return p->getNumSubscribers();
896 if (!(*t)->isDropped() && (*t)->getName() == topic)
898 return (*t)->getNumPublishers();
907 XmlRpcValue publish_stats, subscribe_stats, service_stats;
919 publish_stats[pidx++] = (*t)->getStats();
929 subscribe_stats[sidx++] = (*t)->getStats();
933 stats[0] = publish_stats;
934 stats[1] = subscribe_stats;
935 stats[2] = service_stats;
976 sub[0] = (*t)->getName();
977 sub[1] = (*t)->datatype();
997 pub[0] = (*t)->getName();
998 pub[1] = (*t)->getDataType();
1007 std::vector<std::string> pubs;
1008 for (
int idx = 0; idx < params[2].
size(); idx++)
1010 pubs.push_back(params[2][idx]);
1034 result[1] = std::string(
"");
1044 result[1] = std::string(
"");
1054 result[1] = std::string(
"subscriptions");
1064 result[1] = std::string(
"publications");