58 static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
62 TopicManager::TopicManager()
63 : shutting_down_(false)
123 if(!(*i)->isDropped())
154 for (; it != end; ++it)
157 pub->processPublishQueue();
178 for (; it != end; ++it)
181 topics.push_back(sub->getName());
194 return lhs ==
"*" || rhs ==
"*" || lhs == rhs;
201 bool found_topic =
false;
215 if (!sub->isDropped() && sub->getName() == ops.
topic)
227 if (found_topic && !found)
229 std::stringstream ss;
230 ss <<
"Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.
datatype <<
"/" << ops.
md5sum <<
" vs. " << sub->datatype() <<
"/" << sub->md5sum() <<
"]";
282 ROS_WARN(
"couldn't register subscriber on topic [%s]", ops.
topic.c_str());
296 std::stringstream ss;
297 ss <<
"Advertising with * as the datatype is not allowed. Topic [" << ops.
topic <<
"]";
303 std::stringstream ss;
304 ss <<
"Advertising with * as the md5sum is not allowed. Topic [" << ops.
topic <<
"]";
320 ROS_WARN(
"Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.
topic.c_str());
334 if (pub && pub->getNumCallbacks() == 0)
341 if (pub->getMD5Sum() != ops.
md5sum)
343 ROS_ERROR(
"Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
344 ops.
topic.c_str(), ops.
md5sum.c_str(), ops.
datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
348 pub->addCallbacks(callbacks);
354 pub->addCallbacks(callbacks);
388 sub->addLocalConnection(pub);
404 V_Publication::iterator i;
416 if(((*i)->getName() == topic) && (!(*i)->isDropped()))
429 pub->removeCallbacks(callbacks);
433 if (pub->getNumCallbacks() == 0)
465 if (((*t)->getName() == topic) && (!(*t)->isDropped()))
478 args[1] = s->getName();
482 if (!
master::execute(
"registerSubscriber", args, result, payload,
true))
487 vector<string> pub_uris;
488 for (
int i = 0; i < payload.
size(); i++)
492 pub_uris.push_back(
string(payload[i]));
496 bool self_subscribed =
false;
498 const std::string& sub_md5sum = s->md5sum();
504 for (; it != end; ++it)
507 const std::string& pub_md5sum = pub->getMD5Sum();
509 if (pub->getName() == s->getName() && !pub->isDropped())
513 ROS_ERROR(
"md5sum mismatch making local subscription to topic %s.",
514 s->getName().c_str());
515 ROS_ERROR(
"Subscriber expects type %s, md5sum %s",
516 s->datatype().c_str(), s->md5sum().c_str());
517 ROS_ERROR(
"Publisher provides type %s, md5sum %s",
518 pub->getDataType().c_str(), pub->getMD5Sum().c_str());
522 self_subscribed =
true;
528 s->pubUpdate(pub_uris);
531 s->addLocalConnection(pub);
560 ROS_DEBUG(
"Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
565 if ((*s)->getName() != topic || (*s)->isDropped())
576 return sub->pubUpdate(pubs);
580 ROSCPP_LOG_DEBUG(
"got a request for updating publishers of topic %s, but I " \
581 "don't have any subscribers to that topic.", topic.c_str());
591 for (
int proto_idx = 0; proto_idx < protos.
size(); proto_idx++)
594 if (proto.
getType() != XmlRpcValue::TypeArray)
600 if (proto[0].getType() != XmlRpcValue::TypeString)
602 ROSCPP_LOG_DEBUG(
"requestTopic received a protocol list in which a sublist " \
603 "did not start with a string");
607 string proto_name = proto[0];
608 if (proto_name ==
string(
"TCPROS"))
611 tcpros_params[0] = string(
"TCPROS");
616 ret[2] = tcpros_params;
619 else if (proto_name ==
string(
"UDPROS"))
621 if (proto.
size() != 5 ||
622 proto[1].
getType() != XmlRpcValue::TypeBase64 ||
623 proto[2].
getType() != XmlRpcValue::TypeString ||
624 proto[3].
getType() != XmlRpcValue::TypeInt ||
625 proto[4].
getType() != XmlRpcValue::TypeInt)
630 std::vector<char> header_bytes = proto[1];
632 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
635 if (!h.parse(buffer, header_bytes.size(), err))
637 ROSCPP_LOG_DEBUG(
"Unable to parse UDPROS connection header: %s", err.c_str());
644 ROSCPP_LOG_DEBUG(
"Unable to find advertised topic %s for UDPROS connection", topic.c_str());
648 std::string host = proto[2];
652 std::string error_msg;
653 if (!pub_ptr->validateHeader(h, error_msg))
655 ROSCPP_LOG_DEBUG(
"Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
659 int max_datagram_size = proto[4];
664 ROSCPP_LOG_DEBUG(
"Error creating outgoing transport for [%s:%d]", host.c_str(), port);
670 udpros_params[0] = string(
"UDPROS");
673 udpros_params[3] = conn_id;
674 udpros_params[4] = max_datagram_size;
676 m[
"md5sum"] = pub_ptr->getMD5Sum();
677 m[
"type"] = pub_ptr->getDataType();
679 m[
"message_definition"] = pub_ptr->getMessageDefinition();
684 udpros_params[5] = v;
687 ret[2] = udpros_params;
697 ROSCPP_LOG_DEBUG(
"Currently, roscpp only supports TCPROS. The caller to " \
698 "requestTopic did not support TCPROS, so there are no " \
699 "protocols in common.");
713 if (p->hasSubscribers() || p->isLatching())
715 ROS_DEBUG_NAMED(
"superdebug",
"Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
725 p->getPublishTypes(serialize, nocopy, *m.
type_info);
738 if (serialize || p->isLatching())
757 p->incrementSequence();
766 pub->incrementSequence();
775 return pub->isLatched();
787 if (((*i)->getName() == topic) && (!(*i)->isDropped()))
809 L_Subscription::iterator it;
813 if ((*it)->getName() == topic)
826 sub->removeCallback(helper);
828 if (sub->getNumCallbacks() == 0)
834 L_Subscription::iterator it;
838 if ((*it)->getName() == topic)
847 ROSCPP_LOG_DEBUG(
"Couldn't unregister subscriber for topic [%s]", topic.c_str());
870 return p->getNumSubscribers();
894 if (!(*t)->isDropped() && (*t)->getName() == topic)
896 return (*t)->getNumPublishers();
905 XmlRpcValue publish_stats, subscribe_stats, service_stats;
917 publish_stats[pidx++] = (*t)->getStats();
927 subscribe_stats[sidx++] = (*t)->getStats();
931 stats[0] = publish_stats;
932 stats[1] = subscribe_stats;
933 stats[2] = service_stats;
974 sub[0] = (*t)->getName();
975 sub[1] = (*t)->datatype();
995 pub[0] = (*t)->getName();
996 pub[1] = (*t)->getDataType();
1007 std::vector<std::string> pubs;
1008 for (
int idx = 0; idx < params[2].
size(); idx++)
1010 pubs.push_back(params[2][idx]);
1034 result[1] = std::string(
"");
1037 result[2] = response;
1044 result[1] = std::string(
"");
1047 result[2] = response;
1054 result[1] = std::string(
"subscriptions");
1057 result[2] = response;
1064 result[1] = std::string(
"publications");
1067 result[2] = response;
bool unregisterPublisher(const std::string &topic)
XMLRPCManagerPtr xmlrpc_manager_
volatile bool shutting_down_
void pubUpdateCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
std::list< std::string > advertised_topic_names_
void incrementSequence(const std::string &_topic)
std::string message_definition
The full definition of the message published on this topic.
bool isTopicAdvertised(const std::string &topic)
bool has_header
Tells whether or not the message has a header. If it does, the sequence number will be written direct...
L_Subscription subscriptions_
void getSubscriptions(XmlRpc::XmlRpcValue &subscriptions)
Return the list of subcriptions for the node.
XmlRpc::XmlRpcValue responseInt(int code, const std::string &msg, int response)
size_t getNumPublishers(const std::string &_topic)
Return the number of publishers connected to this node on a particular topic.
bool latch
Whether or not this publication should "latch". A latching publication will automatically send out th...
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
bool subscribe(const SubscribeOptions &ops)
PollManagerPtr poll_manager_
size_t getNumSubscribers(const std::string &_topic)
Return the number of subscribers a node has for a particular topic:
Encapsulates all options available for creating a Publisher.
void getBusInfo(XmlRpc::XmlRpcValue &info)
Compute the info for the node's connectivity.
Type const & getType() const
bool requestTopic(const std::string &topic, XmlRpc::XmlRpcValue &protos, XmlRpc::XmlRpcValue &ret)
Request a topic.
Thrown when an invalid parameter is passed to a method.
PublicationPtr lookupPublicationWithoutLock(const std::string &topic)
std::map< std::string, std::string > M_string
PublicationPtr lookupPublication(const std::string &topic)
Lookup an advertised topic.
#define ROS_DEBUG_NAMED(name,...)
void processPublishQueues()
boost::mutex shutting_down_mutex_
#define ROSCPP_LOG_DEBUG(...)
std::vector< std::string > V_string
std::string md5sum
MD5 of the message datatype.
SubscriptionCallbackHelperPtr helper
Helper object used to get create messages and call callbacks.
void getPublications(XmlRpc::XmlRpcValue &publications)
Return the list of advertised topics for the node.
bool advertise(const AdvertiseOptions &ops, const SubscriberCallbacksPtr &callbacks)
ROSCPP_DECL const std::string & getHost()
boost::mutex advertised_topic_names_mutex_
bool isLatched(const std::string &topic)
void serialize(Stream &stream, const T &t)
static const ConnectionManagerPtr & instance()
Encapsulates all options available for creating a Subscriber.
V_Publication advertised_topics_
size_t getNumSubscriptions()
bool addSubCallback(const SubscribeOptions &ops)
const std::type_info * type_info
uint32_t queue_size
Number of incoming messages to queue up for processing (messages in excess of this queue capacity wil...
void requestTopicCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
void getBusStatsCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
void getSubscriptionsCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
Thrown when a second (third,...) subscription is attempted with conflicting arguments.
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
Execute an XMLRPC call on the master.
std::string datatype
The datatype of the message published on this topic (eg. "std_msgs/String")
bool pubUpdate(const std::string &topic, const std::vector< std::string > &pubs)
Update local publisher lists.
void getPublicationsCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
void getAdvertisedTopics(V_string &topics)
Get the list of topics advertised by this node.
TransportHints transport_hints
Hints for transport type and options.
void getBusStats(XmlRpc::XmlRpcValue &stats)
Compute the statistics for the node's connectivity.
bool unregisterSubscriber(const std::string &topic)
std::string datatype
Datatype of the message we'd like to subscribe as.
bool unsubscribe(const std::string &_topic, const SubscriptionCallbackHelperPtr &helper)
boost::shared_ptr< Publication > PublicationPtr
std::string md5sum
The md5sum of the message datatype published on this topic.
bool unadvertise(const std::string &topic, const SubscriberCallbacksPtr &callbacks)
static const PollManagerPtr & instance()
boost::shared_array< uint8_t > buf
VoidConstPtr tracked_object
An object whose destruction will prevent the callback associated with this subscription.
void getSubscribedTopics(V_string &topics)
Get the list of topics subscribed to by this node.
std::string topic
The topic to publish on.
void getBusInfoCallback(XmlRpc::XmlRpcValue ¶ms, XmlRpc::XmlRpcValue &result)
static const XMLRPCManagerPtr & instance()
ROSCONSOLE_DECL std::string g_last_error_message
std::string topic
Topic to subscribe to.
boost::shared_ptr< void const > message
bool allow_concurrent_callbacks
bool md5sumsMatch(const std::string &lhs, const std::string &rhs)
CallbackQueueInterface * callback_queue
Queue to add callbacks to. If NULL, the global callback queue will be used.
const std::string response
uint32_t queue_size
The maximum number of outgoing messages to be queued for delivery to subscribers. ...
void publish(const std::string &topic, const M &message)
bool registerSubscriber(const SubscriptionPtr &s, const std::string &datatype)
boost::recursive_mutex advertised_topics_mutex_
ConnectionManagerPtr connection_manager_