00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ros/topic_manager.h"
00029 #include "ros/xmlrpc_manager.h"
00030 #include "ros/connection_manager.h"
00031 #include "ros/poll_manager.h"
00032 #include "ros/publication.h"
00033 #include "ros/subscription.h"
00034 #include "ros/this_node.h"
00035 #include "ros/network.h"
00036 #include "ros/master.h"
00037 #include "ros/transport/transport_tcp.h"
00038 #include "ros/transport/transport_udp.h"
00039 #include "ros/rosout_appender.h"
00040 #include "ros/init.h"
00041 #include "ros/file_log.h"
00042 #include "ros/subscribe_options.h"
00043
00044 #include "XmlRpc.h"
00045
00046 #include <ros/console.h>
00047
00048 using namespace XmlRpc;
00049 using namespace std;
00050
00052
00053 namespace ros
00054 {
00055
00056 TopicManagerPtr g_topic_manager;
00057 boost::mutex g_topic_manager_mutex;
00058 const TopicManagerPtr& TopicManager::instance()
00059 {
00060 if (!g_topic_manager)
00061 {
00062 boost::mutex::scoped_lock lock(g_topic_manager_mutex);
00063 if (!g_topic_manager)
00064 {
00065 g_topic_manager.reset(new TopicManager);
00066 }
00067 }
00068
00069 return g_topic_manager;
00070 }
00071
00072 TopicManager::TopicManager()
00073 : shutting_down_(false)
00074 {
00075 }
00076
00077 TopicManager::~TopicManager()
00078 {
00079 shutdown();
00080 }
00081
00082 void TopicManager::start()
00083 {
00084 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00085 shutting_down_ = false;
00086
00087 poll_manager_ = PollManager::instance();
00088 connection_manager_ = ConnectionManager::instance();
00089 xmlrpc_manager_ = XMLRPCManager::instance();
00090
00091 xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
00092 xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
00093 xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
00094 xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
00095 xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
00096 xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));
00097
00098 poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
00099 }
00100
00101 void TopicManager::shutdown()
00102 {
00103 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00104 if (shutting_down_)
00105 {
00106 return;
00107 }
00108
00109 {
00110 boost::recursive_mutex::scoped_lock lock1(advertised_topics_mutex_);
00111 boost::mutex::scoped_lock lock2(subs_mutex_);
00112 shutting_down_ = true;
00113 }
00114
00115 xmlrpc_manager_->unbind("publisherUpdate");
00116 xmlrpc_manager_->unbind("requestTopic");
00117 xmlrpc_manager_->unbind("getBusStats");
00118 xmlrpc_manager_->unbind("getBusInfo");
00119 xmlrpc_manager_->unbind("getSubscriptions");
00120 xmlrpc_manager_->unbind("getPublications");
00121
00122 ROSCPP_LOG_DEBUG("Shutting down topics...");
00123 ROSCPP_LOG_DEBUG(" shutting down publishers");
00124 {
00125 boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
00126
00127 for (V_Publication::iterator i = advertised_topics_.begin();
00128 i != advertised_topics_.end(); ++i)
00129 {
00130 if(!(*i)->isDropped())
00131 {
00132 unregisterPublisher((*i)->getName());
00133 }
00134 (*i)->drop();
00135 }
00136 advertised_topics_.clear();
00137 }
00138
00139
00140 ROSCPP_LOG_DEBUG(" shutting down subscribers");
00141 {
00142 boost::mutex::scoped_lock subs_lock(subs_mutex_);
00143
00144 for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
00145 {
00146
00147 unregisterSubscriber((*s)->getName());
00148
00149 (*s)->shutdown();
00150 }
00151 subscriptions_.clear();
00152 }
00153 }
00154
00155 void TopicManager::processPublishQueues()
00156 {
00157 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00158
00159 V_Publication::iterator it = advertised_topics_.begin();
00160 V_Publication::iterator end = advertised_topics_.end();
00161 for (; it != end; ++it)
00162 {
00163 const PublicationPtr& pub = *it;
00164 pub->processPublishQueue();
00165 }
00166 }
00167
00168 void TopicManager::getAdvertisedTopics(V_string& topics)
00169 {
00170 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00171
00172 topics.resize(advertised_topic_names_.size());
00173 std::copy(advertised_topic_names_.begin(),
00174 advertised_topic_names_.end(),
00175 topics.begin());
00176 }
00177
00178 void TopicManager::getSubscribedTopics(V_string& topics)
00179 {
00180 boost::mutex::scoped_lock lock(subs_mutex_);
00181
00182 topics.reserve(subscriptions_.size());
00183 L_Subscription::const_iterator it = subscriptions_.begin();
00184 L_Subscription::const_iterator end = subscriptions_.end();
00185 for (; it != end; ++it)
00186 {
00187 const SubscriptionPtr& sub = *it;
00188 topics.push_back(sub->getName());
00189 }
00190 }
00191
00192 PublicationPtr TopicManager::lookupPublication(const std::string& topic)
00193 {
00194 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00195
00196 return lookupPublicationWithoutLock(topic);
00197 }
00198
00199 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
00200 {
00201 return lhs == "*" || rhs == "*" || lhs == rhs;
00202 }
00203
00204 bool TopicManager::addSubCallback(const SubscribeOptions& ops)
00205 {
00206
00207 bool found = false;
00208 bool found_topic = false;
00209
00210 SubscriptionPtr sub;
00211
00212 {
00213 if (isShuttingDown())
00214 {
00215 return false;
00216 }
00217
00218 for (L_Subscription::iterator s = subscriptions_.begin();
00219 s != subscriptions_.end() && !found; ++s)
00220 {
00221 sub = *s;
00222 if (!sub->isDropped() && sub->getName() == ops.topic)
00223 {
00224 found_topic = true;
00225 if (md5sumsMatch(ops.md5sum, sub->md5sum()))
00226 {
00227 found = true;
00228 }
00229 break;
00230 }
00231 }
00232 }
00233
00234 if (found_topic && !found)
00235 {
00236 std::stringstream ss;
00237 ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
00238 throw ConflictingSubscriptionException(ss.str());
00239 }
00240 else if (found)
00241 {
00242 if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
00243 {
00244 return false;
00245 }
00246 }
00247
00248 return found;
00249 }
00250
00251
00252 bool TopicManager::subscribe(const SubscribeOptions& ops)
00253 {
00254 boost::mutex::scoped_lock lock(subs_mutex_);
00255
00256 if (addSubCallback(ops))
00257 {
00258 return true;
00259 }
00260
00261 if (isShuttingDown())
00262 {
00263 return false;
00264 }
00265
00266 if (ops.md5sum.empty())
00267 {
00268 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
00269 }
00270
00271 if (ops.datatype.empty())
00272 {
00273 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
00274 }
00275
00276 if (!ops.helper)
00277 {
00278 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
00279 }
00280
00281 const std::string& md5sum = ops.md5sum;
00282 std::string datatype = ops.datatype;
00283
00284 SubscriptionPtr s(new Subscription(ops.topic, md5sum, datatype, ops.transport_hints));
00285 s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
00286
00287 if (!registerSubscriber(s, ops.datatype))
00288 {
00289 ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
00290 s->shutdown();
00291 return false;
00292 }
00293
00294 subscriptions_.push_back(s);
00295
00296 return true;
00297 }
00298
00299 bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
00300 {
00301 if (ops.datatype == "*")
00302 {
00303 std::stringstream ss;
00304 ss << "Advertising with * as the datatype is not allowed. Topic [" << ops.topic << "]";
00305 throw InvalidParameterException(ss.str());
00306 }
00307
00308 if (ops.md5sum == "*")
00309 {
00310 std::stringstream ss;
00311 ss << "Advertising with * as the md5sum is not allowed. Topic [" << ops.topic << "]";
00312 throw InvalidParameterException(ss.str());
00313 }
00314
00315 if (ops.md5sum.empty())
00316 {
00317 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
00318 }
00319
00320 if (ops.datatype.empty())
00321 {
00322 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
00323 }
00324
00325 if (ops.message_definition.empty())
00326 {
00327 ROS_WARN("Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
00328 }
00329
00330 PublicationPtr pub;
00331
00332 {
00333 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00334
00335 if (isShuttingDown())
00336 {
00337 return false;
00338 }
00339
00340 pub = lookupPublicationWithoutLock(ops.topic);
00341 if (pub && pub->getNumCallbacks() == 0)
00342 {
00343 pub.reset();
00344 }
00345
00346 if (pub)
00347 {
00348 if (pub->getMD5Sum() != ops.md5sum)
00349 {
00350 ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
00351 ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
00352 return false;
00353 }
00354
00355 pub->addCallbacks(callbacks);
00356
00357 return true;
00358 }
00359
00360 pub = PublicationPtr(new Publication(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header));
00361 pub->addCallbacks(callbacks);
00362 advertised_topics_.push_back(pub);
00363 }
00364
00365
00366 {
00367 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00368 advertised_topic_names_.push_back(ops.topic);
00369 }
00370
00371
00372
00373
00374
00375
00376 bool found = false;
00377 SubscriptionPtr sub;
00378 {
00379 boost::mutex::scoped_lock lock(subs_mutex_);
00380
00381 for (L_Subscription::iterator s = subscriptions_.begin();
00382 s != subscriptions_.end() && !found; ++s)
00383 {
00384 if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
00385 {
00386 found = true;
00387 sub = *s;
00388 break;
00389 }
00390 }
00391 }
00392
00393 if(found)
00394 {
00395 sub->addLocalConnection(pub);
00396 }
00397
00398 XmlRpcValue args, result, payload;
00399 args[0] = this_node::getName();
00400 args[1] = ops.topic;
00401 args[2] = ops.datatype;
00402 args[3] = xmlrpc_manager_->getServerURI();
00403 master::execute("registerPublisher", args, result, payload, true);
00404
00405 return true;
00406 }
00407
00408 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
00409 {
00410 PublicationPtr pub;
00411 V_Publication::iterator i;
00412 {
00413 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00414
00415 if (isShuttingDown())
00416 {
00417 return false;
00418 }
00419
00420 for (i = advertised_topics_.begin();
00421 i != advertised_topics_.end(); ++i)
00422 {
00423 if(((*i)->getName() == topic) && (!(*i)->isDropped()))
00424 {
00425 pub = *i;
00426 break;
00427 }
00428 }
00429 }
00430
00431 if (!pub)
00432 {
00433 return false;
00434 }
00435
00436 pub->removeCallbacks(callbacks);
00437
00438 {
00439 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00440 if (pub->getNumCallbacks() == 0)
00441 {
00442 unregisterPublisher(pub->getName());
00443 pub->drop();
00444
00445 advertised_topics_.erase(i);
00446
00447 {
00448 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00449 advertised_topic_names_.remove(pub->getName());
00450 }
00451 }
00452 }
00453
00454 return true;
00455 }
00456
00457 bool TopicManager::unregisterPublisher(const std::string& topic)
00458 {
00459 XmlRpcValue args, result, payload;
00460 args[0] = this_node::getName();
00461 args[1] = topic;
00462 args[2] = xmlrpc_manager_->getServerURI();
00463 master::execute("unregisterPublisher", args, result, payload, false);
00464
00465 return true;
00466 }
00467
00468 bool TopicManager::isTopicAdvertised(const string &topic)
00469 {
00470 for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
00471 {
00472 if (((*t)->getName() == topic) && (!(*t)->isDropped()))
00473 {
00474 return true;
00475 }
00476 }
00477
00478 return false;
00479 }
00480
00481 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
00482 {
00483 XmlRpcValue args, result, payload;
00484 args[0] = this_node::getName();
00485 args[1] = s->getName();
00486 args[2] = datatype;
00487 args[3] = xmlrpc_manager_->getServerURI();
00488
00489 if (!master::execute("registerSubscriber", args, result, payload, true))
00490 {
00491 return false;
00492 }
00493
00494 vector<string> pub_uris;
00495 for (int i = 0; i < payload.size(); i++)
00496 {
00497 if (payload[i] != xmlrpc_manager_->getServerURI())
00498 {
00499 pub_uris.push_back(string(payload[i]));
00500 }
00501 }
00502
00503 bool self_subscribed = false;
00504 PublicationPtr pub;
00505 const std::string& sub_md5sum = s->md5sum();
00506
00507 {
00508 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00509 V_Publication::const_iterator it = advertised_topics_.begin();
00510 V_Publication::const_iterator end = advertised_topics_.end();
00511 for (; it != end; ++it)
00512 {
00513 pub = *it;
00514 const std::string& pub_md5sum = pub->getMD5Sum();
00515
00516 if (pub->getName() == s->getName() && !pub->isDropped())
00517 {
00518 if (!md5sumsMatch(pub_md5sum, sub_md5sum))
00519 {
00520 ROS_ERROR("md5sum mismatch making local subscription to topic %s.",
00521 s->getName().c_str());
00522 ROS_ERROR("Subscriber expects type %s, md5sum %s",
00523 s->datatype().c_str(), s->md5sum().c_str());
00524 ROS_ERROR("Publisher provides type %s, md5sum %s",
00525 pub->getDataType().c_str(), pub->getMD5Sum().c_str());
00526 return false;
00527 }
00528
00529 self_subscribed = true;
00530 break;
00531 }
00532 }
00533 }
00534
00535 s->pubUpdate(pub_uris);
00536 if (self_subscribed)
00537 {
00538 s->addLocalConnection(pub);
00539 }
00540
00541 return true;
00542 }
00543
00544 bool TopicManager::unregisterSubscriber(const string &topic)
00545 {
00546 XmlRpcValue args, result, payload;
00547 args[0] = this_node::getName();
00548 args[1] = topic;
00549 args[2] = xmlrpc_manager_->getServerURI();
00550
00551 master::execute("unregisterSubscriber", args, result, payload, false);
00552
00553 return true;
00554 }
00555
00556 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
00557 {
00558 SubscriptionPtr sub;
00559 {
00560 boost::mutex::scoped_lock lock(subs_mutex_);
00561
00562 if (isShuttingDown())
00563 {
00564 return false;
00565 }
00566
00567 ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
00568
00569 for (L_Subscription::const_iterator s = subscriptions_.begin();
00570 s != subscriptions_.end(); ++s)
00571 {
00572 if ((*s)->getName() != topic || (*s)->isDropped())
00573 continue;
00574
00575 sub = *s;
00576 break;
00577 }
00578
00579 }
00580
00581 if (sub)
00582 {
00583 return sub->pubUpdate(pubs);
00584 }
00585 else
00586 {
00587 ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
00588 "don't have any subscribers to that topic.", topic.c_str());
00589 }
00590
00591 return false;
00592 }
00593
00594 bool TopicManager::requestTopic(const string &topic,
00595 XmlRpcValue &protos,
00596 XmlRpcValue &ret)
00597 {
00598 for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
00599 {
00600 XmlRpcValue proto = protos[proto_idx];
00601 if (proto.getType() != XmlRpcValue::TypeArray)
00602 {
00603 ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
00604 return false;
00605 }
00606
00607 if (proto[0].getType() != XmlRpcValue::TypeString)
00608 {
00609 ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
00610 "did not start with a string");
00611 return false;
00612 }
00613
00614 string proto_name = proto[0];
00615 if (proto_name == string("TCPROS"))
00616 {
00617 XmlRpcValue tcpros_params;
00618 tcpros_params[0] = string("TCPROS");
00619 tcpros_params[1] = network::getHost();
00620 tcpros_params[2] = int(connection_manager_->getTCPPort());
00621 ret[0] = int(1);
00622 ret[1] = string();
00623 ret[2] = tcpros_params;
00624 return true;
00625 }
00626 else if (proto_name == string("UDPROS"))
00627 {
00628 if (proto.size() != 5 ||
00629 proto[1].getType() != XmlRpcValue::TypeBase64 ||
00630 proto[2].getType() != XmlRpcValue::TypeString ||
00631 proto[3].getType() != XmlRpcValue::TypeInt ||
00632 proto[4].getType() != XmlRpcValue::TypeInt)
00633 {
00634 ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
00635 return false;
00636 }
00637 std::vector<char> header_bytes = proto[1];
00638 boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]);
00639 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00640 Header h;
00641 string err;
00642 if (!h.parse(buffer, header_bytes.size(), err))
00643 {
00644 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00645 return false;
00646 }
00647
00648 PublicationPtr pub_ptr = lookupPublication(topic);
00649 if(!pub_ptr)
00650 {
00651 ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
00652 return false;
00653 }
00654
00655 std::string host = proto[2];
00656 int port = proto[3];
00657
00658 M_string m;
00659 std::string error_msg;
00660 if (!pub_ptr->validateHeader(h, error_msg))
00661 {
00662 ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
00663 return false;
00664 }
00665
00666 int max_datagram_size = proto[4];
00667 int conn_id = connection_manager_->getNewConnectionID();
00668 TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
00669 connection_manager_->udprosIncomingConnection(transport, h);
00670
00671 XmlRpcValue udpros_params;
00672 udpros_params[0] = string("UDPROS");
00673 udpros_params[1] = network::getHost();
00674 udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
00675 udpros_params[3] = conn_id;
00676 udpros_params[4] = max_datagram_size;
00677 m["topic"] = topic;
00678 m["md5sum"] = pub_ptr->getMD5Sum();
00679 m["type"] = pub_ptr->getDataType();
00680 m["callerid"] = this_node::getName();
00681 m["message_definition"] = pub_ptr->getMessageDefinition();
00682 boost::shared_array<uint8_t> msg_def_buffer;
00683 uint32_t len;
00684 Header::write(m, msg_def_buffer, len);
00685 XmlRpcValue v(msg_def_buffer.get(), len);
00686 udpros_params[5] = v;
00687 ret[0] = int(1);
00688 ret[1] = string();
00689 ret[2] = udpros_params;
00690 return true;
00691 }
00692 else
00693 {
00694 ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
00695 proto_name.c_str());
00696 }
00697 }
00698
00699 ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
00700 "requestTopic did not support TCPROS, so there are no " \
00701 "protocols in common.");
00702 return false;
00703 }
00704
00705 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
00706 {
00707 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00708
00709 if (isShuttingDown())
00710 {
00711 return;
00712 }
00713
00714 PublicationPtr p = lookupPublicationWithoutLock(topic);
00715 if (p->hasSubscribers() || p->isLatching())
00716 {
00717 ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
00718
00719
00720
00721 bool nocopy = false;
00722 bool serialize = false;
00723
00724
00725 if (m.type_info && m.message)
00726 {
00727 p->getPublishTypes(serialize, nocopy, *m.type_info);
00728 }
00729 else
00730 {
00731 serialize = true;
00732 }
00733
00734 if (!nocopy)
00735 {
00736 m.message.reset();
00737 m.type_info = 0;
00738 }
00739
00740 if (serialize || p->isLatching())
00741 {
00742 SerializedMessage m2 = serfunc();
00743 m.buf = m2.buf;
00744 m.num_bytes = m2.num_bytes;
00745 m.message_start = m2.message_start;
00746 }
00747
00748 p->publish(m);
00749
00750
00751
00752 if (serialize)
00753 {
00754 poll_manager_->getPollSet().signal();
00755 }
00756 }
00757 else
00758 {
00759 p->incrementSequence();
00760 }
00761 }
00762
00763 void TopicManager::incrementSequence(const std::string& topic)
00764 {
00765 PublicationPtr pub = lookupPublication(topic);
00766 if (pub)
00767 {
00768 pub->incrementSequence();
00769 }
00770 }
00771
00772 bool TopicManager::isLatched(const std::string& topic)
00773 {
00774 PublicationPtr pub = lookupPublication(topic);
00775 if (pub)
00776 {
00777 return pub->isLatched();
00778 }
00779
00780 return false;
00781 }
00782
00783 PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic)
00784 {
00785 PublicationPtr t;
00786 for (V_Publication::iterator i = advertised_topics_.begin();
00787 !t && i != advertised_topics_.end(); ++i)
00788 {
00789 if (((*i)->getName() == topic) && (!(*i)->isDropped()))
00790 {
00791 t = *i;
00792 break;
00793 }
00794 }
00795
00796 return t;
00797 }
00798
00799 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
00800 {
00801 SubscriptionPtr sub;
00802
00803 {
00804 boost::mutex::scoped_lock lock(subs_mutex_);
00805
00806 if (isShuttingDown())
00807 {
00808 return false;
00809 }
00810
00811 L_Subscription::iterator it;
00812 for (it = subscriptions_.begin();
00813 it != subscriptions_.end(); ++it)
00814 {
00815 if ((*it)->getName() == topic)
00816 {
00817 sub = *it;
00818 break;
00819 }
00820 }
00821 }
00822
00823 if (!sub)
00824 {
00825 return false;
00826 }
00827
00828 sub->removeCallback(helper);
00829
00830 if (sub->getNumCallbacks() == 0)
00831 {
00832
00833 {
00834 boost::mutex::scoped_lock lock(subs_mutex_);
00835
00836 L_Subscription::iterator it;
00837 for (it = subscriptions_.begin();
00838 it != subscriptions_.end(); ++it)
00839 {
00840 if ((*it)->getName() == topic)
00841 {
00842 subscriptions_.erase(it);
00843 break;
00844 }
00845 }
00846
00847 if (!unregisterSubscriber(topic))
00848 {
00849 ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
00850 }
00851 }
00852
00853 sub->shutdown();
00854 return true;
00855 }
00856
00857 return true;
00858 }
00859
00860 size_t TopicManager::getNumSubscribers(const std::string &topic)
00861 {
00862 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00863
00864 if (isShuttingDown())
00865 {
00866 return 0;
00867 }
00868
00869 PublicationPtr p = lookupPublicationWithoutLock(topic);
00870 if (p)
00871 {
00872 return p->getNumSubscribers();
00873 }
00874
00875 return 0;
00876 }
00877
00878 size_t TopicManager::getNumSubscriptions()
00879 {
00880 boost::mutex::scoped_lock lock(subs_mutex_);
00881 return subscriptions_.size();
00882 }
00883
00884 size_t TopicManager::getNumPublishers(const std::string &topic)
00885 {
00886 boost::mutex::scoped_lock lock(subs_mutex_);
00887
00888 if (isShuttingDown())
00889 {
00890 return 0;
00891 }
00892
00893 for (L_Subscription::const_iterator t = subscriptions_.begin();
00894 t != subscriptions_.end(); ++t)
00895 {
00896 if (!(*t)->isDropped() && (*t)->getName() == topic)
00897 {
00898 return (*t)->getNumPublishers();
00899 }
00900 }
00901
00902 return 0;
00903 }
00904
00905 void TopicManager::getBusStats(XmlRpcValue &stats)
00906 {
00907 XmlRpcValue publish_stats, subscribe_stats, service_stats;
00908
00909 publish_stats.setSize(0);
00910 subscribe_stats.setSize(0);
00911 service_stats.setSize(0);
00912
00913 uint32_t pidx = 0;
00914 {
00915 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00916 for (V_Publication::iterator t = advertised_topics_.begin();
00917 t != advertised_topics_.end(); ++t)
00918 {
00919 publish_stats[pidx++] = (*t)->getStats();
00920 }
00921 }
00922
00923 {
00924 uint32_t sidx = 0;
00925
00926 boost::mutex::scoped_lock lock(subs_mutex_);
00927 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00928 {
00929 subscribe_stats[sidx++] = (*t)->getStats();
00930 }
00931 }
00932
00933 stats[0] = publish_stats;
00934 stats[1] = subscribe_stats;
00935 stats[2] = service_stats;
00936 }
00937
00938 void TopicManager::getBusInfo(XmlRpcValue &info)
00939 {
00940
00941 info.setSize(0);
00942
00943 {
00944 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00945
00946 for (V_Publication::iterator t = advertised_topics_.begin();
00947 t != advertised_topics_.end(); ++t)
00948 {
00949 (*t)->getInfo(info);
00950 }
00951 }
00952
00953 {
00954 boost::mutex::scoped_lock lock(subs_mutex_);
00955
00956 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00957 {
00958 (*t)->getInfo(info);
00959 }
00960 }
00961 }
00962
00963 void TopicManager::getSubscriptions(XmlRpcValue &subs)
00964 {
00965
00966 subs.setSize(0);
00967
00968 {
00969 boost::mutex::scoped_lock lock(subs_mutex_);
00970
00971 uint32_t sidx = 0;
00972
00973 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00974 {
00975 XmlRpcValue sub;
00976 sub[0] = (*t)->getName();
00977 sub[1] = (*t)->datatype();
00978 subs[sidx++] = sub;
00979 }
00980 }
00981 }
00982
00983 void TopicManager::getPublications(XmlRpcValue &pubs)
00984 {
00985
00986 pubs.setSize(0);
00987
00988 {
00989 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00990
00991 uint32_t sidx = 0;
00992
00993 for (V_Publication::iterator t = advertised_topics_.begin();
00994 t != advertised_topics_.end(); ++t)
00995 {
00996 XmlRpcValue pub;
00997 pub[0] = (*t)->getName();
00998 pub[1] = (*t)->getDataType();
00999 pubs[sidx++] = pub;
01000 }
01001
01002 }
01003 }
01004
01005 extern std::string console::g_last_error_message;
01006
01007 void TopicManager::pubUpdateCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01008 {
01009 std::vector<std::string> pubs;
01010 for (int idx = 0; idx < params[2].size(); idx++)
01011 {
01012 pubs.push_back(params[2][idx]);
01013 }
01014 if (pubUpdate(params[1], pubs))
01015 {
01016 result = xmlrpc::responseInt(1, "", 0);
01017 }
01018 else
01019 {
01020 result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01021 }
01022 }
01023
01024 void TopicManager::requestTopicCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01025 {
01026 if (!requestTopic(params[1], params[2], result))
01027 {
01028 result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01029 }
01030 }
01031
01032 void TopicManager::getBusStatsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01033 {
01034 result[0] = 1;
01035 result[1] = std::string("");
01036 XmlRpcValue response;
01037 getBusStats(result);
01038 result[2] = response;
01039 }
01040
01041 void TopicManager::getBusInfoCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01042 {
01043 result[0] = 1;
01044 result[1] = std::string("");
01045 XmlRpcValue response;
01046 getBusInfo(response);
01047 result[2] = response;
01048 }
01049
01050 void TopicManager::getSubscriptionsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01051 {
01052 result[0] = 1;
01053 result[1] = std::string("subscriptions");
01054 XmlRpcValue response;
01055 getSubscriptions(response);
01056 result[2] = response;
01057 }
01058
01059 void TopicManager::getPublicationsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01060 {
01061 result[0] = 1;
01062 result[1] = std::string("publications");
01063 XmlRpcValue response;
01064 getPublications(response);
01065 result[2] = response;
01066 }
01067
01068 }