topic_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/topic_manager.h"
00029 #include "ros/xmlrpc_manager.h"
00030 #include "ros/connection_manager.h"
00031 #include "ros/poll_manager.h"
00032 #include "ros/publication.h"
00033 #include "ros/subscription.h"
00034 #include "ros/this_node.h"
00035 #include "ros/network.h"
00036 #include "ros/master.h"
00037 #include "ros/transport/transport_tcp.h"
00038 #include "ros/transport/transport_udp.h"
00039 #include "ros/rosout_appender.h"
00040 #include "ros/init.h"
00041 #include "ros/file_log.h"
00042 #include "ros/subscribe_options.h"
00043 
00044 #include "XmlRpc.h"
00045 
00046 #include <ros/console.h>
00047 
00048 using namespace XmlRpc; // A battle to be fought later
00049 using namespace std; // sigh
00050 
00052 
00053 namespace ros
00054 {
00055 
00056 TopicManagerPtr g_topic_manager;
00057 boost::mutex g_topic_manager_mutex;
00058 const TopicManagerPtr& TopicManager::instance()
00059 {
00060   if (!g_topic_manager)
00061   {
00062     boost::mutex::scoped_lock lock(g_topic_manager_mutex);
00063     if (!g_topic_manager)
00064     {
00065       g_topic_manager = boost::make_shared<TopicManager>();
00066     }
00067   }
00068 
00069   return g_topic_manager;
00070 }
00071 
00072 TopicManager::TopicManager()
00073 : shutting_down_(false)
00074 {
00075 }
00076 
00077 TopicManager::~TopicManager()
00078 {
00079   shutdown();
00080 }
00081 
00082 void TopicManager::start()
00083 {
00084   boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00085   shutting_down_ = false;
00086 
00087   poll_manager_ = PollManager::instance();
00088   connection_manager_ = ConnectionManager::instance();
00089   xmlrpc_manager_ = XMLRPCManager::instance();
00090 
00091   xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
00092   xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
00093   xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
00094   xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
00095   xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
00096   xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));
00097 
00098   poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
00099 }
00100 
00101 void TopicManager::shutdown()
00102 {
00103   boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00104   if (shutting_down_)
00105   {
00106     return;
00107   }
00108 
00109   {
00110     boost::recursive_mutex::scoped_lock lock1(advertised_topics_mutex_);
00111     boost::mutex::scoped_lock lock2(subs_mutex_);
00112     shutting_down_ = true;
00113   }
00114 
00115   xmlrpc_manager_->unbind("publisherUpdate");
00116   xmlrpc_manager_->unbind("requestTopic");
00117   xmlrpc_manager_->unbind("getBusStats");
00118   xmlrpc_manager_->unbind("getBusInfo");
00119   xmlrpc_manager_->unbind("getSubscriptions");
00120   xmlrpc_manager_->unbind("getPublications");
00121 
00122   ROSCPP_LOG_DEBUG("Shutting down topics...");
00123   ROSCPP_LOG_DEBUG("  shutting down publishers");
00124   {
00125     boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
00126 
00127     for (V_Publication::iterator i = advertised_topics_.begin();
00128          i != advertised_topics_.end(); ++i)
00129     {
00130       if(!(*i)->isDropped())
00131       {
00132         unregisterPublisher((*i)->getName());
00133       }
00134       (*i)->drop();
00135     }
00136     advertised_topics_.clear();
00137   }
00138 
00139   // unregister all of our subscriptions
00140   ROSCPP_LOG_DEBUG("  shutting down subscribers");
00141   {
00142     boost::mutex::scoped_lock subs_lock(subs_mutex_);
00143 
00144     for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
00145     {
00146       // Remove us as a subscriber from the master
00147       unregisterSubscriber((*s)->getName());
00148       // now, drop our side of the connection
00149       (*s)->shutdown();
00150     }
00151     subscriptions_.clear();
00152   }
00153 }
00154 
00155 void TopicManager::processPublishQueues()
00156 {
00157   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00158 
00159   V_Publication::iterator it = advertised_topics_.begin();
00160   V_Publication::iterator end = advertised_topics_.end();
00161   for (; it != end; ++it)
00162   {
00163     const PublicationPtr& pub = *it;
00164     pub->processPublishQueue();
00165   }
00166 }
00167 
00168 void TopicManager::getAdvertisedTopics(V_string& topics)
00169 {
00170   boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00171 
00172   topics.resize(advertised_topic_names_.size());
00173   std::copy(advertised_topic_names_.begin(),
00174             advertised_topic_names_.end(),
00175             topics.begin());
00176 }
00177 
00178 void TopicManager::getSubscribedTopics(V_string& topics)
00179 {
00180   boost::mutex::scoped_lock lock(subs_mutex_);
00181 
00182   topics.reserve(subscriptions_.size());
00183   L_Subscription::const_iterator it = subscriptions_.begin();
00184   L_Subscription::const_iterator end = subscriptions_.end();
00185   for (; it != end; ++it)
00186   {
00187     const SubscriptionPtr& sub = *it;
00188     topics.push_back(sub->getName());
00189   }
00190 }
00191 
00192 PublicationPtr TopicManager::lookupPublication(const std::string& topic)
00193 {
00194   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00195 
00196   return lookupPublicationWithoutLock(topic);
00197 }
00198 
00199 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
00200 {
00201   return lhs == "*" || rhs == "*" || lhs == rhs;
00202 }
00203 
00204 bool TopicManager::addSubCallback(const SubscribeOptions& ops)
00205 {
00206   // spin through the subscriptions and see if we find a match. if so, use it.
00207   bool found = false;
00208   bool found_topic = false;
00209 
00210   SubscriptionPtr sub;
00211 
00212   {
00213     if (isShuttingDown())
00214     {
00215       return false;
00216     }
00217 
00218     for (L_Subscription::iterator s = subscriptions_.begin();
00219          s != subscriptions_.end() && !found; ++s)
00220     {
00221       sub = *s;
00222       if (!sub->isDropped() && sub->getName() == ops.topic)
00223       {
00224         found_topic = true;
00225         if (md5sumsMatch(ops.md5sum, sub->md5sum()))
00226         {
00227           found = true;
00228         }
00229         break;
00230       }
00231     }
00232   }
00233 
00234   if (found_topic && !found)
00235   {
00236     std::stringstream ss;
00237     ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
00238     throw ConflictingSubscriptionException(ss.str());
00239   }
00240   else if (found)
00241   {
00242     if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
00243     {
00244       return false;
00245     }
00246   }
00247 
00248   return found;
00249 }
00250 
00251 // this function has the subscription code that doesn't need to be templated.
00252 bool TopicManager::subscribe(const SubscribeOptions& ops)
00253 {
00254   boost::mutex::scoped_lock lock(subs_mutex_);
00255 
00256   if (addSubCallback(ops))
00257   {
00258     return true;
00259   }
00260 
00261   if (isShuttingDown())
00262   {
00263     return false;
00264   }
00265 
00266   if (ops.md5sum.empty())
00267   {
00268     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
00269   }
00270 
00271   if (ops.datatype.empty())
00272   {
00273     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
00274   }
00275 
00276   if (!ops.helper)
00277   {
00278     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
00279   }
00280 
00281   const std::string& md5sum = ops.md5sum;
00282   std::string datatype = ops.datatype;
00283 
00284   SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
00285   s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
00286 
00287   if (!registerSubscriber(s, ops.datatype))
00288   {
00289     ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
00290     s->shutdown();
00291     return false;
00292   }
00293 
00294   subscriptions_.push_back(s);
00295 
00296   return true;
00297 }
00298 
00299 bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
00300 {
00301   if (ops.datatype == "*")
00302   {
00303     std::stringstream ss;
00304     ss << "Advertising with * as the datatype is not allowed.  Topic [" << ops.topic << "]";
00305     throw InvalidParameterException(ss.str());
00306   }
00307 
00308   if (ops.md5sum == "*")
00309   {
00310     std::stringstream ss;
00311     ss << "Advertising with * as the md5sum is not allowed.  Topic [" << ops.topic << "]";
00312     throw InvalidParameterException(ss.str());
00313   }
00314 
00315   if (ops.md5sum.empty())
00316   {
00317     throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
00318   }
00319 
00320   if (ops.datatype.empty())
00321   {
00322     throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
00323   }
00324 
00325   if (ops.message_definition.empty())
00326   {
00327     ROS_WARN("Advertising on topic [%s] with an empty message definition.  Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
00328   }
00329 
00330   PublicationPtr pub;
00331 
00332   {
00333     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00334 
00335     if (isShuttingDown())
00336     {
00337       return false;
00338     }
00339 
00340     pub = lookupPublicationWithoutLock(ops.topic);
00341     if (pub && pub->getNumCallbacks() == 0)
00342     {
00343       pub.reset();
00344     }
00345 
00346     if (pub)
00347     {
00348       if (pub->getMD5Sum() != ops.md5sum)
00349       {
00350         ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
00351                   ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
00352         return false;
00353       }
00354 
00355       pub->addCallbacks(callbacks);
00356 
00357       return true;
00358     }
00359 
00360     pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header));
00361     pub->addCallbacks(callbacks);
00362     advertised_topics_.push_back(pub);
00363   }
00364 
00365 
00366   {
00367     boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00368     advertised_topic_names_.push_back(ops.topic);
00369   }
00370 
00371   // Check whether we've already subscribed to this topic.  If so, we'll do
00372   // the self-subscription here, to avoid the deadlock that would happen if
00373   // the ROS thread later got the publisherUpdate with its own XMLRPC URI.
00374   // The assumption is that advertise() is called from somewhere other
00375   // than the ROS thread.
00376   bool found = false;
00377   SubscriptionPtr sub;
00378   {
00379     boost::mutex::scoped_lock lock(subs_mutex_);
00380 
00381     for (L_Subscription::iterator s = subscriptions_.begin();
00382          s != subscriptions_.end() && !found; ++s)
00383     {
00384       if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
00385       {
00386         found = true;
00387         sub = *s;
00388         break;
00389       }
00390     }
00391   }
00392 
00393   if(found)
00394   {
00395     sub->addLocalConnection(pub);
00396   }
00397 
00398   XmlRpcValue args, result, payload;
00399   args[0] = this_node::getName();
00400   args[1] = ops.topic;
00401   args[2] = ops.datatype;
00402   args[3] = xmlrpc_manager_->getServerURI();
00403   master::execute("registerPublisher", args, result, payload, true);
00404 
00405   return true;
00406 }
00407 
00408 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
00409 {
00410   PublicationPtr pub;
00411   V_Publication::iterator i;
00412   {
00413     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00414 
00415     if (isShuttingDown())
00416     {
00417       return false;
00418     }
00419 
00420     for (i = advertised_topics_.begin();
00421          i != advertised_topics_.end(); ++i)
00422     {
00423       if(((*i)->getName() == topic) && (!(*i)->isDropped()))
00424       {
00425         pub = *i;
00426         break;
00427       }
00428     }
00429   }
00430 
00431   if (!pub)
00432   {
00433     return false;
00434   }
00435 
00436   pub->removeCallbacks(callbacks);
00437 
00438   {
00439     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00440     if (pub->getNumCallbacks() == 0)
00441     {
00442       unregisterPublisher(pub->getName());
00443       pub->drop();
00444 
00445       advertised_topics_.erase(i);
00446 
00447       {
00448         boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00449         advertised_topic_names_.remove(pub->getName());
00450       }
00451     }
00452   }
00453 
00454   return true;
00455 }
00456 
00457 bool TopicManager::unregisterPublisher(const std::string& topic)
00458 {
00459   XmlRpcValue args, result, payload;
00460   args[0] = this_node::getName();
00461   args[1] = topic;
00462   args[2] = xmlrpc_manager_->getServerURI();
00463   master::execute("unregisterPublisher", args, result, payload, false);
00464 
00465   return true;
00466 }
00467 
00468 bool TopicManager::isTopicAdvertised(const string &topic)
00469 {
00470   for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
00471   {
00472     if (((*t)->getName() == topic) && (!(*t)->isDropped()))
00473     {
00474       return true;
00475     }
00476   }
00477 
00478   return false;
00479 }
00480 
00481 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
00482 {
00483   XmlRpcValue args, result, payload;
00484   args[0] = this_node::getName();
00485   args[1] = s->getName();
00486   args[2] = datatype;
00487   args[3] = xmlrpc_manager_->getServerURI();
00488 
00489   if (!master::execute("registerSubscriber", args, result, payload, true))
00490   {
00491     return false;
00492   }
00493 
00494   vector<string> pub_uris;
00495   for (int i = 0; i < payload.size(); i++)
00496   {
00497     if (payload[i] != xmlrpc_manager_->getServerURI())
00498     {
00499       pub_uris.push_back(string(payload[i]));
00500     }
00501   }
00502 
00503   bool self_subscribed = false;
00504   PublicationPtr pub;
00505   const std::string& sub_md5sum = s->md5sum();
00506   // Figure out if we have a local publisher
00507   {
00508     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00509     V_Publication::const_iterator it = advertised_topics_.begin();
00510     V_Publication::const_iterator end = advertised_topics_.end();
00511     for (; it != end; ++it)
00512     {
00513       pub = *it;
00514       const std::string& pub_md5sum = pub->getMD5Sum();
00515 
00516       if (pub->getName() == s->getName() && !pub->isDropped())
00517         {
00518           if (!md5sumsMatch(pub_md5sum, sub_md5sum))
00519             {
00520               ROS_ERROR("md5sum mismatch making local subscription to topic %s.",
00521                         s->getName().c_str());
00522               ROS_ERROR("Subscriber expects type %s, md5sum %s",
00523                         s->datatype().c_str(), s->md5sum().c_str());
00524               ROS_ERROR("Publisher provides type %s, md5sum %s",
00525                         pub->getDataType().c_str(), pub->getMD5Sum().c_str());
00526               return false;
00527             }
00528 
00529           self_subscribed = true;
00530           break;
00531         }
00532     }
00533   }
00534 
00535   s->pubUpdate(pub_uris);
00536   if (self_subscribed)
00537   {
00538     s->addLocalConnection(pub);
00539   }
00540 
00541   return true;
00542 }
00543 
00544 bool TopicManager::unregisterSubscriber(const string &topic)
00545 {
00546   XmlRpcValue args, result, payload;
00547   args[0] = this_node::getName();
00548   args[1] = topic;
00549   args[2] = xmlrpc_manager_->getServerURI();
00550 
00551   master::execute("unregisterSubscriber", args, result, payload, false);
00552 
00553   return true;
00554 }
00555 
00556 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
00557 {
00558   SubscriptionPtr sub;
00559   {
00560     boost::mutex::scoped_lock lock(subs_mutex_);
00561 
00562     if (isShuttingDown())
00563     {
00564       return false;
00565     }
00566 
00567     ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
00568     // find the subscription
00569     for (L_Subscription::const_iterator s  = subscriptions_.begin();
00570                                             s != subscriptions_.end(); ++s)
00571     {
00572       if ((*s)->getName() != topic || (*s)->isDropped())
00573         continue;
00574 
00575       sub = *s;
00576       break;
00577     }
00578 
00579   }
00580 
00581   if (sub)
00582   {
00583     return sub->pubUpdate(pubs);
00584   }
00585   else
00586   {
00587     ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
00588               "don't have any subscribers to that topic.", topic.c_str());
00589   }
00590 
00591   return false;
00592 }
00593 
00594 bool TopicManager::requestTopic(const string &topic,
00595                          XmlRpcValue &protos,
00596                          XmlRpcValue &ret)
00597 {
00598   for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
00599   {
00600     XmlRpcValue proto = protos[proto_idx]; // save typing
00601     if (proto.getType() != XmlRpcValue::TypeArray)
00602     {
00603         ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
00604       return false;
00605     }
00606 
00607     if (proto[0].getType() != XmlRpcValue::TypeString)
00608     {
00609         ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
00610                  "did not start with a string");
00611       return false;
00612     }
00613 
00614     string proto_name = proto[0];
00615     if (proto_name == string("TCPROS"))
00616     {
00617       XmlRpcValue tcpros_params;
00618       tcpros_params[0] = string("TCPROS");
00619       tcpros_params[1] = network::getHost();
00620       tcpros_params[2] = int(connection_manager_->getTCPPort());
00621       ret[0] = int(1);
00622       ret[1] = string();
00623       ret[2] = tcpros_params;
00624       return true;
00625     }
00626     else if (proto_name == string("UDPROS"))
00627     {
00628       if (proto.size() != 5 ||
00629           proto[1].getType() != XmlRpcValue::TypeBase64 ||
00630           proto[2].getType() != XmlRpcValue::TypeString ||
00631           proto[3].getType() != XmlRpcValue::TypeInt ||
00632           proto[4].getType() != XmlRpcValue::TypeInt)
00633       {
00634         ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
00635         return false;
00636       }
00637       std::vector<char> header_bytes = proto[1];
00638       boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
00639       memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00640       Header h;
00641       string err;
00642       if (!h.parse(buffer, header_bytes.size(), err))
00643       {
00644         ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00645         return false;
00646       }
00647 
00648       PublicationPtr pub_ptr = lookupPublication(topic);
00649       if(!pub_ptr)
00650       {
00651         ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
00652         return false;
00653       }
00654 
00655       std::string host = proto[2];
00656       int port = proto[3];
00657 
00658       M_string m;
00659       std::string error_msg;
00660       if (!pub_ptr->validateHeader(h, error_msg))
00661       {
00662         ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
00663         return false;
00664       }
00665 
00666       int max_datagram_size = proto[4];
00667       int conn_id = connection_manager_->getNewConnectionID();
00668       TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
00669       if (!transport)
00670       {
00671         ROSCPP_LOG_DEBUG("Error creating outgoing transport for [%s:%d]", host.c_str(), port);
00672         return false;
00673       }
00674       connection_manager_->udprosIncomingConnection(transport, h);
00675 
00676       XmlRpcValue udpros_params;
00677       udpros_params[0] = string("UDPROS");
00678       udpros_params[1] = network::getHost();
00679       udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
00680       udpros_params[3] = conn_id;
00681       udpros_params[4] = max_datagram_size;
00682       m["topic"] = topic;
00683       m["md5sum"] = pub_ptr->getMD5Sum();
00684       m["type"] = pub_ptr->getDataType();
00685       m["callerid"] = this_node::getName();
00686       m["message_definition"] = pub_ptr->getMessageDefinition();
00687       boost::shared_array<uint8_t> msg_def_buffer;
00688       uint32_t len;
00689       Header::write(m, msg_def_buffer, len);
00690       XmlRpcValue v(msg_def_buffer.get(), len);
00691       udpros_params[5] = v;
00692       ret[0] = int(1);
00693       ret[1] = string();
00694       ret[2] = udpros_params;
00695       return true;
00696     }
00697     else
00698     {
00699       ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
00700           proto_name.c_str());
00701     }
00702   }
00703 
00704   ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
00705              "requestTopic did not support TCPROS, so there are no " \
00706              "protocols in common.");
00707   return false;
00708 }
00709 
00710 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
00711 {
00712   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00713 
00714   if (isShuttingDown())
00715   {
00716     return;
00717   }
00718 
00719   PublicationPtr p = lookupPublicationWithoutLock(topic);
00720   if (p->hasSubscribers() || p->isLatching())
00721   {
00722     ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
00723 
00724     // Determine what kinds of subscribers we're publishing to.  If they're intraprocess with the same C++ type we can
00725     // do a no-copy publish.
00726     bool nocopy = false;
00727     bool serialize = false;
00728 
00729     // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
00730     if (m.type_info && m.message)
00731     {
00732       p->getPublishTypes(serialize, nocopy, *m.type_info);
00733     }
00734     else
00735     {
00736       serialize = true;
00737     }
00738 
00739     if (!nocopy)
00740     {
00741       m.message.reset();
00742       m.type_info = 0;
00743     }
00744 
00745     if (serialize || p->isLatching())
00746     {
00747       SerializedMessage m2 = serfunc();
00748       m.buf = m2.buf;
00749       m.num_bytes = m2.num_bytes;
00750       m.message_start = m2.message_start;
00751     }
00752 
00753     p->publish(m);
00754 
00755     // If we're not doing a serialized publish we don't need to signal the pollset.  The write()
00756     // call inside signal() is actually relatively expensive when doing a nocopy publish.
00757     if (serialize)
00758     {
00759       poll_manager_->getPollSet().signal();
00760     }
00761   }
00762   else
00763   {
00764     p->incrementSequence();
00765   }
00766 }
00767 
00768 void TopicManager::incrementSequence(const std::string& topic)
00769 {
00770   PublicationPtr pub = lookupPublication(topic);
00771   if (pub)
00772   {
00773     pub->incrementSequence();
00774   }
00775 }
00776 
00777 bool TopicManager::isLatched(const std::string& topic)
00778 {
00779   PublicationPtr pub = lookupPublication(topic);
00780   if (pub)
00781   {
00782     return pub->isLatched();
00783   }
00784 
00785   return false;
00786 }
00787 
00788 PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic)
00789 {
00790   PublicationPtr t;
00791   for (V_Publication::iterator i = advertised_topics_.begin();
00792        !t && i != advertised_topics_.end(); ++i)
00793   {
00794     if (((*i)->getName() == topic) && (!(*i)->isDropped()))
00795     {
00796       t = *i;
00797       break;
00798     }
00799   }
00800 
00801   return t;
00802 }
00803 
00804 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
00805 {
00806   SubscriptionPtr sub;
00807 
00808   {
00809     boost::mutex::scoped_lock lock(subs_mutex_);
00810 
00811     if (isShuttingDown())
00812     {
00813       return false;
00814     }
00815 
00816     L_Subscription::iterator it;
00817     for (it = subscriptions_.begin();
00818          it != subscriptions_.end(); ++it)
00819     {
00820       if ((*it)->getName() == topic)
00821       {
00822         sub = *it;
00823         break;
00824       }
00825     }
00826   }
00827 
00828   if (!sub)
00829   {
00830     return false;
00831   }
00832 
00833   sub->removeCallback(helper);
00834 
00835   if (sub->getNumCallbacks() == 0)
00836   {
00837     // nobody is left. blow away the subscription.
00838     {
00839       boost::mutex::scoped_lock lock(subs_mutex_);
00840 
00841       L_Subscription::iterator it;
00842       for (it = subscriptions_.begin();
00843            it != subscriptions_.end(); ++it)
00844       {
00845         if ((*it)->getName() == topic)
00846         {
00847           subscriptions_.erase(it);
00848           break;
00849         }
00850       }
00851 
00852       if (!unregisterSubscriber(topic))
00853       {
00854         ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
00855       }
00856     }
00857 
00858     sub->shutdown();
00859     return true;
00860   }
00861 
00862   return true;
00863 }
00864 
00865 size_t TopicManager::getNumSubscribers(const std::string &topic)
00866 {
00867   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00868 
00869   if (isShuttingDown())
00870   {
00871     return 0;
00872   }
00873 
00874   PublicationPtr p = lookupPublicationWithoutLock(topic);
00875   if (p)
00876   {
00877     return p->getNumSubscribers();
00878   }
00879 
00880   return 0;
00881 }
00882 
00883 size_t TopicManager::getNumSubscriptions()
00884 {
00885   boost::mutex::scoped_lock lock(subs_mutex_);
00886   return subscriptions_.size();
00887 }
00888 
00889 size_t TopicManager::getNumPublishers(const std::string &topic)
00890 {
00891   boost::mutex::scoped_lock lock(subs_mutex_);
00892 
00893   if (isShuttingDown())
00894   {
00895     return 0;
00896   }
00897 
00898   for (L_Subscription::const_iterator t = subscriptions_.begin();
00899        t != subscriptions_.end(); ++t)
00900   {
00901     if (!(*t)->isDropped() && (*t)->getName() == topic)
00902     {
00903       return (*t)->getNumPublishers();
00904     }
00905   }
00906 
00907   return 0;
00908 }
00909 
00910 void TopicManager::getBusStats(XmlRpcValue &stats)
00911 {
00912   XmlRpcValue publish_stats, subscribe_stats, service_stats;
00913   // force these guys to be arrays, even if we don't populate them
00914   publish_stats.setSize(0);
00915   subscribe_stats.setSize(0);
00916   service_stats.setSize(0);
00917 
00918   uint32_t pidx = 0;
00919   {
00920     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00921     for (V_Publication::iterator t = advertised_topics_.begin();
00922          t != advertised_topics_.end(); ++t)
00923     {
00924       publish_stats[pidx++] = (*t)->getStats();
00925     }
00926   }
00927 
00928   {
00929     uint32_t sidx = 0;
00930 
00931     boost::mutex::scoped_lock lock(subs_mutex_);
00932     for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00933     {
00934       subscribe_stats[sidx++] = (*t)->getStats();
00935     }
00936   }
00937 
00938   stats[0] = publish_stats;
00939   stats[1] = subscribe_stats;
00940   stats[2] = service_stats;
00941 }
00942 
00943 void TopicManager::getBusInfo(XmlRpcValue &info)
00944 {
00945   // force these guys to be arrays, even if we don't populate them
00946   info.setSize(0);
00947 
00948   {
00949     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00950 
00951     for (V_Publication::iterator t = advertised_topics_.begin();
00952          t != advertised_topics_.end(); ++t)
00953     {
00954       (*t)->getInfo(info);
00955     }
00956   }
00957 
00958   {
00959     boost::mutex::scoped_lock lock(subs_mutex_);
00960 
00961     for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00962     {
00963       (*t)->getInfo(info);
00964     }
00965   }
00966 }
00967 
00968 void TopicManager::getSubscriptions(XmlRpcValue &subs)
00969 {
00970   // force these guys to be arrays, even if we don't populate them
00971   subs.setSize(0);
00972 
00973   {
00974     boost::mutex::scoped_lock lock(subs_mutex_);
00975 
00976     uint32_t sidx = 0;
00977 
00978     for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00979     {
00980       XmlRpcValue sub;
00981       sub[0] = (*t)->getName();
00982       sub[1] = (*t)->datatype();
00983       subs[sidx++] = sub;
00984     }
00985   }
00986 }
00987 
00988 void TopicManager::getPublications(XmlRpcValue &pubs)
00989 {
00990   // force these guys to be arrays, even if we don't populate them
00991   pubs.setSize(0);
00992 
00993   {
00994     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00995 
00996     uint32_t sidx = 0;
00997 
00998     for (V_Publication::iterator t = advertised_topics_.begin();
00999          t != advertised_topics_.end(); ++t)
01000     {
01001       XmlRpcValue pub;
01002       pub[0] = (*t)->getName();
01003       pub[1] = (*t)->getDataType();
01004       pubs[sidx++] = pub;
01005     }
01006 
01007   }
01008 }
01009 
01010 extern std::string console::g_last_error_message;
01011 
01012 void TopicManager::pubUpdateCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01013 {
01014   std::vector<std::string> pubs;
01015   for (int idx = 0; idx < params[2].size(); idx++)
01016   {
01017     pubs.push_back(params[2][idx]);
01018   }
01019   if (pubUpdate(params[1], pubs))
01020   {
01021     result = xmlrpc::responseInt(1, "", 0);
01022   }
01023   else
01024   {
01025     result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01026   }
01027 }
01028 
01029 void TopicManager::requestTopicCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01030 {
01031   if (!requestTopic(params[1], params[2], result))
01032   {
01033     result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01034   }
01035 }
01036 
01037 void TopicManager::getBusStatsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01038 {
01039   (void)params;
01040   result[0] = 1;
01041   result[1] = std::string("");
01042   XmlRpcValue response;
01043   getBusStats(result);
01044   result[2] = response;
01045 }
01046 
01047 void TopicManager::getBusInfoCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01048 {
01049   (void)params;
01050   result[0] = 1;
01051   result[1] = std::string("");
01052   XmlRpcValue response;
01053   getBusInfo(response);
01054   result[2] = response;
01055 }
01056 
01057 void TopicManager::getSubscriptionsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01058 {
01059   (void)params;
01060   result[0] = 1;
01061   result[1] = std::string("subscriptions");
01062   XmlRpcValue response;
01063   getSubscriptions(response);
01064   result[2] = response;
01065 }
01066 
01067 void TopicManager::getPublicationsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01068 {
01069   (void)params;
01070   result[0] = 1;
01071   result[1] = std::string("publications");
01072   XmlRpcValue response;
01073   getPublications(response);
01074   result[2] = response;
01075 }
01076 
01077 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05