$search
00001 /* 00002 * Copyright (C) 2009, Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 #include "ros/topic_manager.h" 00029 #include "ros/xmlrpc_manager.h" 00030 #include "ros/connection_manager.h" 00031 #include "ros/poll_manager.h" 00032 #include "ros/publication.h" 00033 #include "ros/subscription.h" 00034 #include "ros/this_node.h" 00035 #include "ros/network.h" 00036 #include "ros/master.h" 00037 #include "ros/transport/transport_tcp.h" 00038 #include "ros/transport/transport_udp.h" 00039 #include "ros/rosout_appender.h" 00040 #include "ros/init.h" 00041 #include "ros/file_log.h" 00042 #include "ros/subscribe_options.h" 00043 00044 #include "XmlRpc.h" 00045 00046 #include <ros/console.h> 00047 00048 using namespace XmlRpc; // A battle to be fought later 00049 using namespace std; // sigh 00050 00052 00053 namespace ros 00054 { 00055 00056 TopicManagerPtr g_topic_manager; 00057 boost::mutex g_topic_manager_mutex; 00058 const TopicManagerPtr& TopicManager::instance() 00059 { 00060 if (!g_topic_manager) 00061 { 00062 boost::mutex::scoped_lock lock(g_topic_manager_mutex); 00063 if (!g_topic_manager) 00064 { 00065 g_topic_manager.reset(new TopicManager); 00066 } 00067 } 00068 00069 return g_topic_manager; 00070 } 00071 00072 TopicManager::TopicManager() 00073 : shutting_down_(false) 00074 { 00075 } 00076 00077 TopicManager::~TopicManager() 00078 { 00079 shutdown(); 00080 } 00081 00082 void TopicManager::start() 00083 { 00084 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_); 00085 shutting_down_ = false; 00086 00087 poll_manager_ = PollManager::instance(); 00088 connection_manager_ = ConnectionManager::instance(); 00089 xmlrpc_manager_ = XMLRPCManager::instance(); 00090 00091 xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2)); 00092 xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2)); 00093 xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2)); 00094 xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2)); 00095 xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2)); 00096 xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2)); 00097 00098 poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this)); 00099 } 00100 00101 void TopicManager::shutdown() 00102 { 00103 boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_); 00104 if (shutting_down_) 00105 { 00106 return; 00107 } 00108 00109 { 00110 boost::recursive_mutex::scoped_lock lock1(advertised_topics_mutex_); 00111 boost::mutex::scoped_lock lock2(subs_mutex_); 00112 shutting_down_ = true; 00113 } 00114 00115 xmlrpc_manager_->unbind("publisherUpdate"); 00116 xmlrpc_manager_->unbind("requestTopic"); 00117 xmlrpc_manager_->unbind("getBusStats"); 00118 xmlrpc_manager_->unbind("getBusInfo"); 00119 xmlrpc_manager_->unbind("getSubscriptions"); 00120 xmlrpc_manager_->unbind("getPublications"); 00121 00122 ROSCPP_LOG_DEBUG("Shutting down topics..."); 00123 ROSCPP_LOG_DEBUG(" shutting down publishers"); 00124 { 00125 boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_); 00126 00127 for (V_Publication::iterator i = advertised_topics_.begin(); 00128 i != advertised_topics_.end(); ++i) 00129 { 00130 if(!(*i)->isDropped()) 00131 { 00132 unregisterPublisher((*i)->getName()); 00133 } 00134 (*i)->drop(); 00135 } 00136 advertised_topics_.clear(); 00137 } 00138 00139 // unregister all of our subscriptions 00140 ROSCPP_LOG_DEBUG(" shutting down subscribers"); 00141 { 00142 boost::mutex::scoped_lock subs_lock(subs_mutex_); 00143 00144 for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s) 00145 { 00146 // Remove us as a subscriber from the master 00147 unregisterSubscriber((*s)->getName()); 00148 // now, drop our side of the connection 00149 (*s)->shutdown(); 00150 } 00151 subscriptions_.clear(); 00152 } 00153 } 00154 00155 void TopicManager::processPublishQueues() 00156 { 00157 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00158 00159 V_Publication::iterator it = advertised_topics_.begin(); 00160 V_Publication::iterator end = advertised_topics_.end(); 00161 for (; it != end; ++it) 00162 { 00163 const PublicationPtr& pub = *it; 00164 pub->processPublishQueue(); 00165 } 00166 } 00167 00168 void TopicManager::getAdvertisedTopics(V_string& topics) 00169 { 00170 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_); 00171 00172 topics.resize(advertised_topic_names_.size()); 00173 std::copy(advertised_topic_names_.begin(), 00174 advertised_topic_names_.end(), 00175 topics.begin()); 00176 } 00177 00178 void TopicManager::getSubscribedTopics(V_string& topics) 00179 { 00180 boost::mutex::scoped_lock lock(subs_mutex_); 00181 00182 topics.reserve(subscriptions_.size()); 00183 L_Subscription::const_iterator it = subscriptions_.begin(); 00184 L_Subscription::const_iterator end = subscriptions_.end(); 00185 for (; it != end; ++it) 00186 { 00187 const SubscriptionPtr& sub = *it; 00188 topics.push_back(sub->getName()); 00189 } 00190 } 00191 00192 PublicationPtr TopicManager::lookupPublication(const std::string& topic) 00193 { 00194 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00195 00196 return lookupPublicationWithoutLock(topic); 00197 } 00198 00199 bool md5sumsMatch(const std::string& lhs, const std::string& rhs) 00200 { 00201 return lhs == "*" || rhs == "*" || lhs == rhs; 00202 } 00203 00204 bool TopicManager::addSubCallback(const SubscribeOptions& ops) 00205 { 00206 // spin through the subscriptions and see if we find a match. if so, use it. 00207 bool found = false; 00208 bool found_topic = false; 00209 00210 SubscriptionPtr sub; 00211 00212 { 00213 if (isShuttingDown()) 00214 { 00215 return false; 00216 } 00217 00218 for (L_Subscription::iterator s = subscriptions_.begin(); 00219 s != subscriptions_.end() && !found; ++s) 00220 { 00221 sub = *s; 00222 if (!sub->isDropped() && sub->getName() == ops.topic) 00223 { 00224 found_topic = true; 00225 if (md5sumsMatch(ops.md5sum, sub->md5sum())) 00226 { 00227 found = true; 00228 } 00229 break; 00230 } 00231 } 00232 } 00233 00234 if (found_topic && !found) 00235 { 00236 std::stringstream ss; 00237 ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]"; 00238 throw ConflictingSubscriptionException(ss.str()); 00239 } 00240 else if (found) 00241 { 00242 if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks)) 00243 { 00244 return false; 00245 } 00246 } 00247 00248 return found; 00249 } 00250 00251 // this function has the subscription code that doesn't need to be templated. 00252 bool TopicManager::subscribe(const SubscribeOptions& ops) 00253 { 00254 boost::mutex::scoped_lock lock(subs_mutex_); 00255 00256 if (addSubCallback(ops)) 00257 { 00258 return true; 00259 } 00260 00261 if (isShuttingDown()) 00262 { 00263 return false; 00264 } 00265 00266 if (ops.md5sum.empty()) 00267 { 00268 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum"); 00269 } 00270 00271 if (ops.datatype.empty()) 00272 { 00273 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype"); 00274 } 00275 00276 if (!ops.helper) 00277 { 00278 throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback"); 00279 } 00280 00281 const std::string& md5sum = ops.md5sum; 00282 std::string datatype = ops.datatype; 00283 00284 SubscriptionPtr s(new Subscription(ops.topic, md5sum, datatype, ops.transport_hints)); 00285 s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks); 00286 00287 if (!registerSubscriber(s, ops.datatype)) 00288 { 00289 ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str()); 00290 s->shutdown(); 00291 return false; 00292 } 00293 00294 subscriptions_.push_back(s); 00295 00296 return true; 00297 } 00298 00299 bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks) 00300 { 00301 if (ops.datatype == "*") 00302 { 00303 std::stringstream ss; 00304 ss << "Advertising with * as the datatype is not allowed. Topic [" << ops.topic << "]"; 00305 throw InvalidParameterException(ss.str()); 00306 } 00307 00308 if (ops.md5sum == "*") 00309 { 00310 std::stringstream ss; 00311 ss << "Advertising with * as the md5sum is not allowed. Topic [" << ops.topic << "]"; 00312 throw InvalidParameterException(ss.str()); 00313 } 00314 00315 if (ops.md5sum.empty()) 00316 { 00317 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum"); 00318 } 00319 00320 if (ops.datatype.empty()) 00321 { 00322 throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype"); 00323 } 00324 00325 if (ops.message_definition.empty()) 00326 { 00327 ROS_WARN("Advertising on topic [%s] with an empty message definition. Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str()); 00328 } 00329 00330 PublicationPtr pub; 00331 00332 { 00333 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00334 00335 if (isShuttingDown()) 00336 { 00337 return false; 00338 } 00339 00340 pub = lookupPublicationWithoutLock(ops.topic); 00341 if (pub && pub->getNumCallbacks() == 0) 00342 { 00343 pub.reset(); 00344 } 00345 00346 if (pub) 00347 { 00348 if (pub->getMD5Sum() != ops.md5sum) 00349 { 00350 ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]", 00351 ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str()); 00352 return false; 00353 } 00354 00355 pub->addCallbacks(callbacks); 00356 00357 return true; 00358 } 00359 00360 pub = PublicationPtr(new Publication(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header)); 00361 pub->addCallbacks(callbacks); 00362 advertised_topics_.push_back(pub); 00363 } 00364 00365 00366 { 00367 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_); 00368 advertised_topic_names_.push_back(ops.topic); 00369 } 00370 00371 // Check whether we've already subscribed to this topic. If so, we'll do 00372 // the self-subscription here, to avoid the deadlock that would happen if 00373 // the ROS thread later got the publisherUpdate with its own XMLRPC URI. 00374 // The assumption is that advertise() is called from somewhere other 00375 // than the ROS thread. 00376 bool found = false; 00377 SubscriptionPtr sub; 00378 { 00379 boost::mutex::scoped_lock lock(subs_mutex_); 00380 00381 for (L_Subscription::iterator s = subscriptions_.begin(); 00382 s != subscriptions_.end() && !found; ++s) 00383 { 00384 if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped()) 00385 { 00386 found = true; 00387 sub = *s; 00388 break; 00389 } 00390 } 00391 } 00392 00393 if(found) 00394 { 00395 sub->addLocalConnection(pub); 00396 } 00397 00398 XmlRpcValue args, result, payload; 00399 args[0] = this_node::getName(); 00400 args[1] = ops.topic; 00401 args[2] = ops.datatype; 00402 args[3] = xmlrpc_manager_->getServerURI(); 00403 master::execute("registerPublisher", args, result, payload, true); 00404 00405 return true; 00406 } 00407 00408 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks) 00409 { 00410 PublicationPtr pub; 00411 V_Publication::iterator i; 00412 { 00413 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00414 00415 if (isShuttingDown()) 00416 { 00417 return false; 00418 } 00419 00420 for (i = advertised_topics_.begin(); 00421 i != advertised_topics_.end(); ++i) 00422 { 00423 if(((*i)->getName() == topic) && (!(*i)->isDropped())) 00424 { 00425 pub = *i; 00426 break; 00427 } 00428 } 00429 } 00430 00431 if (!pub) 00432 { 00433 return false; 00434 } 00435 00436 pub->removeCallbacks(callbacks); 00437 00438 { 00439 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00440 if (pub->getNumCallbacks() == 0) 00441 { 00442 unregisterPublisher(pub->getName()); 00443 pub->drop(); 00444 00445 advertised_topics_.erase(i); 00446 00447 { 00448 boost::mutex::scoped_lock lock(advertised_topic_names_mutex_); 00449 advertised_topic_names_.remove(pub->getName()); 00450 } 00451 } 00452 } 00453 00454 return true; 00455 } 00456 00457 bool TopicManager::unregisterPublisher(const std::string& topic) 00458 { 00459 XmlRpcValue args, result, payload; 00460 args[0] = this_node::getName(); 00461 args[1] = topic; 00462 args[2] = xmlrpc_manager_->getServerURI(); 00463 master::execute("unregisterPublisher", args, result, payload, false); 00464 00465 return true; 00466 } 00467 00468 bool TopicManager::isTopicAdvertised(const string &topic) 00469 { 00470 for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t) 00471 { 00472 if (((*t)->getName() == topic) && (!(*t)->isDropped())) 00473 { 00474 return true; 00475 } 00476 } 00477 00478 return false; 00479 } 00480 00481 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype) 00482 { 00483 XmlRpcValue args, result, payload; 00484 args[0] = this_node::getName(); 00485 args[1] = s->getName(); 00486 args[2] = datatype; 00487 args[3] = xmlrpc_manager_->getServerURI(); 00488 00489 if (!master::execute("registerSubscriber", args, result, payload, true)) 00490 { 00491 return false; 00492 } 00493 00494 vector<string> pub_uris; 00495 for (int i = 0; i < payload.size(); i++) 00496 { 00497 if (payload[i] != xmlrpc_manager_->getServerURI()) 00498 { 00499 pub_uris.push_back(string(payload[i])); 00500 } 00501 } 00502 00503 bool self_subscribed = false; 00504 PublicationPtr pub; 00505 const std::string& sub_md5sum = s->md5sum(); 00506 // Figure out if we have a local publisher 00507 { 00508 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00509 V_Publication::const_iterator it = advertised_topics_.begin(); 00510 V_Publication::const_iterator end = advertised_topics_.end(); 00511 for (; it != end; ++it) 00512 { 00513 pub = *it; 00514 const std::string& pub_md5sum = pub->getMD5Sum(); 00515 00516 if (pub->getName() == s->getName() && !pub->isDropped()) 00517 { 00518 if (!md5sumsMatch(pub_md5sum, sub_md5sum)) 00519 { 00520 ROS_ERROR("md5sum mismatch making local subscription to topic %s.", 00521 s->getName().c_str()); 00522 ROS_ERROR("Subscriber expects type %s, md5sum %s", 00523 s->datatype().c_str(), s->md5sum().c_str()); 00524 ROS_ERROR("Publisher provides type %s, md5sum %s", 00525 pub->getDataType().c_str(), pub->getMD5Sum().c_str()); 00526 return false; 00527 } 00528 00529 self_subscribed = true; 00530 break; 00531 } 00532 } 00533 } 00534 00535 s->pubUpdate(pub_uris); 00536 if (self_subscribed) 00537 { 00538 s->addLocalConnection(pub); 00539 } 00540 00541 return true; 00542 } 00543 00544 bool TopicManager::unregisterSubscriber(const string &topic) 00545 { 00546 XmlRpcValue args, result, payload; 00547 args[0] = this_node::getName(); 00548 args[1] = topic; 00549 args[2] = xmlrpc_manager_->getServerURI(); 00550 00551 master::execute("unregisterSubscriber", args, result, payload, false); 00552 00553 return true; 00554 } 00555 00556 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs) 00557 { 00558 SubscriptionPtr sub; 00559 { 00560 boost::mutex::scoped_lock lock(subs_mutex_); 00561 00562 if (isShuttingDown()) 00563 { 00564 return false; 00565 } 00566 00567 ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size()); 00568 // find the subscription 00569 for (L_Subscription::const_iterator s = subscriptions_.begin(); 00570 s != subscriptions_.end(); ++s) 00571 { 00572 if ((*s)->getName() != topic || (*s)->isDropped()) 00573 continue; 00574 00575 sub = *s; 00576 break; 00577 } 00578 00579 } 00580 00581 if (sub) 00582 { 00583 sub->pubUpdate(pubs); 00584 } 00585 else 00586 { 00587 ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \ 00588 "don't have any subscribers to that topic.", topic.c_str()); 00589 } 00590 00591 return false; 00592 } 00593 00594 bool TopicManager::requestTopic(const string &topic, 00595 XmlRpcValue &protos, 00596 XmlRpcValue &ret) 00597 { 00598 for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++) 00599 { 00600 XmlRpcValue proto = protos[proto_idx]; // save typing 00601 if (proto.getType() != XmlRpcValue::TypeArray) 00602 { 00603 ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists"); 00604 return false; 00605 } 00606 00607 if (proto[0].getType() != XmlRpcValue::TypeString) 00608 { 00609 ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \ 00610 "did not start with a string"); 00611 return false; 00612 } 00613 00614 string proto_name = proto[0]; 00615 if (proto_name == string("TCPROS")) 00616 { 00617 XmlRpcValue tcpros_params; 00618 tcpros_params[0] = string("TCPROS"); 00619 tcpros_params[1] = network::getHost(); 00620 tcpros_params[2] = int(connection_manager_->getTCPPort()); 00621 ret[0] = int(1); 00622 ret[1] = string(); 00623 ret[2] = tcpros_params; 00624 return true; 00625 } 00626 else if (proto_name == string("UDPROS")) 00627 { 00628 if (proto.size() != 5 || 00629 proto[1].getType() != XmlRpcValue::TypeBase64 || 00630 proto[2].getType() != XmlRpcValue::TypeString || 00631 proto[3].getType() != XmlRpcValue::TypeInt || 00632 proto[4].getType() != XmlRpcValue::TypeInt) 00633 { 00634 ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS"); 00635 return false; 00636 } 00637 std::vector<char> header_bytes = proto[1]; 00638 boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]); 00639 memcpy(buffer.get(), &header_bytes[0], header_bytes.size()); 00640 Header h; 00641 string err; 00642 if (!h.parse(buffer, header_bytes.size(), err)) 00643 { 00644 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str()); 00645 return false; 00646 } 00647 00648 PublicationPtr pub_ptr = lookupPublication(topic); 00649 if(!pub_ptr) 00650 { 00651 ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str()); 00652 return false; 00653 } 00654 00655 std::string host = proto[2]; 00656 int port = proto[3]; 00657 00658 M_string m; 00659 std::string error_msg; 00660 if (!pub_ptr->validateHeader(h, error_msg)) 00661 { 00662 ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str()); 00663 return false; 00664 } 00665 00666 int max_datagram_size = proto[4]; 00667 int conn_id = connection_manager_->getNewConnectionID(); 00668 TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size); 00669 connection_manager_->udprosIncomingConnection(transport, h); 00670 00671 XmlRpcValue udpros_params; 00672 udpros_params[0] = string("UDPROS"); 00673 udpros_params[1] = network::getHost(); 00674 udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort(); 00675 udpros_params[3] = conn_id; 00676 udpros_params[4] = max_datagram_size; 00677 m["topic"] = topic; 00678 m["md5sum"] = pub_ptr->getMD5Sum(); 00679 m["type"] = pub_ptr->getDataType(); 00680 m["callerid"] = this_node::getName(); 00681 m["message_definition"] = pub_ptr->getMessageDefinition(); 00682 boost::shared_array<uint8_t> msg_def_buffer; 00683 uint32_t len; 00684 Header::write(m, msg_def_buffer, len); 00685 XmlRpcValue v(msg_def_buffer.get(), len); 00686 udpros_params[5] = v; 00687 ret[0] = int(1); 00688 ret[1] = string(); 00689 ret[2] = udpros_params; 00690 return true; 00691 } 00692 else 00693 { 00694 ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]", 00695 proto_name.c_str()); 00696 } 00697 } 00698 00699 ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \ 00700 "requestTopic did not support TCPROS, so there are no " \ 00701 "protocols in common."); 00702 return false; 00703 } 00704 00705 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) 00706 { 00707 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00708 00709 if (isShuttingDown()) 00710 { 00711 return; 00712 } 00713 00714 PublicationPtr p = lookupPublicationWithoutLock(topic); 00715 if (p->hasSubscribers() || p->isLatching()) 00716 { 00717 ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence()); 00718 00719 // Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can 00720 // do a no-copy publish. 00721 bool nocopy = false; 00722 bool serialize = false; 00723 00724 // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it 00725 if (m.type_info && m.message) 00726 { 00727 p->getPublishTypes(serialize, nocopy, *m.type_info); 00728 } 00729 else 00730 { 00731 serialize = true; 00732 } 00733 00734 if (!nocopy) 00735 { 00736 m.message.reset(); 00737 m.type_info = 0; 00738 } 00739 00740 if (serialize || p->isLatching()) 00741 { 00742 SerializedMessage m2 = serfunc(); 00743 m.buf = m2.buf; 00744 m.num_bytes = m2.num_bytes; 00745 m.message_start = m2.message_start; 00746 } 00747 00748 p->publish(m); 00749 00750 // If we're not doing a serialized publish we don't need to signal the pollset. The write() 00751 // call inside signal() is actually relatively expensive when doing a nocopy publish. 00752 if (serialize) 00753 { 00754 poll_manager_->getPollSet().signal(); 00755 } 00756 } 00757 else 00758 { 00759 p->incrementSequence(); 00760 } 00761 } 00762 00763 void TopicManager::incrementSequence(const std::string& topic) 00764 { 00765 PublicationPtr pub = lookupPublication(topic); 00766 if (pub) 00767 { 00768 pub->incrementSequence(); 00769 } 00770 } 00771 00772 bool TopicManager::isLatched(const std::string& topic) 00773 { 00774 PublicationPtr pub = lookupPublication(topic); 00775 if (pub) 00776 { 00777 return pub->isLatched(); 00778 } 00779 00780 return false; 00781 } 00782 00783 PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic) 00784 { 00785 PublicationPtr t; 00786 for (V_Publication::iterator i = advertised_topics_.begin(); 00787 !t && i != advertised_topics_.end(); ++i) 00788 { 00789 if (((*i)->getName() == topic) && (!(*i)->isDropped())) 00790 { 00791 t = *i; 00792 break; 00793 } 00794 } 00795 00796 return t; 00797 } 00798 00799 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper) 00800 { 00801 SubscriptionPtr sub; 00802 L_Subscription::iterator it; 00803 00804 { 00805 boost::mutex::scoped_lock lock(subs_mutex_); 00806 00807 if (isShuttingDown()) 00808 { 00809 return false; 00810 } 00811 00812 for (it = subscriptions_.begin(); 00813 it != subscriptions_.end() && !sub; ++it) 00814 { 00815 if ((*it)->getName() == topic) 00816 { 00817 sub = *it; 00818 break; 00819 } 00820 } 00821 } 00822 00823 if (!sub) 00824 { 00825 return false; 00826 } 00827 00828 sub->removeCallback(helper); 00829 00830 if (sub->getNumCallbacks() == 0) 00831 { 00832 // nobody is left. blow away the subscription. 00833 { 00834 boost::mutex::scoped_lock lock(subs_mutex_); 00835 00836 subscriptions_.erase(it); 00837 00838 if (!unregisterSubscriber(topic)) 00839 { 00840 ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str()); 00841 } 00842 } 00843 00844 sub->shutdown(); 00845 return true; 00846 } 00847 00848 return true; 00849 } 00850 00851 size_t TopicManager::getNumSubscribers(const std::string &topic) 00852 { 00853 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00854 00855 if (isShuttingDown()) 00856 { 00857 return 0; 00858 } 00859 00860 PublicationPtr p = lookupPublicationWithoutLock(topic); 00861 if (p) 00862 { 00863 return p->getNumSubscribers(); 00864 } 00865 00866 return 0; 00867 } 00868 00869 size_t TopicManager::getNumSubscriptions() 00870 { 00871 boost::mutex::scoped_lock lock(subs_mutex_); 00872 return subscriptions_.size(); 00873 } 00874 00875 size_t TopicManager::getNumPublishers(const std::string &topic) 00876 { 00877 boost::mutex::scoped_lock lock(subs_mutex_); 00878 00879 if (isShuttingDown()) 00880 { 00881 return 0; 00882 } 00883 00884 for (L_Subscription::const_iterator t = subscriptions_.begin(); 00885 t != subscriptions_.end(); ++t) 00886 { 00887 if (!(*t)->isDropped() && (*t)->getName() == topic) 00888 { 00889 return (*t)->getNumPublishers(); 00890 } 00891 } 00892 00893 return 0; 00894 } 00895 00896 void TopicManager::getBusStats(XmlRpcValue &stats) 00897 { 00898 XmlRpcValue publish_stats, subscribe_stats, service_stats; 00899 // force these guys to be arrays, even if we don't populate them 00900 publish_stats.setSize(0); 00901 subscribe_stats.setSize(0); 00902 service_stats.setSize(0); 00903 00904 uint32_t pidx = 0; 00905 { 00906 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00907 for (V_Publication::iterator t = advertised_topics_.begin(); 00908 t != advertised_topics_.end(); ++t) 00909 { 00910 publish_stats[pidx++] = (*t)->getStats(); 00911 } 00912 } 00913 00914 { 00915 uint32_t sidx = 0; 00916 00917 boost::mutex::scoped_lock lock(subs_mutex_); 00918 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t) 00919 { 00920 subscribe_stats[sidx++] = (*t)->getStats(); 00921 } 00922 } 00923 00924 stats[0] = publish_stats; 00925 stats[1] = subscribe_stats; 00926 stats[2] = service_stats; 00927 } 00928 00929 void TopicManager::getBusInfo(XmlRpcValue &info) 00930 { 00931 // force these guys to be arrays, even if we don't populate them 00932 info.setSize(0); 00933 00934 { 00935 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00936 00937 for (V_Publication::iterator t = advertised_topics_.begin(); 00938 t != advertised_topics_.end(); ++t) 00939 { 00940 (*t)->getInfo(info); 00941 } 00942 } 00943 00944 { 00945 boost::mutex::scoped_lock lock(subs_mutex_); 00946 00947 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t) 00948 { 00949 (*t)->getInfo(info); 00950 } 00951 } 00952 } 00953 00954 void TopicManager::getSubscriptions(XmlRpcValue &subs) 00955 { 00956 // force these guys to be arrays, even if we don't populate them 00957 subs.setSize(0); 00958 00959 { 00960 boost::mutex::scoped_lock lock(subs_mutex_); 00961 00962 uint32_t sidx = 0; 00963 00964 for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t) 00965 { 00966 XmlRpcValue sub; 00967 sub[0] = (*t)->getName(); 00968 sub[1] = (*t)->datatype(); 00969 subs[sidx++] = sub; 00970 } 00971 } 00972 } 00973 00974 void TopicManager::getPublications(XmlRpcValue &pubs) 00975 { 00976 // force these guys to be arrays, even if we don't populate them 00977 pubs.setSize(0); 00978 00979 { 00980 boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); 00981 00982 uint32_t sidx = 0; 00983 00984 for (V_Publication::iterator t = advertised_topics_.begin(); 00985 t != advertised_topics_.end(); ++t) 00986 { 00987 XmlRpcValue pub; 00988 pub[0] = (*t)->getName(); 00989 pub[1] = (*t)->getDataType(); 00990 pubs[sidx++] = pub; 00991 } 00992 00993 } 00994 } 00995 00996 extern ROSOutAppenderPtr g_rosout_appender; 00997 00998 void TopicManager::pubUpdateCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result) 00999 { 01000 std::vector<std::string> pubs; 01001 for (int idx = 0; idx < params[2].size(); idx++) 01002 { 01003 pubs.push_back(params[2][idx]); 01004 } 01005 if (pubUpdate(params[1], pubs)) 01006 { 01007 result = xmlrpc::responseInt(1, "", 0); 01008 } 01009 else 01010 { 01011 std::string last_error = "Unknown Error"; 01012 if ( g_rosout_appender != 0 ) 01013 { 01014 last_error = g_rosout_appender->getLastError(); 01015 } 01016 01017 result = xmlrpc::responseInt(0, last_error, 0); 01018 } 01019 } 01020 01021 void TopicManager::requestTopicCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result) 01022 { 01023 if (!requestTopic(params[1], params[2], result)) 01024 { 01025 std::string last_error = "Unknown Error"; 01026 if ( g_rosout_appender != 0 ) 01027 { 01028 last_error = g_rosout_appender->getLastError(); 01029 } 01030 01031 result = xmlrpc::responseInt(0, last_error, 0); 01032 } 01033 } 01034 01035 void TopicManager::getBusStatsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result) 01036 { 01037 result[0] = 1; 01038 result[1] = std::string(""); 01039 XmlRpcValue response; 01040 getBusStats(result); 01041 result[2] = response; 01042 } 01043 01044 void TopicManager::getBusInfoCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result) 01045 { 01046 result[0] = 1; 01047 result[1] = std::string(""); 01048 XmlRpcValue response; 01049 getBusInfo(response); 01050 result[2] = response; 01051 } 01052 01053 void TopicManager::getSubscriptionsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result) 01054 { 01055 result[0] = 1; 01056 result[1] = std::string("subscriptions"); 01057 XmlRpcValue response; 01058 getSubscriptions(response); 01059 result[2] = response; 01060 } 01061 01062 void TopicManager::getPublicationsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result) 01063 { 01064 result[0] = 1; 01065 result[1] = std::string("publications"); 01066 XmlRpcValue response; 01067 getPublications(response); 01068 result[2] = response; 01069 } 01070 01071 } // namespace ros