00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/topic_manager.h"
00029 #include "ros/xmlrpc_manager.h"
00030 #include "ros/connection_manager.h"
00031 #include "ros/poll_manager.h"
00032 #include "ros/publication.h"
00033 #include "ros/subscription.h"
00034 #include "ros/this_node.h"
00035 #include "ros/network.h"
00036 #include "ros/master.h"
00037 #include "ros/transport/transport_tcp.h"
00038 #include "ros/transport/transport_udp.h"
00039 #include "ros/rosout_appender.h"
00040 #include "ros/init.h"
00041 #include "ros/file_log.h"
00042 #include "ros/subscribe_options.h"
00043
00044 #include "XmlRpc.h"
00045
00046 #include <ros/console.h>
00047
00048 using namespace XmlRpc;
00049 using namespace std;
00050
00052
00053 namespace ros
00054 {
00055
00056 TopicManagerPtr g_topic_manager;
00057 boost::mutex g_topic_manager_mutex;
00058 const TopicManagerPtr& TopicManager::instance()
00059 {
00060 if (!g_topic_manager)
00061 {
00062 boost::mutex::scoped_lock lock(g_topic_manager_mutex);
00063 if (!g_topic_manager)
00064 {
00065 g_topic_manager.reset(new TopicManager);
00066 }
00067 }
00068
00069 return g_topic_manager;
00070 }
00071
00072 TopicManager::TopicManager()
00073 : shutting_down_(false)
00074 {
00075 }
00076
00077 TopicManager::~TopicManager()
00078 {
00079 shutdown();
00080 }
00081
00082 void TopicManager::start()
00083 {
00084 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00085 shutting_down_ = false;
00086
00087 poll_manager_ = PollManager::instance();
00088 connection_manager_ = ConnectionManager::instance();
00089 xmlrpc_manager_ = XMLRPCManager::instance();
00090
00091 xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
00092 xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
00093 xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
00094 xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
00095 xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
00096 xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));
00097
00098 poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
00099 }
00100
00101 void TopicManager::shutdown()
00102 {
00103 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00104 if (shutting_down_)
00105 {
00106 return;
00107 }
00108
00109 {
00110 boost::recursive_mutex::scoped_lock lock1(advertised_topics_mutex_);
00111 boost::mutex::scoped_lock lock2(subs_mutex_);
00112 shutting_down_ = true;
00113 }
00114
00115 xmlrpc_manager_->unbind("publisherUpdate");
00116 xmlrpc_manager_->unbind("requestTopic");
00117 xmlrpc_manager_->unbind("getBusStats");
00118 xmlrpc_manager_->unbind("getBusInfo");
00119 xmlrpc_manager_->unbind("getSubscriptions");
00120 xmlrpc_manager_->unbind("getPublications");
00121
00122 ROSCPP_LOG_DEBUG("Shutting down topics...");
00123 ROSCPP_LOG_DEBUG(" shutting down publishers");
00124 {
00125 boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
00126
00127 for (V_Publication::iterator i = advertised_topics_.begin();
00128 i != advertised_topics_.end(); ++i)
00129 {
00130 if(!(*i)->isDropped())
00131 {
00132 unregisterPublisher((*i)->getName());
00133 }
00134 (*i)->drop();
00135 }
00136 advertised_topics_.clear();
00137 }
00138
00139
00140 ROSCPP_LOG_DEBUG(" shutting down subscribers");
00141 {
00142 boost::mutex::scoped_lock subs_lock(subs_mutex_);
00143
00144 for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
00145 {
00146
00147 unregisterSubscriber((*s)->getName());
00148
00149 (*s)->shutdown();
00150 }
00151 subscriptions_.clear();
00152 }
00153 }
00154
00155 void TopicManager::processPublishQueues()
00156 {
00157 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00158
00159 V_Publication::iterator it = advertised_topics_.begin();
00160 V_Publication::iterator end = advertised_topics_.end();
00161 for (; it != end; ++it)
00162 {
00163 const PublicationPtr& pub = *it;
00164 pub->processPublishQueue();
00165 }
00166 }
00167
00168 void TopicManager::getAdvertisedTopics(V_string& topics)
00169 {
00170 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00171
00172 topics.resize(advertised_topic_names_.size());
00173 std::copy(advertised_topic_names_.begin(),
00174 advertised_topic_names_.end(),
00175 topics.begin());
00176 }
00177
00178 void TopicManager::getSubscribedTopics(V_string& topics)
00179 {
00180 boost::mutex::scoped_lock lock(subs_mutex_);
00181
00182 topics.reserve(subscriptions_.size());
00183 L_Subscription::const_iterator it = subscriptions_.begin();
00184 L_Subscription::const_iterator end = subscriptions_.end();
00185 for (; it != end; ++it)
00186 {
00187 const SubscriptionPtr& sub = *it;
00188 topics.push_back(sub->getName());
00189 }
00190 }
00191
00192 PublicationPtr TopicManager::lookupPublication(const std::string& topic)
00193 {
00194 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00195
00196 return lookupPublicationWithoutLock(topic);
00197 }
00198
00199 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
00200 {
00201 return lhs == "*" || rhs == "*" || lhs == rhs;
00202 }
00203
00204 bool TopicManager::addSubCallback(const SubscribeOptions& ops)
00205 {
00206
00207 bool found = false;
00208 bool found_topic = false;
00209
00210 SubscriptionPtr sub;
00211
00212 {
00213 if (isShuttingDown())
00214 {
00215 return false;
00216 }
00217
00218 for (L_Subscription::iterator s = subscriptions_.begin();
00219 s != subscriptions_.end() && !found; ++s)
00220 {
00221 sub = *s;
00222 if (!sub->isDropped() && sub->getName() == ops.topic)
00223 {
00224 found_topic = true;
00225 if (md5sumsMatch(ops.md5sum, sub->md5sum()))
00226 {
00227 found = true;
00228 }
00229 break;
00230 }
00231 }
00232 }
00233
00234 if (found_topic && !found)
00235 {
00236 std::stringstream ss;
00237 ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
00238 throw ConflictingSubscriptionException(ss.str());
00239 }
00240 else if (found)
00241 {
00242 if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
00243 {
00244 return false;
00245 }
00246 }
00247
00248 return found;
00249 }
00250
00251
00252 bool TopicManager::subscribe(const SubscribeOptions& ops)
00253 {
00254 boost::mutex::scoped_lock lock(subs_mutex_);
00255
00256 if (addSubCallback(ops))
00257 {
00258 return true;
00259 }
00260
00261 if (isShuttingDown())
00262 {
00263 return false;
00264 }
00265
00266 if (ops.md5sum.empty())
00267 {
00268 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
00269 }
00270
00271 if (ops.datatype.empty())
00272 {
00273 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
00274 }
00275
00276 if (!ops.helper)
00277 {
00278 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
00279 }
00280
00281 const std::string& md5sum = ops.md5sum;
00282 std::string datatype = ops.datatype;
00283
00284 SubscriptionPtr s(new Subscription(ops.topic, md5sum, datatype, ops.transport_hints));
00285 s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
00286
00287 if (!registerSubscriber(s, ops.datatype))
00288 {
00289 ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
00290 s->shutdown();
00291 return false;
00292 }
00293
00294 subscriptions_.push_back(s);
00295
00296 return true;
00297 }
00298
00299 bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
00300 {
00301 if (ops.datatype == "*")
00302 {
00303 std::stringstream ss;
00304 ss << "Advertising with * as the datatype is not allowed. Topic [" << ops.topic << "]";
00305 throw InvalidParameterException(ss.str());
00306 }
00307
00308 if (ops.md5sum == "*")
00309 {
00310 std::stringstream ss;
00311 ss << "Advertising with * as the md5sum is not allowed. Topic [" << ops.topic << "]";
00312 throw InvalidParameterException(ss.str());
00313 }
00314
00315 if (ops.md5sum.empty())
00316 {
00317 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
00318 }
00319
00320 if (ops.datatype.empty())
00321 {
00322 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
00323 }
00324
00325 if (ops.message_definition.empty())
00326 {
00327 ROS_WARN("Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
00328 }
00329
00330 PublicationPtr pub;
00331
00332 {
00333 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00334
00335 if (isShuttingDown())
00336 {
00337 return false;
00338 }
00339
00340 pub = lookupPublicationWithoutLock(ops.topic);
00341 if (pub && pub->getNumCallbacks() == 0)
00342 {
00343 pub.reset();
00344 }
00345
00346 if (pub)
00347 {
00348 if (pub->getMD5Sum() != ops.md5sum)
00349 {
00350 ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
00351 ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
00352 return false;
00353 }
00354
00355 pub->addCallbacks(callbacks);
00356
00357 return true;
00358 }
00359
00360 pub = PublicationPtr(new Publication(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header));
00361 pub->addCallbacks(callbacks);
00362 advertised_topics_.push_back(pub);
00363 }
00364
00365
00366 {
00367 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00368 advertised_topic_names_.push_back(ops.topic);
00369 }
00370
00371
00372
00373
00374
00375
00376 bool found = false;
00377 SubscriptionPtr sub;
00378 {
00379 boost::mutex::scoped_lock lock(subs_mutex_);
00380
00381 for (L_Subscription::iterator s = subscriptions_.begin();
00382 s != subscriptions_.end() && !found; ++s)
00383 {
00384 if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
00385 {
00386 found = true;
00387 sub = *s;
00388 break;
00389 }
00390 }
00391 }
00392
00393 if(found)
00394 {
00395 sub->addLocalConnection(pub);
00396 }
00397
00398 XmlRpcValue args, result, payload;
00399 args[0] = this_node::getName();
00400 args[1] = ops.topic;
00401 args[2] = ops.datatype;
00402 args[3] = xmlrpc_manager_->getServerURI();
00403 master::execute("registerPublisher", args, result, payload, true);
00404
00405 return true;
00406 }
00407
00408 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
00409 {
00410 PublicationPtr pub;
00411 V_Publication::iterator i;
00412 {
00413 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00414
00415 if (isShuttingDown())
00416 {
00417 return false;
00418 }
00419
00420 for (i = advertised_topics_.begin();
00421 i != advertised_topics_.end(); ++i)
00422 {
00423 if(((*i)->getName() == topic) && (!(*i)->isDropped()))
00424 {
00425 pub = *i;
00426 break;
00427 }
00428 }
00429 }
00430
00431 if (!pub)
00432 {
00433 return false;
00434 }
00435
00436 pub->removeCallbacks(callbacks);
00437
00438 {
00439 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00440 if (pub->getNumCallbacks() == 0)
00441 {
00442 unregisterPublisher(pub->getName());
00443 pub->drop();
00444
00445 advertised_topics_.erase(i);
00446
00447 {
00448 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00449 advertised_topic_names_.remove(pub->getName());
00450 }
00451 }
00452 }
00453
00454 return true;
00455 }
00456
00457 bool TopicManager::unregisterPublisher(const std::string& topic)
00458 {
00459 XmlRpcValue args, result, payload;
00460 args[0] = this_node::getName();
00461 args[1] = topic;
00462 args[2] = xmlrpc_manager_->getServerURI();
00463 master::execute("unregisterPublisher", args, result, payload, false);
00464
00465 return true;
00466 }
00467
00468 bool TopicManager::isTopicAdvertised(const string &topic)
00469 {
00470 for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
00471 {
00472 if (((*t)->getName() == topic) && (!(*t)->isDropped()))
00473 {
00474 return true;
00475 }
00476 }
00477
00478 return false;
00479 }
00480
00481 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
00482 {
00483 XmlRpcValue args, result, payload;
00484 args[0] = this_node::getName();
00485 args[1] = s->getName();
00486 args[2] = datatype;
00487 args[3] = xmlrpc_manager_->getServerURI();
00488
00489 if (!master::execute("registerSubscriber", args, result, payload, true))
00490 {
00491 return false;
00492 }
00493
00494 vector<string> pub_uris;
00495 for (int i = 0; i < payload.size(); i++)
00496 {
00497 if (payload[i] != xmlrpc_manager_->getServerURI())
00498 {
00499 pub_uris.push_back(string(payload[i]));
00500 }
00501 }
00502
00503 bool self_subscribed = false;
00504 PublicationPtr pub;
00505 const std::string& sub_md5sum = s->md5sum();
00506
00507 {
00508 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00509 V_Publication::const_iterator it = advertised_topics_.begin();
00510 V_Publication::const_iterator end = advertised_topics_.end();
00511 for (; it != end; ++it)
00512 {
00513 pub = *it;
00514 const std::string& pub_md5sum = pub->getMD5Sum();
00515 if (pub->getName() == s->getName() && md5sumsMatch(pub_md5sum, sub_md5sum) && !pub->isDropped())
00516 {
00517 self_subscribed = true;
00518 break;
00519 }
00520 }
00521 }
00522
00523 s->pubUpdate(pub_uris);
00524 if (self_subscribed)
00525 {
00526 s->addLocalConnection(pub);
00527 }
00528
00529 return true;
00530 }
00531
00532 bool TopicManager::unregisterSubscriber(const string &topic)
00533 {
00534 XmlRpcValue args, result, payload;
00535 args[0] = this_node::getName();
00536 args[1] = topic;
00537 args[2] = xmlrpc_manager_->getServerURI();
00538
00539 master::execute("unregisterSubscriber", args, result, payload, false);
00540
00541 return true;
00542 }
00543
00544 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
00545 {
00546 SubscriptionPtr sub;
00547 {
00548 boost::mutex::scoped_lock lock(subs_mutex_);
00549
00550 if (isShuttingDown())
00551 {
00552 return false;
00553 }
00554
00555 ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
00556
00557 for (L_Subscription::const_iterator s = subscriptions_.begin();
00558 s != subscriptions_.end(); ++s)
00559 {
00560 if ((*s)->getName() != topic || (*s)->isDropped())
00561 continue;
00562
00563 sub = *s;
00564 break;
00565 }
00566
00567 }
00568
00569 if (sub)
00570 {
00571 sub->pubUpdate(pubs);
00572 }
00573 else
00574 {
00575 ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
00576 "don't have any subscribers to that topic.", topic.c_str());
00577 }
00578
00579 return false;
00580 }
00581
00582 bool TopicManager::requestTopic(const string &topic,
00583 XmlRpcValue &protos,
00584 XmlRpcValue &ret)
00585 {
00586 for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
00587 {
00588 XmlRpcValue proto = protos[proto_idx];
00589 if (proto.getType() != XmlRpcValue::TypeArray)
00590 {
00591 ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
00592 return false;
00593 }
00594
00595 if (proto[0].getType() != XmlRpcValue::TypeString)
00596 {
00597 ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
00598 "did not start with a string");
00599 return false;
00600 }
00601
00602 string proto_name = proto[0];
00603 if (proto_name == string("TCPROS"))
00604 {
00605 XmlRpcValue tcpros_params;
00606 tcpros_params[0] = string("TCPROS");
00607 tcpros_params[1] = network::getHost();
00608 tcpros_params[2] = int(connection_manager_->getTCPPort());
00609 ret[0] = int(1);
00610 ret[1] = string();
00611 ret[2] = tcpros_params;
00612 return true;
00613 }
00614 else if (proto_name == string("UDPROS"))
00615 {
00616 if (proto.size() != 5 ||
00617 proto[1].getType() != XmlRpcValue::TypeBase64 ||
00618 proto[2].getType() != XmlRpcValue::TypeString ||
00619 proto[3].getType() != XmlRpcValue::TypeInt ||
00620 proto[4].getType() != XmlRpcValue::TypeInt)
00621 {
00622 ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
00623 return false;
00624 }
00625 std::vector<char> header_bytes = proto[1];
00626 boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]);
00627 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00628 Header h;
00629 string err;
00630 if (!h.parse(buffer, header_bytes.size(), err))
00631 {
00632 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00633 return false;
00634 }
00635
00636 PublicationPtr pub_ptr = lookupPublication(topic);
00637 if(!pub_ptr)
00638 {
00639 ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
00640 return false;
00641 }
00642
00643 std::string host = proto[2];
00644 int port = proto[3];
00645
00646 M_string m;
00647 std::string error_msg;
00648 if (!pub_ptr->validateHeader(h, error_msg))
00649 {
00650 ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
00651 return false;
00652 }
00653
00654 int max_datagram_size = proto[4];
00655 int conn_id = connection_manager_->getNewConnectionID();
00656 TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
00657 connection_manager_->udprosIncomingConnection(transport, h);
00658
00659 XmlRpcValue udpros_params;
00660 udpros_params[0] = string("UDPROS");
00661 udpros_params[1] = network::getHost();
00662 udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
00663 udpros_params[3] = conn_id;
00664 udpros_params[4] = max_datagram_size;
00665 m["topic"] = topic;
00666 m["md5sum"] = pub_ptr->getMD5Sum();
00667 m["type"] = pub_ptr->getDataType();
00668 m["callerid"] = this_node::getName();
00669 m["message_definition"] = pub_ptr->getMessageDefinition();
00670 boost::shared_array<uint8_t> msg_def_buffer;
00671 uint32_t len;
00672 Header::write(m, msg_def_buffer, len);
00673 XmlRpcValue v(msg_def_buffer.get(), len);
00674 udpros_params[5] = v;
00675 ret[0] = int(1);
00676 ret[1] = string();
00677 ret[2] = udpros_params;
00678 return true;
00679 }
00680 else
00681 {
00682 ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
00683 proto_name.c_str());
00684 }
00685 }
00686
00687 ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
00688 "requestTopic did not support TCPROS, so there are no " \
00689 "protocols in common.");
00690 return false;
00691 }
00692
00693 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
00694 {
00695 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00696
00697 if (isShuttingDown())
00698 {
00699 return;
00700 }
00701
00702 PublicationPtr p = lookupPublicationWithoutLock(topic);
00703 if (p->hasSubscribers() || p->isLatching())
00704 {
00705 ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
00706
00707
00708
00709 bool nocopy = false;
00710 bool serialize = false;
00711
00712
00713 if (m.type_info && m.message)
00714 {
00715 p->getPublishTypes(serialize, nocopy, *m.type_info);
00716 }
00717 else
00718 {
00719 serialize = true;
00720 }
00721
00722 if (!nocopy)
00723 {
00724 m.message.reset();
00725 m.type_info = 0;
00726 }
00727
00728 if (serialize)
00729 {
00730 SerializedMessage m2 = serfunc();
00731 m.buf = m2.buf;
00732 m.num_bytes = m2.num_bytes;
00733 m.message_start = m2.message_start;
00734 }
00735
00736 p->publish(m);
00737
00738
00739
00740 if (serialize)
00741 {
00742 poll_manager_->getPollSet().signal();
00743 }
00744 }
00745 else
00746 {
00747 p->incrementSequence();
00748 }
00749 }
00750
00751 void TopicManager::incrementSequence(const std::string& topic)
00752 {
00753 PublicationPtr pub = lookupPublication(topic);
00754 if (pub)
00755 {
00756 pub->incrementSequence();
00757 }
00758 }
00759
00760 bool TopicManager::isLatched(const std::string& topic)
00761 {
00762 PublicationPtr pub = lookupPublication(topic);
00763 if (pub)
00764 {
00765 return pub->isLatched();
00766 }
00767
00768 return false;
00769 }
00770
00771 PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic)
00772 {
00773 PublicationPtr t;
00774 for (V_Publication::iterator i = advertised_topics_.begin();
00775 !t && i != advertised_topics_.end(); ++i)
00776 {
00777 if (((*i)->getName() == topic) && (!(*i)->isDropped()))
00778 {
00779 t = *i;
00780 break;
00781 }
00782 }
00783
00784 return t;
00785 }
00786
00787 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
00788 {
00789 SubscriptionPtr sub;
00790 L_Subscription::iterator it;
00791
00792 {
00793 boost::mutex::scoped_lock lock(subs_mutex_);
00794
00795 if (isShuttingDown())
00796 {
00797 return false;
00798 }
00799
00800 for (it = subscriptions_.begin();
00801 it != subscriptions_.end() && !sub; ++it)
00802 {
00803 if ((*it)->getName() == topic)
00804 {
00805 sub = *it;
00806 break;
00807 }
00808 }
00809 }
00810
00811 if (!sub)
00812 {
00813 return false;
00814 }
00815
00816 sub->removeCallback(helper);
00817
00818 if (sub->getNumCallbacks() == 0)
00819 {
00820
00821 {
00822 boost::mutex::scoped_lock lock(subs_mutex_);
00823
00824 subscriptions_.erase(it);
00825
00826 if (!unregisterSubscriber(topic))
00827 {
00828 ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
00829 }
00830 }
00831
00832 sub->shutdown();
00833 return true;
00834 }
00835
00836 return true;
00837 }
00838
00839 size_t TopicManager::getNumSubscribers(const std::string &topic)
00840 {
00841 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00842
00843 if (isShuttingDown())
00844 {
00845 return 0;
00846 }
00847
00848 PublicationPtr p = lookupPublicationWithoutLock(topic);
00849 if (p)
00850 {
00851 return p->getNumSubscribers();
00852 }
00853
00854 return 0;
00855 }
00856
00857 size_t TopicManager::getNumSubscriptions()
00858 {
00859 boost::mutex::scoped_lock lock(subs_mutex_);
00860 return subscriptions_.size();
00861 }
00862
00863 size_t TopicManager::getNumPublishers(const std::string &topic)
00864 {
00865 boost::mutex::scoped_lock lock(subs_mutex_);
00866
00867 if (isShuttingDown())
00868 {
00869 return 0;
00870 }
00871
00872 for (L_Subscription::const_iterator t = subscriptions_.begin();
00873 t != subscriptions_.end(); ++t)
00874 {
00875 if (!(*t)->isDropped() && (*t)->getName() == topic)
00876 {
00877 return (*t)->getNumPublishers();
00878 }
00879 }
00880
00881 return 0;
00882 }
00883
00884 void TopicManager::getBusStats(XmlRpcValue &stats)
00885 {
00886 XmlRpcValue publish_stats, subscribe_stats, service_stats;
00887
00888 publish_stats.setSize(0);
00889 subscribe_stats.setSize(0);
00890 service_stats.setSize(0);
00891
00892 uint32_t pidx = 0;
00893 {
00894 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00895 for (V_Publication::iterator t = advertised_topics_.begin();
00896 t != advertised_topics_.end(); ++t)
00897 {
00898 publish_stats[pidx++] = (*t)->getStats();
00899 }
00900 }
00901
00902 {
00903 uint32_t sidx = 0;
00904
00905 boost::mutex::scoped_lock lock(subs_mutex_);
00906 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00907 {
00908 subscribe_stats[sidx++] = (*t)->getStats();
00909 }
00910 }
00911
00912 stats[0] = publish_stats;
00913 stats[1] = subscribe_stats;
00914 stats[2] = service_stats;
00915 }
00916
00917 void TopicManager::getBusInfo(XmlRpcValue &info)
00918 {
00919
00920 info.setSize(0);
00921
00922 {
00923 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00924
00925 for (V_Publication::iterator t = advertised_topics_.begin();
00926 t != advertised_topics_.end(); ++t)
00927 {
00928 (*t)->getInfo(info);
00929 }
00930 }
00931
00932 {
00933 boost::mutex::scoped_lock lock(subs_mutex_);
00934
00935 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00936 {
00937 (*t)->getInfo(info);
00938 }
00939 }
00940 }
00941
00942 void TopicManager::getSubscriptions(XmlRpcValue &subs)
00943 {
00944
00945 subs.setSize(0);
00946
00947 {
00948 boost::mutex::scoped_lock lock(subs_mutex_);
00949
00950 uint32_t sidx = 0;
00951
00952 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00953 {
00954 XmlRpcValue sub;
00955 sub[0] = (*t)->getName();
00956 sub[1] = (*t)->datatype();
00957 subs[sidx++] = sub;
00958 }
00959 }
00960 }
00961
00962 void TopicManager::getPublications(XmlRpcValue &pubs)
00963 {
00964
00965 pubs.setSize(0);
00966
00967 {
00968 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00969
00970 uint32_t sidx = 0;
00971
00972 for (V_Publication::iterator t = advertised_topics_.begin();
00973 t != advertised_topics_.end(); ++t)
00974 {
00975 XmlRpcValue pub;
00976 pub[0] = (*t)->getName();
00977 pub[1] = (*t)->getDataType();
00978 pubs[sidx++] = pub;
00979 }
00980
00981 }
00982 }
00983
00984 extern ROSOutAppenderPtr g_rosout_appender;
00985
00986 void TopicManager::pubUpdateCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
00987 {
00988 std::vector<std::string> pubs;
00989 for (int idx = 0; idx < params[2].size(); idx++)
00990 {
00991 pubs.push_back(params[2][idx]);
00992 }
00993 if (pubUpdate(params[1], pubs))
00994 {
00995 result = xmlrpc::responseInt(1, "", 0);
00996 }
00997 else
00998 {
00999 std::string last_error = "Unknown Error";
01000 if ( g_rosout_appender != 0 )
01001 {
01002 last_error = g_rosout_appender->getLastError();
01003 }
01004
01005 result = xmlrpc::responseInt(0, last_error, 0);
01006 }
01007 }
01008
01009 void TopicManager::requestTopicCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01010 {
01011 if (!requestTopic(params[1], params[2], result))
01012 {
01013 std::string last_error = "Unknown Error";
01014 if ( g_rosout_appender != 0 )
01015 {
01016 last_error = g_rosout_appender->getLastError();
01017 }
01018
01019 result = xmlrpc::responseInt(0, last_error, 0);
01020 }
01021 }
01022
01023 void TopicManager::getBusStatsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01024 {
01025 result[0] = 1;
01026 result[1] = std::string("");
01027 XmlRpcValue response;
01028 getBusStats(result);
01029 result[2] = response;
01030 }
01031
01032 void TopicManager::getBusInfoCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01033 {
01034 result[0] = 1;
01035 result[1] = std::string("");
01036 XmlRpcValue response;
01037 getBusInfo(response);
01038 result[2] = response;
01039 }
01040
01041 void TopicManager::getSubscriptionsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01042 {
01043 result[0] = 1;
01044 result[1] = std::string("subscriptions");
01045 XmlRpcValue response;
01046 getSubscriptions(response);
01047 result[2] = response;
01048 }
01049
01050 void TopicManager::getPublicationsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01051 {
01052 result[0] = 1;
01053 result[1] = std::string("publications");
01054 XmlRpcValue response;
01055 getPublications(response);
01056 result[2] = response;
01057 }
01058
01059 }