58 static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
62 TopicManager::TopicManager()
63 : shutting_down_(false)
124 if(!(*i)->isDropped())
155 for (; it != end; ++it)
158 pub->processPublishQueue();
179 for (; it != end; ++it)
182 topics.push_back(sub->getName());
195 return lhs ==
"*" || rhs ==
"*" || lhs == rhs;
202 bool found_topic =
false;
216 if (!sub->isDropped() && sub->getName() == ops.
topic)
228 if (found_topic && !found)
230 std::stringstream ss;
231 ss <<
"Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.
datatype <<
"/" << ops.
md5sum <<
" vs. " << sub->datatype() <<
"/" << sub->md5sum() <<
"]";
283 ROS_WARN(
"couldn't register subscriber on topic [%s]", ops.
topic.c_str());
297 std::stringstream ss;
298 ss <<
"Advertising with * as the datatype is not allowed. Topic [" << ops.
topic <<
"]";
304 std::stringstream ss;
305 ss <<
"Advertising with * as the md5sum is not allowed. Topic [" << ops.
topic <<
"]";
321 ROS_WARN(
"Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.
topic.c_str());
335 if (pub && pub->getNumCallbacks() == 0)
342 if (pub->getMD5Sum() != ops.
md5sum)
344 ROS_ERROR(
"Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
345 ops.
topic.c_str(), ops.
md5sum.c_str(), ops.
datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
349 pub->addCallbacks(callbacks);
355 pub->addCallbacks(callbacks);
389 sub->addLocalConnection(pub);
405 V_Publication::iterator i;
417 if(((*i)->getName() == topic) && (!(*i)->isDropped()))
430 pub->removeCallbacks(callbacks);
434 if (pub->getNumCallbacks() == 0)
466 if (((*t)->getName() == topic) && (!(*t)->isDropped()))
479 args[1] =
s->getName();
483 if (!
master::execute(
"registerSubscriber", args, result, payload,
true))
488 vector<string> pub_uris;
489 for (
int i = 0; i < payload.
size(); i++)
493 pub_uris.push_back(
string(payload[i]));
497 bool self_subscribed =
false;
499 const std::string& sub_md5sum =
s->md5sum();
505 for (; it != end; ++it)
508 const std::string& pub_md5sum = pub->getMD5Sum();
510 if (pub->getName() ==
s->getName() && !pub->isDropped())
514 ROS_ERROR(
"md5sum mismatch making local subscription to topic %s.",
515 s->getName().c_str());
516 ROS_ERROR(
"Subscriber expects type %s, md5sum %s",
517 s->datatype().c_str(),
s->md5sum().c_str());
518 ROS_ERROR(
"Publisher provides type %s, md5sum %s",
519 pub->getDataType().c_str(), pub->getMD5Sum().c_str());
523 self_subscribed =
true;
529 s->pubUpdate(pub_uris);
532 s->addLocalConnection(pub);
561 ROS_DEBUG(
"Received update for topic [%s] (%d publishers)", topic.c_str(), (
int)pubs.size());
566 if ((*s)->getName() != topic || (*s)->isDropped())
577 return sub->pubUpdate(pubs);
581 ROSCPP_LOG_DEBUG(
"got a request for updating publishers of topic %s, but I " \
582 "don't have any subscribers to that topic.", topic.c_str());
592 for (
int proto_idx = 0; proto_idx < protos.
size(); proto_idx++)
595 if (proto.
getType() != XmlRpcValue::TypeArray)
601 if (proto[0].getType() != XmlRpcValue::TypeString)
603 ROSCPP_LOG_DEBUG(
"requestTopic received a protocol list in which a sublist " \
604 "did not start with a string");
608 string proto_name = proto[0];
609 if (proto_name ==
string(
"TCPROS"))
612 tcpros_params[0] = string(
"TCPROS");
617 ret[2] = tcpros_params;
620 else if (proto_name ==
string(
"UDPROS"))
622 if (proto.
size() != 5 ||
623 proto[1].
getType() != XmlRpcValue::TypeBase64 ||
624 proto[2].
getType() != XmlRpcValue::TypeString ||
625 proto[3].
getType() != XmlRpcValue::TypeInt ||
626 proto[4].
getType() != XmlRpcValue::TypeInt)
631 std::vector<char> header_bytes = proto[1];
633 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
636 if (!h.
parse(buffer, header_bytes.size(), err))
638 ROSCPP_LOG_DEBUG(
"Unable to parse UDPROS connection header: %s", err.c_str());
645 ROSCPP_LOG_DEBUG(
"Unable to find advertised topic %s for UDPROS connection", topic.c_str());
649 std::string host = proto[2];
653 std::string error_msg;
654 if (!pub_ptr->validateHeader(h, error_msg))
656 ROSCPP_LOG_DEBUG(
"Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
660 int max_datagram_size = proto[4];
665 ROSCPP_LOG_DEBUG(
"Error creating outgoing transport for [%s:%d]", host.c_str(), port);
671 udpros_params[0] = string(
"UDPROS");
674 udpros_params[3] = conn_id;
675 udpros_params[4] = max_datagram_size;
677 m[
"md5sum"] = pub_ptr->getMD5Sum();
678 m[
"type"] = pub_ptr->getDataType();
680 m[
"message_definition"] = pub_ptr->getMessageDefinition();
685 udpros_params[5] = v;
688 ret[2] = udpros_params;
698 ROSCPP_LOG_DEBUG(
"Currently, roscpp only supports TCPROS. The caller to " \
699 "requestTopic did not support TCPROS, so there are no " \
700 "protocols in common.");
714 if (p->hasSubscribers() || p->isLatching())
716 ROS_DEBUG_NAMED(
"superdebug",
"Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
758 p->incrementSequence();
767 pub->incrementSequence();
776 return pub->isLatched();
788 if (((*i)->getName() == topic) && (!(*i)->isDropped()))
810 L_Subscription::iterator it;
814 if ((*it)->getName() == topic)
827 sub->removeCallback(helper);
829 if (sub->getNumCallbacks() == 0)
835 L_Subscription::iterator it;
839 if ((*it)->getName() == topic)
848 ROSCPP_LOG_DEBUG(
"Couldn't unregister subscriber for topic [%s]", topic.c_str());
871 return p->getNumSubscribers();
895 if (!(*t)->isDropped() && (*t)->getName() == topic)
897 return (*t)->getNumPublishers();
906 XmlRpcValue publish_stats, subscribe_stats, service_stats;
918 publish_stats[pidx++] = (*t)->getStats();
928 subscribe_stats[sidx++] = (*t)->getStats();
932 stats[0] = publish_stats;
933 stats[1] = subscribe_stats;
934 stats[2] = service_stats;
975 sub[0] = (*t)->getName();
976 sub[1] = (*t)->datatype();
996 pub[0] = (*t)->getName();
997 pub[1] = (*t)->getDataType();
1006 std::vector<std::string> pubs;
1007 for (
int idx = 0; idx < params[2].
size(); idx++)
1009 pubs.push_back(params[2][idx]);
1033 result[1] = std::string(
"");
1043 result[1] = std::string(
"");
1053 result[1] = std::string(
"subscriptions");
1063 result[1] = std::string(
"publications");