00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/topic_manager.h"
00029 #include "ros/xmlrpc_manager.h"
00030 #include "ros/connection_manager.h"
00031 #include "ros/poll_manager.h"
00032 #include "ros/publication.h"
00033 #include "ros/subscription.h"
00034 #include "ros/this_node.h"
00035 #include "ros/network.h"
00036 #include "ros/master.h"
00037 #include "ros/transport/transport_tcp.h"
00038 #include "ros/transport/transport_udp.h"
00039 #include "ros/rosout_appender.h"
00040 #include "ros/init.h"
00041 #include "ros/file_log.h"
00042 #include "ros/subscribe_options.h"
00043
00044 #include "XmlRpc.h"
00045
00046 #include <ros/console.h>
00047
00048 using namespace XmlRpc;
00049 using namespace std;
00050
00052
00053 namespace ros
00054 {
00055
00056 TopicManagerPtr g_topic_manager;
00057 boost::mutex g_topic_manager_mutex;
00058 const TopicManagerPtr& TopicManager::instance()
00059 {
00060 if (!g_topic_manager)
00061 {
00062 boost::mutex::scoped_lock lock(g_topic_manager_mutex);
00063 if (!g_topic_manager)
00064 {
00065 g_topic_manager = boost::make_shared<TopicManager>();
00066 }
00067 }
00068
00069 return g_topic_manager;
00070 }
00071
00072 TopicManager::TopicManager()
00073 : shutting_down_(false)
00074 {
00075 }
00076
00077 TopicManager::~TopicManager()
00078 {
00079 shutdown();
00080 }
00081
00082 void TopicManager::start()
00083 {
00084 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00085 shutting_down_ = false;
00086
00087 poll_manager_ = PollManager::instance();
00088 connection_manager_ = ConnectionManager::instance();
00089 xmlrpc_manager_ = XMLRPCManager::instance();
00090
00091 xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
00092 xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
00093 xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
00094 xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
00095 xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
00096 xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));
00097
00098 poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
00099 }
00100
00101 void TopicManager::shutdown()
00102 {
00103 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00104 if (shutting_down_)
00105 {
00106 return;
00107 }
00108
00109 {
00110 boost::recursive_mutex::scoped_lock lock1(advertised_topics_mutex_);
00111 boost::mutex::scoped_lock lock2(subs_mutex_);
00112 shutting_down_ = true;
00113 }
00114
00115 xmlrpc_manager_->unbind("publisherUpdate");
00116 xmlrpc_manager_->unbind("requestTopic");
00117 xmlrpc_manager_->unbind("getBusStats");
00118 xmlrpc_manager_->unbind("getBusInfo");
00119 xmlrpc_manager_->unbind("getSubscriptions");
00120 xmlrpc_manager_->unbind("getPublications");
00121
00122 ROSCPP_LOG_DEBUG("Shutting down topics...");
00123 ROSCPP_LOG_DEBUG(" shutting down publishers");
00124 {
00125 boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
00126
00127 for (V_Publication::iterator i = advertised_topics_.begin();
00128 i != advertised_topics_.end(); ++i)
00129 {
00130 if(!(*i)->isDropped())
00131 {
00132 unregisterPublisher((*i)->getName());
00133 }
00134 (*i)->drop();
00135 }
00136 advertised_topics_.clear();
00137 }
00138
00139
00140 ROSCPP_LOG_DEBUG(" shutting down subscribers");
00141 {
00142 boost::mutex::scoped_lock subs_lock(subs_mutex_);
00143
00144 for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
00145 {
00146
00147 unregisterSubscriber((*s)->getName());
00148
00149 (*s)->shutdown();
00150 }
00151 subscriptions_.clear();
00152 }
00153 }
00154
00155 void TopicManager::processPublishQueues()
00156 {
00157 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00158
00159 V_Publication::iterator it = advertised_topics_.begin();
00160 V_Publication::iterator end = advertised_topics_.end();
00161 for (; it != end; ++it)
00162 {
00163 const PublicationPtr& pub = *it;
00164 pub->processPublishQueue();
00165 }
00166 }
00167
00168 void TopicManager::getAdvertisedTopics(V_string& topics)
00169 {
00170 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00171
00172 topics.resize(advertised_topic_names_.size());
00173 std::copy(advertised_topic_names_.begin(),
00174 advertised_topic_names_.end(),
00175 topics.begin());
00176 }
00177
00178 void TopicManager::getSubscribedTopics(V_string& topics)
00179 {
00180 boost::mutex::scoped_lock lock(subs_mutex_);
00181
00182 topics.reserve(subscriptions_.size());
00183 L_Subscription::const_iterator it = subscriptions_.begin();
00184 L_Subscription::const_iterator end = subscriptions_.end();
00185 for (; it != end; ++it)
00186 {
00187 const SubscriptionPtr& sub = *it;
00188 topics.push_back(sub->getName());
00189 }
00190 }
00191
00192 PublicationPtr TopicManager::lookupPublication(const std::string& topic)
00193 {
00194 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00195
00196 return lookupPublicationWithoutLock(topic);
00197 }
00198
00199 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
00200 {
00201 return lhs == "*" || rhs == "*" || lhs == rhs;
00202 }
00203
00204 bool TopicManager::addSubCallback(const SubscribeOptions& ops)
00205 {
00206
00207 bool found = false;
00208 bool found_topic = false;
00209
00210 SubscriptionPtr sub;
00211
00212 {
00213 if (isShuttingDown())
00214 {
00215 return false;
00216 }
00217
00218 for (L_Subscription::iterator s = subscriptions_.begin();
00219 s != subscriptions_.end() && !found; ++s)
00220 {
00221 sub = *s;
00222 if (!sub->isDropped() && sub->getName() == ops.topic)
00223 {
00224 found_topic = true;
00225 if (md5sumsMatch(ops.md5sum, sub->md5sum()))
00226 {
00227 found = true;
00228 }
00229 break;
00230 }
00231 }
00232 }
00233
00234 if (found_topic && !found)
00235 {
00236 std::stringstream ss;
00237 ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
00238 throw ConflictingSubscriptionException(ss.str());
00239 }
00240 else if (found)
00241 {
00242 if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
00243 {
00244 return false;
00245 }
00246 }
00247
00248 return found;
00249 }
00250
00251
00252 bool TopicManager::subscribe(const SubscribeOptions& ops)
00253 {
00254 boost::mutex::scoped_lock lock(subs_mutex_);
00255
00256 if (addSubCallback(ops))
00257 {
00258 return true;
00259 }
00260
00261 if (isShuttingDown())
00262 {
00263 return false;
00264 }
00265
00266 if (ops.md5sum.empty())
00267 {
00268 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
00269 }
00270
00271 if (ops.datatype.empty())
00272 {
00273 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
00274 }
00275
00276 if (!ops.helper)
00277 {
00278 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
00279 }
00280
00281 const std::string& md5sum = ops.md5sum;
00282 std::string datatype = ops.datatype;
00283
00284 SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
00285 s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
00286
00287 if (!registerSubscriber(s, ops.datatype))
00288 {
00289 ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
00290 s->shutdown();
00291 return false;
00292 }
00293
00294 subscriptions_.push_back(s);
00295
00296 return true;
00297 }
00298
00299 bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
00300 {
00301 if (ops.datatype == "*")
00302 {
00303 std::stringstream ss;
00304 ss << "Advertising with * as the datatype is not allowed. Topic [" << ops.topic << "]";
00305 throw InvalidParameterException(ss.str());
00306 }
00307
00308 if (ops.md5sum == "*")
00309 {
00310 std::stringstream ss;
00311 ss << "Advertising with * as the md5sum is not allowed. Topic [" << ops.topic << "]";
00312 throw InvalidParameterException(ss.str());
00313 }
00314
00315 if (ops.md5sum.empty())
00316 {
00317 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
00318 }
00319
00320 if (ops.datatype.empty())
00321 {
00322 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
00323 }
00324
00325 if (ops.message_definition.empty())
00326 {
00327 ROS_WARN("Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
00328 }
00329
00330 PublicationPtr pub;
00331
00332 {
00333 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00334
00335 if (isShuttingDown())
00336 {
00337 return false;
00338 }
00339
00340 pub = lookupPublicationWithoutLock(ops.topic);
00341 if (pub && pub->getNumCallbacks() == 0)
00342 {
00343 pub.reset();
00344 }
00345
00346 if (pub)
00347 {
00348 if (pub->getMD5Sum() != ops.md5sum)
00349 {
00350 ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
00351 ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
00352 return false;
00353 }
00354
00355 pub->addCallbacks(callbacks);
00356
00357 return true;
00358 }
00359
00360 pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header));
00361 pub->addCallbacks(callbacks);
00362 advertised_topics_.push_back(pub);
00363 }
00364
00365
00366 {
00367 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00368 advertised_topic_names_.push_back(ops.topic);
00369 }
00370
00371
00372
00373
00374
00375
00376 bool found = false;
00377 SubscriptionPtr sub;
00378 {
00379 boost::mutex::scoped_lock lock(subs_mutex_);
00380
00381 for (L_Subscription::iterator s = subscriptions_.begin();
00382 s != subscriptions_.end() && !found; ++s)
00383 {
00384 if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
00385 {
00386 found = true;
00387 sub = *s;
00388 break;
00389 }
00390 }
00391 }
00392
00393 if(found)
00394 {
00395 sub->addLocalConnection(pub);
00396 }
00397
00398 XmlRpcValue args, result, payload;
00399 args[0] = this_node::getName();
00400 args[1] = ops.topic;
00401 args[2] = ops.datatype;
00402 args[3] = xmlrpc_manager_->getServerURI();
00403 master::execute("registerPublisher", args, result, payload, true);
00404
00405 return true;
00406 }
00407
00408 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
00409 {
00410 PublicationPtr pub;
00411 V_Publication::iterator i;
00412 {
00413 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00414
00415 if (isShuttingDown())
00416 {
00417 return false;
00418 }
00419
00420 for (i = advertised_topics_.begin();
00421 i != advertised_topics_.end(); ++i)
00422 {
00423 if(((*i)->getName() == topic) && (!(*i)->isDropped()))
00424 {
00425 pub = *i;
00426 break;
00427 }
00428 }
00429 }
00430
00431 if (!pub)
00432 {
00433 return false;
00434 }
00435
00436 pub->removeCallbacks(callbacks);
00437
00438 {
00439 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00440 if (pub->getNumCallbacks() == 0)
00441 {
00442 unregisterPublisher(pub->getName());
00443 pub->drop();
00444
00445 advertised_topics_.erase(i);
00446
00447 {
00448 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00449 advertised_topic_names_.remove(pub->getName());
00450 }
00451 }
00452 }
00453
00454 return true;
00455 }
00456
00457 bool TopicManager::unregisterPublisher(const std::string& topic)
00458 {
00459 XmlRpcValue args, result, payload;
00460 args[0] = this_node::getName();
00461 args[1] = topic;
00462 args[2] = xmlrpc_manager_->getServerURI();
00463 master::execute("unregisterPublisher", args, result, payload, false);
00464
00465 return true;
00466 }
00467
00468 bool TopicManager::isTopicAdvertised(const string &topic)
00469 {
00470 for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
00471 {
00472 if (((*t)->getName() == topic) && (!(*t)->isDropped()))
00473 {
00474 return true;
00475 }
00476 }
00477
00478 return false;
00479 }
00480
00481 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
00482 {
00483 XmlRpcValue args, result, payload;
00484 args[0] = this_node::getName();
00485 args[1] = s->getName();
00486 args[2] = datatype;
00487 args[3] = xmlrpc_manager_->getServerURI();
00488
00489 if (!master::execute("registerSubscriber", args, result, payload, true))
00490 {
00491 return false;
00492 }
00493
00494 vector<string> pub_uris;
00495 for (int i = 0; i < payload.size(); i++)
00496 {
00497 if (payload[i] != xmlrpc_manager_->getServerURI())
00498 {
00499 pub_uris.push_back(string(payload[i]));
00500 }
00501 }
00502
00503 bool self_subscribed = false;
00504 PublicationPtr pub;
00505 const std::string& sub_md5sum = s->md5sum();
00506
00507 {
00508 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00509 V_Publication::const_iterator it = advertised_topics_.begin();
00510 V_Publication::const_iterator end = advertised_topics_.end();
00511 for (; it != end; ++it)
00512 {
00513 pub = *it;
00514 const std::string& pub_md5sum = pub->getMD5Sum();
00515
00516 if (pub->getName() == s->getName() && !pub->isDropped())
00517 {
00518 if (!md5sumsMatch(pub_md5sum, sub_md5sum))
00519 {
00520 ROS_ERROR("md5sum mismatch making local subscription to topic %s.",
00521 s->getName().c_str());
00522 ROS_ERROR("Subscriber expects type %s, md5sum %s",
00523 s->datatype().c_str(), s->md5sum().c_str());
00524 ROS_ERROR("Publisher provides type %s, md5sum %s",
00525 pub->getDataType().c_str(), pub->getMD5Sum().c_str());
00526 return false;
00527 }
00528
00529 self_subscribed = true;
00530 break;
00531 }
00532 }
00533 }
00534
00535 s->pubUpdate(pub_uris);
00536 if (self_subscribed)
00537 {
00538 s->addLocalConnection(pub);
00539 }
00540
00541 return true;
00542 }
00543
00544 bool TopicManager::unregisterSubscriber(const string &topic)
00545 {
00546 XmlRpcValue args, result, payload;
00547 args[0] = this_node::getName();
00548 args[1] = topic;
00549 args[2] = xmlrpc_manager_->getServerURI();
00550
00551 master::execute("unregisterSubscriber", args, result, payload, false);
00552
00553 return true;
00554 }
00555
00556 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
00557 {
00558 SubscriptionPtr sub;
00559 {
00560 boost::mutex::scoped_lock lock(subs_mutex_);
00561
00562 if (isShuttingDown())
00563 {
00564 return false;
00565 }
00566
00567 ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
00568
00569 for (L_Subscription::const_iterator s = subscriptions_.begin();
00570 s != subscriptions_.end(); ++s)
00571 {
00572 if ((*s)->getName() != topic || (*s)->isDropped())
00573 continue;
00574
00575 sub = *s;
00576 break;
00577 }
00578
00579 }
00580
00581 if (sub)
00582 {
00583 return sub->pubUpdate(pubs);
00584 }
00585 else
00586 {
00587 ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
00588 "don't have any subscribers to that topic.", topic.c_str());
00589 }
00590
00591 return false;
00592 }
00593
00594 bool TopicManager::requestTopic(const string &topic,
00595 XmlRpcValue &protos,
00596 XmlRpcValue &ret)
00597 {
00598 for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
00599 {
00600 XmlRpcValue proto = protos[proto_idx];
00601 if (proto.getType() != XmlRpcValue::TypeArray)
00602 {
00603 ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
00604 return false;
00605 }
00606
00607 if (proto[0].getType() != XmlRpcValue::TypeString)
00608 {
00609 ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
00610 "did not start with a string");
00611 return false;
00612 }
00613
00614 string proto_name = proto[0];
00615 if (proto_name == string("TCPROS"))
00616 {
00617 XmlRpcValue tcpros_params;
00618 tcpros_params[0] = string("TCPROS");
00619 tcpros_params[1] = network::getHost();
00620 tcpros_params[2] = int(connection_manager_->getTCPPort());
00621 ret[0] = int(1);
00622 ret[1] = string();
00623 ret[2] = tcpros_params;
00624 return true;
00625 }
00626 else if (proto_name == string("UDPROS"))
00627 {
00628 if (proto.size() != 5 ||
00629 proto[1].getType() != XmlRpcValue::TypeBase64 ||
00630 proto[2].getType() != XmlRpcValue::TypeString ||
00631 proto[3].getType() != XmlRpcValue::TypeInt ||
00632 proto[4].getType() != XmlRpcValue::TypeInt)
00633 {
00634 ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
00635 return false;
00636 }
00637 std::vector<char> header_bytes = proto[1];
00638 boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
00639 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00640 Header h;
00641 string err;
00642 if (!h.parse(buffer, header_bytes.size(), err))
00643 {
00644 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00645 return false;
00646 }
00647
00648 PublicationPtr pub_ptr = lookupPublication(topic);
00649 if(!pub_ptr)
00650 {
00651 ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
00652 return false;
00653 }
00654
00655 std::string host = proto[2];
00656 int port = proto[3];
00657
00658 M_string m;
00659 std::string error_msg;
00660 if (!pub_ptr->validateHeader(h, error_msg))
00661 {
00662 ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
00663 return false;
00664 }
00665
00666 int max_datagram_size = proto[4];
00667 int conn_id = connection_manager_->getNewConnectionID();
00668 TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
00669 if (!transport)
00670 {
00671 ROSCPP_LOG_DEBUG("Error creating outgoing transport for [%s:%d]", host.c_str(), port);
00672 return false;
00673 }
00674 connection_manager_->udprosIncomingConnection(transport, h);
00675
00676 XmlRpcValue udpros_params;
00677 udpros_params[0] = string("UDPROS");
00678 udpros_params[1] = network::getHost();
00679 udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
00680 udpros_params[3] = conn_id;
00681 udpros_params[4] = max_datagram_size;
00682 m["topic"] = topic;
00683 m["md5sum"] = pub_ptr->getMD5Sum();
00684 m["type"] = pub_ptr->getDataType();
00685 m["callerid"] = this_node::getName();
00686 m["message_definition"] = pub_ptr->getMessageDefinition();
00687 boost::shared_array<uint8_t> msg_def_buffer;
00688 uint32_t len;
00689 Header::write(m, msg_def_buffer, len);
00690 XmlRpcValue v(msg_def_buffer.get(), len);
00691 udpros_params[5] = v;
00692 ret[0] = int(1);
00693 ret[1] = string();
00694 ret[2] = udpros_params;
00695 return true;
00696 }
00697 else
00698 {
00699 ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
00700 proto_name.c_str());
00701 }
00702 }
00703
00704 ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
00705 "requestTopic did not support TCPROS, so there are no " \
00706 "protocols in common.");
00707 return false;
00708 }
00709
00710 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
00711 {
00712 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00713
00714 if (isShuttingDown())
00715 {
00716 return;
00717 }
00718
00719 PublicationPtr p = lookupPublicationWithoutLock(topic);
00720 if (p->hasSubscribers() || p->isLatching())
00721 {
00722 ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
00723
00724
00725
00726 bool nocopy = false;
00727 bool serialize = false;
00728
00729
00730 if (m.type_info && m.message)
00731 {
00732 p->getPublishTypes(serialize, nocopy, *m.type_info);
00733 }
00734 else
00735 {
00736 serialize = true;
00737 }
00738
00739 if (!nocopy)
00740 {
00741 m.message.reset();
00742 m.type_info = 0;
00743 }
00744
00745 if (serialize || p->isLatching())
00746 {
00747 SerializedMessage m2 = serfunc();
00748 m.buf = m2.buf;
00749 m.num_bytes = m2.num_bytes;
00750 m.message_start = m2.message_start;
00751 }
00752
00753 p->publish(m);
00754
00755
00756
00757 if (serialize)
00758 {
00759 poll_manager_->getPollSet().signal();
00760 }
00761 }
00762 else
00763 {
00764 p->incrementSequence();
00765 }
00766 }
00767
00768 void TopicManager::incrementSequence(const std::string& topic)
00769 {
00770 PublicationPtr pub = lookupPublication(topic);
00771 if (pub)
00772 {
00773 pub->incrementSequence();
00774 }
00775 }
00776
00777 bool TopicManager::isLatched(const std::string& topic)
00778 {
00779 PublicationPtr pub = lookupPublication(topic);
00780 if (pub)
00781 {
00782 return pub->isLatched();
00783 }
00784
00785 return false;
00786 }
00787
00788 PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic)
00789 {
00790 PublicationPtr t;
00791 for (V_Publication::iterator i = advertised_topics_.begin();
00792 !t && i != advertised_topics_.end(); ++i)
00793 {
00794 if (((*i)->getName() == topic) && (!(*i)->isDropped()))
00795 {
00796 t = *i;
00797 break;
00798 }
00799 }
00800
00801 return t;
00802 }
00803
00804 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
00805 {
00806 SubscriptionPtr sub;
00807
00808 {
00809 boost::mutex::scoped_lock lock(subs_mutex_);
00810
00811 if (isShuttingDown())
00812 {
00813 return false;
00814 }
00815
00816 L_Subscription::iterator it;
00817 for (it = subscriptions_.begin();
00818 it != subscriptions_.end(); ++it)
00819 {
00820 if ((*it)->getName() == topic)
00821 {
00822 sub = *it;
00823 break;
00824 }
00825 }
00826 }
00827
00828 if (!sub)
00829 {
00830 return false;
00831 }
00832
00833 sub->removeCallback(helper);
00834
00835 if (sub->getNumCallbacks() == 0)
00836 {
00837
00838 {
00839 boost::mutex::scoped_lock lock(subs_mutex_);
00840
00841 L_Subscription::iterator it;
00842 for (it = subscriptions_.begin();
00843 it != subscriptions_.end(); ++it)
00844 {
00845 if ((*it)->getName() == topic)
00846 {
00847 subscriptions_.erase(it);
00848 break;
00849 }
00850 }
00851
00852 if (!unregisterSubscriber(topic))
00853 {
00854 ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
00855 }
00856 }
00857
00858 sub->shutdown();
00859 return true;
00860 }
00861
00862 return true;
00863 }
00864
00865 size_t TopicManager::getNumSubscribers(const std::string &topic)
00866 {
00867 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00868
00869 if (isShuttingDown())
00870 {
00871 return 0;
00872 }
00873
00874 PublicationPtr p = lookupPublicationWithoutLock(topic);
00875 if (p)
00876 {
00877 return p->getNumSubscribers();
00878 }
00879
00880 return 0;
00881 }
00882
00883 size_t TopicManager::getNumSubscriptions()
00884 {
00885 boost::mutex::scoped_lock lock(subs_mutex_);
00886 return subscriptions_.size();
00887 }
00888
00889 size_t TopicManager::getNumPublishers(const std::string &topic)
00890 {
00891 boost::mutex::scoped_lock lock(subs_mutex_);
00892
00893 if (isShuttingDown())
00894 {
00895 return 0;
00896 }
00897
00898 for (L_Subscription::const_iterator t = subscriptions_.begin();
00899 t != subscriptions_.end(); ++t)
00900 {
00901 if (!(*t)->isDropped() && (*t)->getName() == topic)
00902 {
00903 return (*t)->getNumPublishers();
00904 }
00905 }
00906
00907 return 0;
00908 }
00909
00910 void TopicManager::getBusStats(XmlRpcValue &stats)
00911 {
00912 XmlRpcValue publish_stats, subscribe_stats, service_stats;
00913
00914 publish_stats.setSize(0);
00915 subscribe_stats.setSize(0);
00916 service_stats.setSize(0);
00917
00918 uint32_t pidx = 0;
00919 {
00920 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00921 for (V_Publication::iterator t = advertised_topics_.begin();
00922 t != advertised_topics_.end(); ++t)
00923 {
00924 publish_stats[pidx++] = (*t)->getStats();
00925 }
00926 }
00927
00928 {
00929 uint32_t sidx = 0;
00930
00931 boost::mutex::scoped_lock lock(subs_mutex_);
00932 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00933 {
00934 subscribe_stats[sidx++] = (*t)->getStats();
00935 }
00936 }
00937
00938 stats[0] = publish_stats;
00939 stats[1] = subscribe_stats;
00940 stats[2] = service_stats;
00941 }
00942
00943 void TopicManager::getBusInfo(XmlRpcValue &info)
00944 {
00945
00946 info.setSize(0);
00947
00948 {
00949 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00950
00951 for (V_Publication::iterator t = advertised_topics_.begin();
00952 t != advertised_topics_.end(); ++t)
00953 {
00954 (*t)->getInfo(info);
00955 }
00956 }
00957
00958 {
00959 boost::mutex::scoped_lock lock(subs_mutex_);
00960
00961 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00962 {
00963 (*t)->getInfo(info);
00964 }
00965 }
00966 }
00967
00968 void TopicManager::getSubscriptions(XmlRpcValue &subs)
00969 {
00970
00971 subs.setSize(0);
00972
00973 {
00974 boost::mutex::scoped_lock lock(subs_mutex_);
00975
00976 uint32_t sidx = 0;
00977
00978 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00979 {
00980 XmlRpcValue sub;
00981 sub[0] = (*t)->getName();
00982 sub[1] = (*t)->datatype();
00983 subs[sidx++] = sub;
00984 }
00985 }
00986 }
00987
00988 void TopicManager::getPublications(XmlRpcValue &pubs)
00989 {
00990
00991 pubs.setSize(0);
00992
00993 {
00994 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00995
00996 uint32_t sidx = 0;
00997
00998 for (V_Publication::iterator t = advertised_topics_.begin();
00999 t != advertised_topics_.end(); ++t)
01000 {
01001 XmlRpcValue pub;
01002 pub[0] = (*t)->getName();
01003 pub[1] = (*t)->getDataType();
01004 pubs[sidx++] = pub;
01005 }
01006
01007 }
01008 }
01009
01010 extern std::string console::g_last_error_message;
01011
01012 void TopicManager::pubUpdateCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01013 {
01014 std::vector<std::string> pubs;
01015 for (int idx = 0; idx < params[2].size(); idx++)
01016 {
01017 pubs.push_back(params[2][idx]);
01018 }
01019 if (pubUpdate(params[1], pubs))
01020 {
01021 result = xmlrpc::responseInt(1, "", 0);
01022 }
01023 else
01024 {
01025 result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01026 }
01027 }
01028
01029 void TopicManager::requestTopicCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01030 {
01031 if (!requestTopic(params[1], params[2], result))
01032 {
01033 result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01034 }
01035 }
01036
01037 void TopicManager::getBusStatsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01038 {
01039 (void)params;
01040 result[0] = 1;
01041 result[1] = std::string("");
01042 XmlRpcValue response;
01043 getBusStats(result);
01044 result[2] = response;
01045 }
01046
01047 void TopicManager::getBusInfoCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01048 {
01049 (void)params;
01050 result[0] = 1;
01051 result[1] = std::string("");
01052 XmlRpcValue response;
01053 getBusInfo(response);
01054 result[2] = response;
01055 }
01056
01057 void TopicManager::getSubscriptionsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01058 {
01059 (void)params;
01060 result[0] = 1;
01061 result[1] = std::string("subscriptions");
01062 XmlRpcValue response;
01063 getSubscriptions(response);
01064 result[2] = response;
01065 }
01066
01067 void TopicManager::getPublicationsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01068 {
01069 (void)params;
01070 result[0] = 1;
01071 result[1] = std::string("publications");
01072 XmlRpcValue response;
01073 getPublications(response);
01074 result[2] = response;
01075 }
01076
01077 }