58 static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
62 TopicManager::TopicManager()
63 : shutting_down_(false)
124 if(!(*i)->isDropped())
155 for (; it != end; ++it)
158 pub->processPublishQueue();
179 for (; it != end; ++it)
182 topics.push_back(sub->getName());
195 return lhs ==
"*" || rhs ==
"*" || lhs == rhs;
202 bool found_topic =
false;
216 if (!sub->isDropped() && sub->getName() == ops.
topic)
228 if (found_topic && !found)
230 std::stringstream ss;
231 ss <<
"Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.
datatype <<
"/" << ops.
md5sum <<
" vs. " << sub->datatype() <<
"/" << sub->md5sum() <<
"]";
283 ROS_WARN(
"couldn't register subscriber on topic [%s]", ops.
topic.c_str());
297 std::stringstream ss;
298 ss <<
"Advertising with * as the datatype is not allowed. Topic [" << ops.
topic <<
"]";
304 std::stringstream ss;
305 ss <<
"Advertising with * as the md5sum is not allowed. Topic [" << ops.
topic <<
"]";
321 ROS_WARN(
"Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.
topic.c_str());
335 if (pub && pub->getNumCallbacks() == 0)
342 if (pub->getMD5Sum() != ops.
md5sum)
344 ROS_ERROR(
"Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
345 ops.
topic.c_str(), ops.
md5sum.c_str(), ops.
datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
349 pub->addCallbacks(callbacks);
355 pub->addCallbacks(callbacks);
389 sub->addLocalConnection(pub);
405 V_Publication::iterator i;
417 if(((*i)->getName() == topic) && (!(*i)->isDropped()))
430 pub->removeCallbacks(callbacks);
434 if (pub->getNumCallbacks() == 0)
466 if (((*t)->getName() == topic) && (!(*t)->isDropped()))
479 args[1] = s->getName();
483 if (!
master::execute(
"registerSubscriber", args, result, payload,
true))
488 vector<string> pub_uris;
489 for (
int i = 0; i < payload.
size(); i++)
493 pub_uris.push_back(
string(payload[i]));
497 bool self_subscribed =
false;
499 const std::string& sub_md5sum = s->md5sum();
505 for (; it != end; ++it)
508 const std::string& pub_md5sum = pub->getMD5Sum();
510 if (pub->getName() == s->getName() && !pub->isDropped())
514 ROS_ERROR(
"md5sum mismatch making local subscription to topic %s.",
515 s->getName().c_str());
516 ROS_ERROR(
"Subscriber expects type %s, md5sum %s",
517 s->datatype().c_str(), s->md5sum().c_str());
518 ROS_ERROR(
"Publisher provides type %s, md5sum %s",
519 pub->getDataType().c_str(), pub->getMD5Sum().c_str());
523 self_subscribed =
true;
529 s->pubUpdate(pub_uris);
532 s->addLocalConnection(pub);
561 ROS_DEBUG(
"Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
566 if ((*s)->getName() != topic || (*s)->isDropped())
577 return sub->pubUpdate(pubs);
581 ROSCPP_LOG_DEBUG(
"got a request for updating publishers of topic %s, but I " \
582 "don't have any subscribers to that topic.", topic.c_str());
592 for (
int proto_idx = 0; proto_idx < protos.
size(); proto_idx++)
595 if (proto.
getType() != XmlRpcValue::TypeArray)
601 if (proto[0].getType() != XmlRpcValue::TypeString)
603 ROSCPP_LOG_DEBUG(
"requestTopic received a protocol list in which a sublist " \
604 "did not start with a string");
608 string proto_name = proto[0];
609 if (proto_name ==
string(
"TCPROS"))
612 tcpros_params[0] = string(
"TCPROS");
617 ret[2] = tcpros_params;
620 else if (proto_name ==
string(
"UDPROS"))
622 if (proto.
size() != 5 ||
623 proto[1].
getType() != XmlRpcValue::TypeBase64 ||
624 proto[2].
getType() != XmlRpcValue::TypeString ||
625 proto[3].
getType() != XmlRpcValue::TypeInt ||
626 proto[4].
getType() != XmlRpcValue::TypeInt)
631 std::vector<char> header_bytes = proto[1];
633 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
636 if (!h.parse(buffer, header_bytes.size(), err))
638 ROSCPP_LOG_DEBUG(
"Unable to parse UDPROS connection header: %s", err.c_str());
645 ROSCPP_LOG_DEBUG(
"Unable to find advertised topic %s for UDPROS connection", topic.c_str());
649 std::string host = proto[2];
653 std::string error_msg;
654 if (!pub_ptr->validateHeader(h, error_msg))
656 ROSCPP_LOG_DEBUG(
"Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
660 int max_datagram_size = proto[4];
665 ROSCPP_LOG_DEBUG(
"Error creating outgoing transport for [%s:%d]", host.c_str(), port);
671 udpros_params[0] = string(
"UDPROS");
674 udpros_params[3] = conn_id;
675 udpros_params[4] = max_datagram_size;
677 m[
"md5sum"] = pub_ptr->getMD5Sum();
678 m[
"type"] = pub_ptr->getDataType();
680 m[
"message_definition"] = pub_ptr->getMessageDefinition();
685 udpros_params[5] = v;
688 ret[2] = udpros_params;
698 ROSCPP_LOG_DEBUG(
"Currently, roscpp only supports TCPROS. The caller to " \
699 "requestTopic did not support TCPROS, so there are no " \
700 "protocols in common.");
714 if (p->hasSubscribers() || p->isLatching())
716 ROS_DEBUG_NAMED(
"superdebug",
"Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
726 p->getPublishTypes(serialize, nocopy, *m.
type_info);
739 if (serialize || p->isLatching())
758 p->incrementSequence();
767 pub->incrementSequence();
776 return pub->isLatched();
788 if (((*i)->getName() == topic) && (!(*i)->isDropped()))
810 L_Subscription::iterator it;
814 if ((*it)->getName() == topic)
827 sub->removeCallback(helper);
829 if (sub->getNumCallbacks() == 0)
835 L_Subscription::iterator it;
839 if ((*it)->getName() == topic)
848 ROSCPP_LOG_DEBUG(
"Couldn't unregister subscriber for topic [%s]", topic.c_str());
871 return p->getNumSubscribers();
895 if (!(*t)->isDropped() && (*t)->getName() == topic)
897 return (*t)->getNumPublishers();
906 XmlRpcValue publish_stats, subscribe_stats, service_stats;
918 publish_stats[pidx++] = (*t)->getStats();
928 subscribe_stats[sidx++] = (*t)->getStats();
932 stats[0] = publish_stats;
933 stats[1] = subscribe_stats;
934 stats[2] = service_stats;
975 sub[0] = (*t)->getName();
976 sub[1] = (*t)->datatype();
996 pub[0] = (*t)->getName();
997 pub[1] = (*t)->getDataType();
1008 std::vector<std::string> pubs;
1009 for (
int idx = 0; idx < params[2].
size(); idx++)
1011 pubs.push_back(params[2][idx]);
1035 result[1] = std::string(
"");
1038 result[2] = response;
1045 result[1] = std::string(
"");
1048 result[2] = response;
1055 result[1] = std::string(
"subscriptions");
1058 result[2] = response;
1065 result[1] = std::string(
"publications");
1068 result[2] = response;
bool unregisterPublisher(const std::string &topic)
XMLRPCManagerPtr xmlrpc_manager_
volatile bool shutting_down_
void pubUpdateCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
std::list< std::string > advertised_topic_names_
void incrementSequence(const std::string &_topic)
std::string message_definition
The full definition of the message published on this topic.
bool isTopicAdvertised(const std::string &topic)
bool has_header
Tells whether or not the message has a header. If it does, the sequence number will be written direct...
L_Subscription subscriptions_
void getSubscriptions(XmlRpc::XmlRpcValue &subscriptions)
Return the list of subcriptions for the node.
XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
size_t getNumPublishers(const std::string &_topic)
Return the number of publishers connected to this node on a particular topic.
bool latch
Whether or not this publication should "latch". A latching publication will automatically send out th...
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
bool subscribe(const SubscribeOptions &ops)
PollManagerPtr poll_manager_
size_t getNumSubscribers(const std::string &_topic)
Return the number of subscribers a node has for a particular topic:
Encapsulates all options available for creating a Publisher.
void getBusInfo(XmlRpc::XmlRpcValue &info)
Compute the info for the node's connectivity.
Type const & getType() const
bool requestTopic(const std::string &topic, XmlRpc::XmlRpcValue &protos, XmlRpc::XmlRpcValue &ret)
Request a topic.
Thrown when an invalid parameter is passed to a method.
PublicationPtr lookupPublicationWithoutLock(const std::string &topic)
std::map< std::string, std::string > M_string
PublicationPtr lookupPublication(const std::string &topic)
Lookup an advertised topic.
#define ROS_DEBUG_NAMED(name,...)
void processPublishQueues()
boost::mutex shutting_down_mutex_
#define ROSCPP_LOG_DEBUG(...)
std::vector< std::string > V_string
std::string md5sum
MD5 of the message datatype.
SubscriptionCallbackHelperPtr helper
Helper object used to get create messages and call callbacks.
void getPublications(XmlRpc::XmlRpcValue &publications)
Return the list of advertised topics for the node.
bool advertise(const AdvertiseOptions &ops, const SubscriberCallbacksPtr &callbacks)
ROSCPP_DECL const std::string & getHost()
boost::mutex advertised_topic_names_mutex_
bool isLatched(const std::string &topic)
void serialize(Stream &stream, const T &t)
static const ConnectionManagerPtr & instance()
Encapsulates all options available for creating a Subscriber.
V_Publication advertised_topics_
size_t getNumSubscriptions()
bool addSubCallback(const SubscribeOptions &ops)
const std::type_info * type_info
uint32_t queue_size
Number of incoming messages to queue up for processing (messages in excess of this queue capacity wil...
void requestTopicCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
void getBusStatsCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
void getSubscriptionsCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
Thrown when a second (third,...) subscription is attempted with conflicting arguments.
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
Execute an XMLRPC call on the master.
std::string datatype
The datatype of the message published on this topic (eg. "std_msgs/String")
bool pubUpdate(const std::string &topic, const std::vector< std::string > &pubs)
Update local publisher lists.
void getPublicationsCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
void getAdvertisedTopics(V_string &topics)
Get the list of topics advertised by this node.
TransportHints transport_hints
Hints for transport type and options.
void getBusStats(XmlRpc::XmlRpcValue &stats)
Compute the statistics for the node's connectivity.
bool unregisterSubscriber(const std::string &topic)
std::string datatype
Datatype of the message we'd like to subscribe as.
bool unsubscribe(const std::string &_topic, const SubscriptionCallbackHelperPtr &helper)
boost::shared_ptr< Publication > PublicationPtr
std::string md5sum
The md5sum of the message datatype published on this topic.
bool unadvertise(const std::string &topic, const SubscriberCallbacksPtr &callbacks)
static const PollManagerPtr & instance()
boost::shared_array< uint8_t > buf
VoidConstPtr tracked_object
An object whose destruction will prevent the callback associated with this subscription.
void getSubscribedTopics(V_string &topics)
Get the list of topics subscribed to by this node.
std::string topic
The topic to publish on.
void getBusInfoCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
static const XMLRPCManagerPtr & instance()
ROSCONSOLE_DECL std::string g_last_error_message
std::string topic
Topic to subscribe to.
boost::shared_ptr< void const > message
bool allow_concurrent_callbacks
bool md5sumsMatch(const std::string &lhs, const std::string &rhs)
CallbackQueueInterface * callback_queue
Queue to add callbacks to. If NULL, the global callback queue will be used.
uint32_t queue_size
The maximum number of outgoing messages to be queued for delivery to subscribers. ...
void publish(const std::string &topic, const M &message)
bool registerSubscriber(const SubscriptionPtr &s, const std::string &datatype)
boost::recursive_mutex advertised_topics_mutex_
ConnectionManagerPtr connection_manager_