topic_manager.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #include "ros/topic_manager.h"
00029 #include "ros/xmlrpc_manager.h"
00030 #include "ros/connection_manager.h"
00031 #include "ros/poll_manager.h"
00032 #include "ros/publication.h"
00033 #include "ros/subscription.h"
00034 #include "ros/this_node.h"
00035 #include "ros/network.h"
00036 #include "ros/master.h"
00037 #include "ros/transport/transport_tcp.h"
00038 #include "ros/transport/transport_udp.h"
00039 #include "ros/rosout_appender.h"
00040 #include "ros/init.h"
00041 #include "ros/file_log.h"
00042 #include "ros/subscribe_options.h"
00043 
00044 #include "XmlRpc.h"
00045 
00046 #include <ros/console.h>
00047 
00048 using namespace XmlRpc; // A battle to be fought later
00049 using namespace std; // sigh
00050 
00052 
00053 namespace ros
00054 {
00055 
00056 TopicManagerPtr g_topic_manager;
00057 boost::mutex g_topic_manager_mutex;
00058 const TopicManagerPtr& TopicManager::instance()
00059 {
00060   if (!g_topic_manager)
00061   {
00062     boost::mutex::scoped_lock lock(g_topic_manager_mutex);
00063     if (!g_topic_manager)
00064     {
00065       g_topic_manager.reset(new TopicManager);
00066     }
00067   }
00068 
00069   return g_topic_manager;
00070 }
00071 
00072 TopicManager::TopicManager()
00073 : shutting_down_(false)
00074 {
00075 }
00076 
00077 TopicManager::~TopicManager()
00078 {
00079   shutdown();
00080 }
00081 
00082 void TopicManager::start()
00083 {
00084   boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00085   shutting_down_ = false;
00086 
00087   poll_manager_ = PollManager::instance();
00088   connection_manager_ = ConnectionManager::instance();
00089   xmlrpc_manager_ = XMLRPCManager::instance();
00090 
00091   xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
00092   xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
00093   xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
00094   xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
00095   xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
00096   xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));
00097 
00098   poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
00099 }
00100 
00101 void TopicManager::shutdown()
00102 {
00103   boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
00104   if (shutting_down_)
00105   {
00106     return;
00107   }
00108 
00109   {
00110     boost::recursive_mutex::scoped_lock lock1(advertised_topics_mutex_);
00111     boost::mutex::scoped_lock lock2(subs_mutex_);
00112     shutting_down_ = true;
00113   }
00114 
00115   xmlrpc_manager_->unbind("publisherUpdate");
00116   xmlrpc_manager_->unbind("requestTopic");
00117   xmlrpc_manager_->unbind("getBusStats");
00118   xmlrpc_manager_->unbind("getBusInfo");
00119   xmlrpc_manager_->unbind("getSubscriptions");
00120   xmlrpc_manager_->unbind("getPublications");
00121 
00122   ROSCPP_LOG_DEBUG("Shutting down topics...");
00123   ROSCPP_LOG_DEBUG("  shutting down publishers");
00124   {
00125     boost::recursive_mutex::scoped_lock adv_lock(advertised_topics_mutex_);
00126 
00127     for (V_Publication::iterator i = advertised_topics_.begin();
00128          i != advertised_topics_.end(); ++i)
00129     {
00130       if(!(*i)->isDropped())
00131       {
00132         unregisterPublisher((*i)->getName());
00133       }
00134       (*i)->drop();
00135     }
00136     advertised_topics_.clear();
00137   }
00138 
00139   // unregister all of our subscriptions
00140   ROSCPP_LOG_DEBUG("  shutting down subscribers");
00141   {
00142     boost::mutex::scoped_lock subs_lock(subs_mutex_);
00143 
00144     for (L_Subscription::iterator s = subscriptions_.begin(); s != subscriptions_.end(); ++s)
00145     {
00146       // Remove us as a subscriber from the master
00147       unregisterSubscriber((*s)->getName());
00148       // now, drop our side of the connection
00149       (*s)->shutdown();
00150     }
00151     subscriptions_.clear();
00152   }
00153 }
00154 
00155 void TopicManager::processPublishQueues()
00156 {
00157   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00158 
00159   V_Publication::iterator it = advertised_topics_.begin();
00160   V_Publication::iterator end = advertised_topics_.end();
00161   for (; it != end; ++it)
00162   {
00163     const PublicationPtr& pub = *it;
00164     pub->processPublishQueue();
00165   }
00166 }
00167 
00168 void TopicManager::getAdvertisedTopics(V_string& topics)
00169 {
00170   boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00171 
00172   topics.resize(advertised_topic_names_.size());
00173   std::copy(advertised_topic_names_.begin(),
00174             advertised_topic_names_.end(),
00175             topics.begin());
00176 }
00177 
00178 void TopicManager::getSubscribedTopics(V_string& topics)
00179 {
00180   boost::mutex::scoped_lock lock(subs_mutex_);
00181 
00182   topics.reserve(subscriptions_.size());
00183   L_Subscription::const_iterator it = subscriptions_.begin();
00184   L_Subscription::const_iterator end = subscriptions_.end();
00185   for (; it != end; ++it)
00186   {
00187     const SubscriptionPtr& sub = *it;
00188     topics.push_back(sub->getName());
00189   }
00190 }
00191 
00192 PublicationPtr TopicManager::lookupPublication(const std::string& topic)
00193 {
00194   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00195 
00196   return lookupPublicationWithoutLock(topic);
00197 }
00198 
00199 bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
00200 {
00201   return lhs == "*" || rhs == "*" || lhs == rhs;
00202 }
00203 
00204 bool TopicManager::addSubCallback(const SubscribeOptions& ops)
00205 {
00206   // spin through the subscriptions and see if we find a match. if so, use it.
00207   bool found = false;
00208   bool found_topic = false;
00209 
00210   SubscriptionPtr sub;
00211 
00212   {
00213     if (isShuttingDown())
00214     {
00215       return false;
00216     }
00217 
00218     for (L_Subscription::iterator s = subscriptions_.begin();
00219          s != subscriptions_.end() && !found; ++s)
00220     {
00221       sub = *s;
00222       if (!sub->isDropped() && sub->getName() == ops.topic)
00223       {
00224         found_topic = true;
00225         if (md5sumsMatch(ops.md5sum, sub->md5sum()))
00226         {
00227           found = true;
00228         }
00229         break;
00230       }
00231     }
00232   }
00233 
00234   if (found_topic && !found)
00235   {
00236     std::stringstream ss;
00237     ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
00238     throw ConflictingSubscriptionException(ss.str());
00239   }
00240   else if (found)
00241   {
00242     if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
00243     {
00244       return false;
00245     }
00246   }
00247 
00248   return found;
00249 }
00250 
00251 // this function has the subscription code that doesn't need to be templated.
00252 bool TopicManager::subscribe(const SubscribeOptions& ops)
00253 {
00254   boost::mutex::scoped_lock lock(subs_mutex_);
00255 
00256   if (addSubCallback(ops))
00257   {
00258     return true;
00259   }
00260 
00261   if (isShuttingDown())
00262   {
00263     return false;
00264   }
00265 
00266   if (ops.md5sum.empty())
00267   {
00268     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty md5sum");
00269   }
00270 
00271   if (ops.datatype.empty())
00272   {
00273     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] with an empty datatype");
00274   }
00275 
00276   if (!ops.helper)
00277   {
00278     throw InvalidParameterException("Subscribing to topic [" + ops.topic + "] without a callback");
00279   }
00280 
00281   const std::string& md5sum = ops.md5sum;
00282   std::string datatype = ops.datatype;
00283 
00284   SubscriptionPtr s(new Subscription(ops.topic, md5sum, datatype, ops.transport_hints));
00285   s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
00286 
00287   if (!registerSubscriber(s, ops.datatype))
00288   {
00289     ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
00290     s->shutdown();
00291     return false;
00292   }
00293 
00294   subscriptions_.push_back(s);
00295 
00296   return true;
00297 }
00298 
00299 bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
00300 {
00301   if (ops.datatype == "*")
00302   {
00303     std::stringstream ss;
00304     ss << "Advertising with * as the datatype is not allowed.  Topic [" << ops.topic << "]";
00305     throw InvalidParameterException(ss.str());
00306   }
00307 
00308   if (ops.md5sum == "*")
00309   {
00310     std::stringstream ss;
00311     ss << "Advertising with * as the md5sum is not allowed.  Topic [" << ops.topic << "]";
00312     throw InvalidParameterException(ss.str());
00313   }
00314 
00315   if (ops.md5sum.empty())
00316   {
00317     throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
00318   }
00319 
00320   if (ops.datatype.empty())
00321   {
00322     throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
00323   }
00324 
00325   if (ops.message_definition.empty())
00326   {
00327     ROS_WARN("Advertising on topic [%s] with an empty message definition.  Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
00328   }
00329 
00330   PublicationPtr pub;
00331 
00332   {
00333     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00334 
00335     if (isShuttingDown())
00336     {
00337       return false;
00338     }
00339 
00340     pub = lookupPublicationWithoutLock(ops.topic);
00341     if (pub && pub->getNumCallbacks() == 0)
00342     {
00343       pub.reset();
00344     }
00345 
00346     if (pub)
00347     {
00348       if (pub->getMD5Sum() != ops.md5sum)
00349       {
00350         ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
00351                   ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
00352         return false;
00353       }
00354 
00355       pub->addCallbacks(callbacks);
00356 
00357       return true;
00358     }
00359 
00360     pub = PublicationPtr(new Publication(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header));
00361     pub->addCallbacks(callbacks);
00362     advertised_topics_.push_back(pub);
00363   }
00364 
00365 
00366   {
00367     boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00368     advertised_topic_names_.push_back(ops.topic);
00369   }
00370 
00371   // Check whether we've already subscribed to this topic.  If so, we'll do
00372   // the self-subscription here, to avoid the deadlock that would happen if
00373   // the ROS thread later got the publisherUpdate with its own XMLRPC URI.
00374   // The assumption is that advertise() is called from somewhere other
00375   // than the ROS thread.
00376   bool found = false;
00377   SubscriptionPtr sub;
00378   {
00379     boost::mutex::scoped_lock lock(subs_mutex_);
00380 
00381     for (L_Subscription::iterator s = subscriptions_.begin();
00382          s != subscriptions_.end() && !found; ++s)
00383     {
00384       if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
00385       {
00386         found = true;
00387         sub = *s;
00388         break;
00389       }
00390     }
00391   }
00392 
00393   if(found)
00394   {
00395     sub->addLocalConnection(pub);
00396   }
00397 
00398   XmlRpcValue args, result, payload;
00399   args[0] = this_node::getName();
00400   args[1] = ops.topic;
00401   args[2] = ops.datatype;
00402   args[3] = xmlrpc_manager_->getServerURI();
00403   master::execute("registerPublisher", args, result, payload, true);
00404 
00405   return true;
00406 }
00407 
00408 bool TopicManager::unadvertise(const std::string &topic, const SubscriberCallbacksPtr& callbacks)
00409 {
00410   PublicationPtr pub;
00411   V_Publication::iterator i;
00412   {
00413     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00414 
00415     if (isShuttingDown())
00416     {
00417       return false;
00418     }
00419 
00420     for (i = advertised_topics_.begin();
00421          i != advertised_topics_.end(); ++i)
00422     {
00423       if(((*i)->getName() == topic) && (!(*i)->isDropped()))
00424       {
00425         pub = *i;
00426         break;
00427       }
00428     }
00429   }
00430 
00431   if (!pub)
00432   {
00433     return false;
00434   }
00435 
00436   pub->removeCallbacks(callbacks);
00437 
00438   {
00439     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00440     if (pub->getNumCallbacks() == 0)
00441     {
00442       unregisterPublisher(pub->getName());
00443       pub->drop();
00444 
00445       advertised_topics_.erase(i);
00446 
00447       {
00448         boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
00449         advertised_topic_names_.remove(pub->getName());
00450       }
00451     }
00452   }
00453 
00454   return true;
00455 }
00456 
00457 bool TopicManager::unregisterPublisher(const std::string& topic)
00458 {
00459   XmlRpcValue args, result, payload;
00460   args[0] = this_node::getName();
00461   args[1] = topic;
00462   args[2] = xmlrpc_manager_->getServerURI();
00463   master::execute("unregisterPublisher", args, result, payload, false);
00464 
00465   return true;
00466 }
00467 
00468 bool TopicManager::isTopicAdvertised(const string &topic)
00469 {
00470   for (V_Publication::iterator t = advertised_topics_.begin(); t != advertised_topics_.end(); ++t)
00471   {
00472     if (((*t)->getName() == topic) && (!(*t)->isDropped()))
00473     {
00474       return true;
00475     }
00476   }
00477 
00478   return false;
00479 }
00480 
00481 bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype)
00482 {
00483   XmlRpcValue args, result, payload;
00484   args[0] = this_node::getName();
00485   args[1] = s->getName();
00486   args[2] = datatype;
00487   args[3] = xmlrpc_manager_->getServerURI();
00488 
00489   if (!master::execute("registerSubscriber", args, result, payload, true))
00490   {
00491     return false;
00492   }
00493 
00494   vector<string> pub_uris;
00495   for (int i = 0; i < payload.size(); i++)
00496   {
00497     if (payload[i] != xmlrpc_manager_->getServerURI())
00498     {
00499       pub_uris.push_back(string(payload[i]));
00500     }
00501   }
00502 
00503   bool self_subscribed = false;
00504   PublicationPtr pub;
00505   const std::string& sub_md5sum = s->md5sum();
00506   // Figure out if we have a local publisher
00507   {
00508     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00509     V_Publication::const_iterator it = advertised_topics_.begin();
00510     V_Publication::const_iterator end = advertised_topics_.end();
00511     for (; it != end; ++it)
00512     {
00513       pub = *it;
00514       const std::string& pub_md5sum = pub->getMD5Sum();
00515 
00516       if (pub->getName() == s->getName() && !pub->isDropped())
00517         {
00518           if (!md5sumsMatch(pub_md5sum, sub_md5sum))
00519             {
00520               ROS_ERROR("md5sum mismatch making local subscription to topic %s.",
00521                         s->getName().c_str());
00522               ROS_ERROR("Subscriber expects type %s, md5sum %s",
00523                         s->datatype().c_str(), s->md5sum().c_str());
00524               ROS_ERROR("Publisher provides type %s, md5sum %s",
00525                         pub->getDataType().c_str(), pub->getMD5Sum().c_str());
00526               return false;
00527             }
00528 
00529           self_subscribed = true;
00530           break;
00531         }
00532     }
00533   }
00534 
00535   s->pubUpdate(pub_uris);
00536   if (self_subscribed)
00537   {
00538     s->addLocalConnection(pub);
00539   }
00540 
00541   return true;
00542 }
00543 
00544 bool TopicManager::unregisterSubscriber(const string &topic)
00545 {
00546   XmlRpcValue args, result, payload;
00547   args[0] = this_node::getName();
00548   args[1] = topic;
00549   args[2] = xmlrpc_manager_->getServerURI();
00550 
00551   master::execute("unregisterSubscriber", args, result, payload, false);
00552 
00553   return true;
00554 }
00555 
00556 bool TopicManager::pubUpdate(const string &topic, const vector<string> &pubs)
00557 {
00558   SubscriptionPtr sub;
00559   {
00560     boost::mutex::scoped_lock lock(subs_mutex_);
00561 
00562     if (isShuttingDown())
00563     {
00564       return false;
00565     }
00566 
00567     ROS_DEBUG("Received update for topic [%s] (%d publishers)", topic.c_str(), (int)pubs.size());
00568     // find the subscription
00569     for (L_Subscription::const_iterator s  = subscriptions_.begin();
00570                                             s != subscriptions_.end(); ++s)
00571     {
00572       if ((*s)->getName() != topic || (*s)->isDropped())
00573         continue;
00574 
00575       sub = *s;
00576       break;
00577     }
00578 
00579   }
00580 
00581   if (sub)
00582   {
00583     return sub->pubUpdate(pubs);
00584   }
00585   else
00586   {
00587     ROSCPP_LOG_DEBUG("got a request for updating publishers of topic %s, but I " \
00588               "don't have any subscribers to that topic.", topic.c_str());
00589   }
00590 
00591   return false;
00592 }
00593 
00594 bool TopicManager::requestTopic(const string &topic,
00595                          XmlRpcValue &protos,
00596                          XmlRpcValue &ret)
00597 {
00598   for (int proto_idx = 0; proto_idx < protos.size(); proto_idx++)
00599   {
00600     XmlRpcValue proto = protos[proto_idx]; // save typing
00601     if (proto.getType() != XmlRpcValue::TypeArray)
00602     {
00603         ROSCPP_LOG_DEBUG( "requestTopic protocol list was not a list of lists");
00604       return false;
00605     }
00606 
00607     if (proto[0].getType() != XmlRpcValue::TypeString)
00608     {
00609         ROSCPP_LOG_DEBUG( "requestTopic received a protocol list in which a sublist " \
00610                  "did not start with a string");
00611       return false;
00612     }
00613 
00614     string proto_name = proto[0];
00615     if (proto_name == string("TCPROS"))
00616     {
00617       XmlRpcValue tcpros_params;
00618       tcpros_params[0] = string("TCPROS");
00619       tcpros_params[1] = network::getHost();
00620       tcpros_params[2] = int(connection_manager_->getTCPPort());
00621       ret[0] = int(1);
00622       ret[1] = string();
00623       ret[2] = tcpros_params;
00624       return true;
00625     }
00626     else if (proto_name == string("UDPROS"))
00627     {
00628       if (proto.size() != 5 ||
00629           proto[1].getType() != XmlRpcValue::TypeBase64 ||
00630           proto[2].getType() != XmlRpcValue::TypeString ||
00631           proto[3].getType() != XmlRpcValue::TypeInt ||
00632           proto[4].getType() != XmlRpcValue::TypeInt)
00633       {
00634         ROSCPP_LOG_DEBUG("Invalid protocol parameters for UDPROS");
00635         return false;
00636       }
00637       std::vector<char> header_bytes = proto[1];
00638       boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]);
00639       memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00640       Header h;
00641       string err;
00642       if (!h.parse(buffer, header_bytes.size(), err))
00643       {
00644         ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00645         return false;
00646       }
00647 
00648       PublicationPtr pub_ptr = lookupPublication(topic);
00649       if(!pub_ptr)
00650       {
00651         ROSCPP_LOG_DEBUG("Unable to find advertised topic %s for UDPROS connection", topic.c_str());
00652         return false;
00653       }
00654 
00655       std::string host = proto[2];
00656       int port = proto[3];
00657 
00658       M_string m;
00659       std::string error_msg;
00660       if (!pub_ptr->validateHeader(h, error_msg))
00661       {
00662         ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
00663         return false;
00664       }
00665 
00666       int max_datagram_size = proto[4];
00667       int conn_id = connection_manager_->getNewConnectionID();
00668       TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
00669       connection_manager_->udprosIncomingConnection(transport, h);
00670 
00671       XmlRpcValue udpros_params;
00672       udpros_params[0] = string("UDPROS");
00673       udpros_params[1] = network::getHost();
00674       udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
00675       udpros_params[3] = conn_id;
00676       udpros_params[4] = max_datagram_size;
00677       m["topic"] = topic;
00678       m["md5sum"] = pub_ptr->getMD5Sum();
00679       m["type"] = pub_ptr->getDataType();
00680       m["callerid"] = this_node::getName();
00681       m["message_definition"] = pub_ptr->getMessageDefinition();
00682       boost::shared_array<uint8_t> msg_def_buffer;
00683       uint32_t len;
00684       Header::write(m, msg_def_buffer, len);
00685       XmlRpcValue v(msg_def_buffer.get(), len);
00686       udpros_params[5] = v;
00687       ret[0] = int(1);
00688       ret[1] = string();
00689       ret[2] = udpros_params;
00690       return true;
00691     }
00692     else
00693     {
00694       ROSCPP_LOG_DEBUG( "an unsupported protocol was offered: [%s]",
00695           proto_name.c_str());
00696     }
00697   }
00698 
00699   ROSCPP_LOG_DEBUG( "Currently, roscpp only supports TCPROS. The caller to " \
00700              "requestTopic did not support TCPROS, so there are no " \
00701              "protocols in common.");
00702   return false;
00703 }
00704 
00705 void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
00706 {
00707   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00708 
00709   if (isShuttingDown())
00710   {
00711     return;
00712   }
00713 
00714   PublicationPtr p = lookupPublicationWithoutLock(topic);
00715   if (p->hasSubscribers() || p->isLatching())
00716   {
00717     ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
00718 
00719     // Determine what kinds of subscribers we're publishing to.  If they're intraprocess with the same C++ type we can
00720     // do a no-copy publish.
00721     bool nocopy = false;
00722     bool serialize = false;
00723 
00724     // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
00725     if (m.type_info && m.message)
00726     {
00727       p->getPublishTypes(serialize, nocopy, *m.type_info);
00728     }
00729     else
00730     {
00731       serialize = true;
00732     }
00733 
00734     if (!nocopy)
00735     {
00736       m.message.reset();
00737       m.type_info = 0;
00738     }
00739 
00740     if (serialize || p->isLatching())
00741     {
00742       SerializedMessage m2 = serfunc();
00743       m.buf = m2.buf;
00744       m.num_bytes = m2.num_bytes;
00745       m.message_start = m2.message_start;
00746     }
00747 
00748     p->publish(m);
00749 
00750     // If we're not doing a serialized publish we don't need to signal the pollset.  The write()
00751     // call inside signal() is actually relatively expensive when doing a nocopy publish.
00752     if (serialize)
00753     {
00754       poll_manager_->getPollSet().signal();
00755     }
00756   }
00757   else
00758   {
00759     p->incrementSequence();
00760   }
00761 }
00762 
00763 void TopicManager::incrementSequence(const std::string& topic)
00764 {
00765   PublicationPtr pub = lookupPublication(topic);
00766   if (pub)
00767   {
00768     pub->incrementSequence();
00769   }
00770 }
00771 
00772 bool TopicManager::isLatched(const std::string& topic)
00773 {
00774   PublicationPtr pub = lookupPublication(topic);
00775   if (pub)
00776   {
00777     return pub->isLatched();
00778   }
00779 
00780   return false;
00781 }
00782 
00783 PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic)
00784 {
00785   PublicationPtr t;
00786   for (V_Publication::iterator i = advertised_topics_.begin();
00787        !t && i != advertised_topics_.end(); ++i)
00788   {
00789     if (((*i)->getName() == topic) && (!(*i)->isDropped()))
00790     {
00791       t = *i;
00792       break;
00793     }
00794   }
00795 
00796   return t;
00797 }
00798 
00799 bool TopicManager::unsubscribe(const std::string &topic, const SubscriptionCallbackHelperPtr& helper)
00800 {
00801   SubscriptionPtr sub;
00802 
00803   {
00804     boost::mutex::scoped_lock lock(subs_mutex_);
00805 
00806     if (isShuttingDown())
00807     {
00808       return false;
00809     }
00810 
00811     L_Subscription::iterator it;
00812     for (it = subscriptions_.begin();
00813          it != subscriptions_.end(); ++it)
00814     {
00815       if ((*it)->getName() == topic)
00816       {
00817         sub = *it;
00818         break;
00819       }
00820     }
00821   }
00822 
00823   if (!sub)
00824   {
00825     return false;
00826   }
00827 
00828   sub->removeCallback(helper);
00829 
00830   if (sub->getNumCallbacks() == 0)
00831   {
00832     // nobody is left. blow away the subscription.
00833     {
00834       boost::mutex::scoped_lock lock(subs_mutex_);
00835 
00836       L_Subscription::iterator it;
00837       for (it = subscriptions_.begin();
00838            it != subscriptions_.end(); ++it)
00839       {
00840         if ((*it)->getName() == topic)
00841         {
00842           subscriptions_.erase(it);
00843           break;
00844         }
00845       }
00846 
00847       if (!unregisterSubscriber(topic))
00848       {
00849         ROSCPP_LOG_DEBUG("Couldn't unregister subscriber for topic [%s]", topic.c_str());
00850       }
00851     }
00852 
00853     sub->shutdown();
00854     return true;
00855   }
00856 
00857   return true;
00858 }
00859 
00860 size_t TopicManager::getNumSubscribers(const std::string &topic)
00861 {
00862   boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00863 
00864   if (isShuttingDown())
00865   {
00866     return 0;
00867   }
00868 
00869   PublicationPtr p = lookupPublicationWithoutLock(topic);
00870   if (p)
00871   {
00872     return p->getNumSubscribers();
00873   }
00874 
00875   return 0;
00876 }
00877 
00878 size_t TopicManager::getNumSubscriptions()
00879 {
00880   boost::mutex::scoped_lock lock(subs_mutex_);
00881   return subscriptions_.size();
00882 }
00883 
00884 size_t TopicManager::getNumPublishers(const std::string &topic)
00885 {
00886   boost::mutex::scoped_lock lock(subs_mutex_);
00887 
00888   if (isShuttingDown())
00889   {
00890     return 0;
00891   }
00892 
00893   for (L_Subscription::const_iterator t = subscriptions_.begin();
00894        t != subscriptions_.end(); ++t)
00895   {
00896     if (!(*t)->isDropped() && (*t)->getName() == topic)
00897     {
00898       return (*t)->getNumPublishers();
00899     }
00900   }
00901 
00902   return 0;
00903 }
00904 
00905 void TopicManager::getBusStats(XmlRpcValue &stats)
00906 {
00907   XmlRpcValue publish_stats, subscribe_stats, service_stats;
00908   // force these guys to be arrays, even if we don't populate them
00909   publish_stats.setSize(0);
00910   subscribe_stats.setSize(0);
00911   service_stats.setSize(0);
00912 
00913   uint32_t pidx = 0;
00914   {
00915     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00916     for (V_Publication::iterator t = advertised_topics_.begin();
00917          t != advertised_topics_.end(); ++t)
00918     {
00919       publish_stats[pidx++] = (*t)->getStats();
00920     }
00921   }
00922 
00923   {
00924     uint32_t sidx = 0;
00925 
00926     boost::mutex::scoped_lock lock(subs_mutex_);
00927     for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00928     {
00929       subscribe_stats[sidx++] = (*t)->getStats();
00930     }
00931   }
00932 
00933   stats[0] = publish_stats;
00934   stats[1] = subscribe_stats;
00935   stats[2] = service_stats;
00936 }
00937 
00938 void TopicManager::getBusInfo(XmlRpcValue &info)
00939 {
00940   // force these guys to be arrays, even if we don't populate them
00941   info.setSize(0);
00942 
00943   {
00944     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00945 
00946     for (V_Publication::iterator t = advertised_topics_.begin();
00947          t != advertised_topics_.end(); ++t)
00948     {
00949       (*t)->getInfo(info);
00950     }
00951   }
00952 
00953   {
00954     boost::mutex::scoped_lock lock(subs_mutex_);
00955 
00956     for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00957     {
00958       (*t)->getInfo(info);
00959     }
00960   }
00961 }
00962 
00963 void TopicManager::getSubscriptions(XmlRpcValue &subs)
00964 {
00965   // force these guys to be arrays, even if we don't populate them
00966   subs.setSize(0);
00967 
00968   {
00969     boost::mutex::scoped_lock lock(subs_mutex_);
00970 
00971     uint32_t sidx = 0;
00972 
00973     for (L_Subscription::iterator t = subscriptions_.begin(); t != subscriptions_.end(); ++t)
00974     {
00975       XmlRpcValue sub;
00976       sub[0] = (*t)->getName();
00977       sub[1] = (*t)->datatype();
00978       subs[sidx++] = sub;
00979     }
00980   }
00981 }
00982 
00983 void TopicManager::getPublications(XmlRpcValue &pubs)
00984 {
00985   // force these guys to be arrays, even if we don't populate them
00986   pubs.setSize(0);
00987 
00988   {
00989     boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
00990 
00991     uint32_t sidx = 0;
00992 
00993     for (V_Publication::iterator t = advertised_topics_.begin();
00994          t != advertised_topics_.end(); ++t)
00995     {
00996       XmlRpcValue pub;
00997       pub[0] = (*t)->getName();
00998       pub[1] = (*t)->getDataType();
00999       pubs[sidx++] = pub;
01000     }
01001 
01002   }
01003 }
01004 
01005 extern std::string console::g_last_error_message;
01006 
01007 void TopicManager::pubUpdateCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01008 {
01009   std::vector<std::string> pubs;
01010   for (int idx = 0; idx < params[2].size(); idx++)
01011   {
01012     pubs.push_back(params[2][idx]);
01013   }
01014   if (pubUpdate(params[1], pubs))
01015   {
01016     result = xmlrpc::responseInt(1, "", 0);
01017   }
01018   else
01019   {
01020     result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01021   }
01022 }
01023 
01024 void TopicManager::requestTopicCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01025 {
01026   if (!requestTopic(params[1], params[2], result))
01027   {
01028     result = xmlrpc::responseInt(0, console::g_last_error_message, 0);
01029   }
01030 }
01031 
01032 void TopicManager::getBusStatsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01033 {
01034   result[0] = 1;
01035   result[1] = std::string("");
01036   XmlRpcValue response;
01037   getBusStats(result);
01038   result[2] = response;
01039 }
01040 
01041 void TopicManager::getBusInfoCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01042 {
01043   result[0] = 1;
01044   result[1] = std::string("");
01045   XmlRpcValue response;
01046   getBusInfo(response);
01047   result[2] = response;
01048 }
01049 
01050 void TopicManager::getSubscriptionsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01051 {
01052   result[0] = 1;
01053   result[1] = std::string("subscriptions");
01054   XmlRpcValue response;
01055   getSubscriptions(response);
01056   result[2] = response;
01057 }
01058 
01059 void TopicManager::getPublicationsCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result)
01060 {
01061   result[0] = 1;
01062   result[1] = std::string("publications");
01063   XmlRpcValue response;
01064   getPublications(response);
01065   result[2] = response;
01066 }
01067 
01068 } // namespace ros


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11