subscription.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include <sstream>
00036 #include <fcntl.h>
00037 #include <cerrno>
00038 #include <cstring>
00039 #include <typeinfo>
00040 
00041 #include "ros/common.h"
00042 #include "ros/io.h"
00043 #include "ros/subscription.h"
00044 #include "ros/publication.h"
00045 #include "ros/transport_publisher_link.h"
00046 #include "ros/intraprocess_publisher_link.h"
00047 #include "ros/intraprocess_subscriber_link.h"
00048 #include "ros/connection.h"
00049 #include "ros/transport/transport_tcp.h"
00050 #include "ros/transport/transport_udp.h"
00051 #include "ros/callback_queue_interface.h"
00052 #include "ros/this_node.h"
00053 #include "ros/network.h"
00054 #include "ros/poll_manager.h"
00055 #include "ros/connection_manager.h"
00056 #include "ros/message_deserializer.h"
00057 #include "ros/subscription_queue.h"
00058 #include "ros/file_log.h"
00059 #include "ros/transport_hints.h"
00060 #include "ros/subscription_callback_helper.h"
00061 
00062 #include <boost/make_shared.hpp>
00063 
00064 using XmlRpc::XmlRpcValue;
00065 
00066 namespace ros
00067 {
00068 
00069 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints)
00070 : name_(name)
00071 , md5sum_(md5sum)
00072 , datatype_(datatype)
00073 , nonconst_callbacks_(0)
00074 , dropped_(false)
00075 , shutting_down_(false)
00076 , transport_hints_(transport_hints)
00077 {
00078 }
00079 
00080 Subscription::~Subscription()
00081 {
00082   pending_connections_.clear();
00083   callbacks_.clear();
00084 }
00085 
00086 void Subscription::shutdown()
00087 {
00088   {
00089     boost::mutex::scoped_lock lock(shutdown_mutex_);
00090     shutting_down_ = true;
00091   }
00092 
00093   drop();
00094 }
00095 
00096 XmlRpcValue Subscription::getStats()
00097 {
00098   XmlRpcValue stats;
00099   stats[0] = name_;
00100   XmlRpcValue conn_data;
00101   conn_data.setSize(0);
00102 
00103   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00104 
00105   uint32_t cidx = 0;
00106   for (V_PublisherLink::iterator c = publisher_links_.begin();
00107        c != publisher_links_.end(); ++c)
00108   {
00109     const PublisherLink::Stats& s = (*c)->getStats();
00110     conn_data[cidx][0] = (*c)->getConnectionID();
00111     conn_data[cidx][1] = (int)s.bytes_received_;
00112     conn_data[cidx][2] = (int)s.messages_received_;
00113     conn_data[cidx][3] = (int)s.drops_;
00114     conn_data[cidx][4] = 0; // figure out something for this. not sure.
00115   }
00116 
00117   stats[1] = conn_data;
00118   return stats;
00119 }
00120 
00121 // rospy returns values like this:
00122 // (1, 'http://127.0.0.1:62365/', 'i', 'TCPROS', '/chatter')
00123 //
00124 // We're outputting something like this:
00125 // (0, http://127.0.0.1:62438/, i, TCPROS, /chatter)
00126 void Subscription::getInfo(XmlRpc::XmlRpcValue& info)
00127 {
00128   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00129 
00130   for (V_PublisherLink::iterator c = publisher_links_.begin();
00131        c != publisher_links_.end(); ++c)
00132   {
00133     XmlRpcValue curr_info;
00134     curr_info[0] = (int)(*c)->getConnectionID();
00135     curr_info[1] = (*c)->getPublisherXMLRPCURI();
00136     curr_info[2] = "i";
00137     curr_info[3] = (*c)->getTransportType();
00138     curr_info[4] = name_;
00139     info[info.size()] = curr_info;
00140   }
00141 }
00142 
00143 uint32_t Subscription::getNumPublishers()
00144 {
00145         boost::mutex::scoped_lock lock(publisher_links_mutex_);
00146         return (uint32_t)publisher_links_.size();
00147 }
00148 
00149 void Subscription::drop()
00150 {
00151   if (!dropped_)
00152   {
00153     dropped_ = true;
00154 
00155     dropAllConnections();
00156   }
00157 }
00158 
00159 void Subscription::dropAllConnections()
00160 {
00161   // Swap our subscribers list with a local one so we can only lock for a short period of time, because a
00162   // side effect of our calling drop() on connections can be re-locking the subscribers mutex
00163   V_PublisherLink localsubscribers;
00164 
00165   {
00166     boost::mutex::scoped_lock lock(publisher_links_mutex_);
00167 
00168     localsubscribers.swap(publisher_links_);
00169   }
00170 
00171   V_PublisherLink::iterator it = localsubscribers.begin();
00172   V_PublisherLink::iterator end = localsubscribers.end();
00173   for (;it != end; ++it)
00174   {
00175     (*it)->drop();
00176   }
00177 }
00178 
00179 void Subscription::addLocalConnection(const PublicationPtr& pub)
00180 {
00181   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00182   if (dropped_)
00183   {
00184     return;
00185   }
00186 
00187   ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str());
00188 
00189   IntraProcessPublisherLinkPtr pub_link(new IntraProcessPublisherLink(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_));
00190   IntraProcessSubscriberLinkPtr sub_link(new IntraProcessSubscriberLink(pub));
00191   pub_link->setPublisher(sub_link);
00192   sub_link->setSubscriber(pub_link);
00193 
00194   addPublisherLink(pub_link);
00195   pub->addSubscriberLink(sub_link);
00196 }
00197 
00198 bool urisEqual(const std::string& uri1, const std::string& uri2)
00199 {
00200   std::string host1, host2;
00201   uint32_t port1 = 0, port2 = 0;
00202   network::splitURI(uri1, host1, port1);
00203   network::splitURI(uri2, host2, port2);
00204   return port1 == port2 && host1 == host2;
00205 }
00206 
00207 bool Subscription::pubUpdate(const V_string& new_pubs)
00208 {
00209   boost::mutex::scoped_lock lock(shutdown_mutex_);
00210 
00211   if (shutting_down_ || dropped_)
00212   {
00213     return false;
00214   }
00215 
00216   bool retval = true;
00217 
00218   {
00219     std::stringstream ss;
00220 
00221     for (V_string::const_iterator up_i = new_pubs.begin();
00222          up_i != new_pubs.end(); ++up_i)
00223     {
00224       ss << *up_i << ", ";
00225     }
00226 
00227     ss << " already have these connections: ";
00228     for (V_PublisherLink::iterator spc = publisher_links_.begin();
00229          spc!= publisher_links_.end(); ++spc)
00230     {
00231       ss << (*spc)->getPublisherXMLRPCURI() << ", ";
00232     }
00233 
00234     boost::mutex::scoped_lock lock(pending_connections_mutex_);
00235     S_PendingConnection::iterator it = pending_connections_.begin();
00236     S_PendingConnection::iterator end = pending_connections_.end();
00237     for (; it != end; ++it)
00238     {
00239       ss << (*it)->getRemoteURI() << ", ";
00240     }
00241 
00242     ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());
00243   }
00244 
00245   V_string additions;
00246   V_PublisherLink subtractions;
00247   V_PublisherLink to_add;
00248   // could use the STL set operations... but these sets are so small
00249   // it doesn't really matter.
00250   {
00251     boost::mutex::scoped_lock lock(publisher_links_mutex_);
00252 
00253     for (V_PublisherLink::iterator spc = publisher_links_.begin();
00254          spc!= publisher_links_.end(); ++spc)
00255     {
00256       bool found = false;
00257       for (V_string::const_iterator up_i = new_pubs.begin();
00258            !found && up_i != new_pubs.end(); ++up_i)
00259       {
00260         if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
00261         {
00262           found = true;
00263           break;
00264         }
00265       }
00266 
00267       if (!found)
00268       {
00269         subtractions.push_back(*spc);
00270       }
00271     }
00272 
00273     for (V_string::const_iterator up_i  = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
00274     {
00275       bool found = false;
00276       for (V_PublisherLink::iterator spc = publisher_links_.begin();
00277            !found && spc != publisher_links_.end(); ++spc)
00278       {
00279         if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
00280         {
00281           found = true;
00282           break;
00283         }
00284       }
00285 
00286       if (!found)
00287       {
00288         boost::mutex::scoped_lock lock(pending_connections_mutex_);
00289         S_PendingConnection::iterator it = pending_connections_.begin();
00290         S_PendingConnection::iterator end = pending_connections_.end();
00291         for (; it != end; ++it)
00292         {
00293           if (urisEqual(*up_i, (*it)->getRemoteURI()))
00294           {
00295             found = true;
00296             break;
00297           }
00298         }
00299       }
00300 
00301       if (!found)
00302       {
00303         additions.push_back(*up_i);
00304       }
00305     }
00306   }
00307 
00308   for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
00309   {
00310         const PublisherLinkPtr& link = *i;
00311     if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
00312     {
00313       ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
00314                         link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
00315                   link->drop();
00316           }
00317           else
00318           {
00319                   ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
00320           }
00321         }
00322 
00323   for (V_string::iterator i = additions.begin();
00324             i != additions.end(); ++i)
00325   {
00326     // this function should never negotiate a self-subscription
00327     if (XMLRPCManager::instance()->getServerURI() != *i)
00328     {
00329       retval &= negotiateConnection(*i);
00330     }
00331     else
00332     {
00333       ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str());
00334     }
00335   }
00336 
00337   return retval;
00338 }
00339 
00340 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
00341 {
00342   XmlRpcValue tcpros_array, protos_array, params;
00343   XmlRpcValue udpros_array;
00344   TransportUDPPtr udp_transport;
00345   int protos = 0;
00346   V_string transports = transport_hints_.getTransports();
00347   if (transports.empty())
00348   {
00349     transport_hints_.reliable();
00350     transports = transport_hints_.getTransports();
00351   }
00352   for (V_string::const_iterator it = transports.begin();
00353        it != transports.end();
00354        ++it)
00355   {
00356     if (*it == "UDP")
00357     {
00358       int max_datagram_size = transport_hints_.getMaxDatagramSize();
00359       udp_transport = TransportUDPPtr(new TransportUDP(&PollManager::instance()->getPollSet()));
00360       if (!max_datagram_size)
00361         max_datagram_size = udp_transport->getMaxDatagramSize();
00362       udp_transport->createIncoming(0, false);
00363       udpros_array[0] = "UDPROS";
00364       M_string m;
00365       m["topic"] = getName();
00366       m["md5sum"] = md5sum();
00367       m["callerid"] = this_node::getName();
00368       m["type"] = datatype();
00369       boost::shared_array<uint8_t> buffer;
00370       uint32_t len;
00371       Header::write(m, buffer, len);
00372       XmlRpcValue v(buffer.get(), len);
00373       udpros_array[1] = v;
00374       udpros_array[2] = network::getHost();
00375       udpros_array[3] = udp_transport->getServerPort();
00376       udpros_array[4] = max_datagram_size;
00377 
00378       protos_array[protos++] = udpros_array;
00379     }
00380     else if (*it == "TCP")
00381     {
00382       tcpros_array[0] = std::string("TCPROS");
00383       protos_array[protos++] = tcpros_array;
00384     }
00385     else
00386     {
00387       ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str());
00388     }
00389   }
00390   params[0] = this_node::getName();
00391   params[1] = name_;
00392   params[2] = protos_array;
00393   std::string peer_host;
00394   uint32_t peer_port;
00395   if (!network::splitURI(xmlrpc_uri, peer_host, peer_port))
00396   {
00397     ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
00398     return false;
00399   }
00400 
00401   XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(),
00402                                                      peer_port, "/");
00403  // if (!c.execute("requestTopic", params, result) || !g_node->validateXmlrpcResponse("requestTopic", result, proto))
00404 
00405   // Initiate the negotiation.  We'll come back and check on it later.
00406   if (!c->executeNonBlock("requestTopic", params))
00407   {
00408     ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00409               peer_host.c_str(), peer_port, name_.c_str());
00410     delete c;
00411     if (udp_transport)
00412     {
00413       udp_transport->close();
00414     }
00415 
00416     return false;
00417   }
00418 
00419   ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
00420 
00421   // The PendingConnectionPtr takes ownership of c, and will delete it on
00422   // destruction.
00423   PendingConnectionPtr conn(new PendingConnection(c, udp_transport, shared_from_this(), xmlrpc_uri));
00424 
00425   XMLRPCManager::instance()->addASyncConnection(conn);
00426   // Put this connection on the list that we'll look at later.
00427   {
00428     boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00429     pending_connections_.insert(conn);
00430   }
00431 
00432   return true;
00433 }
00434 
00435 void closeTransport(const TransportUDPPtr& trans)
00436 {
00437   if (trans)
00438   {
00439     trans->close();
00440   }
00441 }
00442 
00443 void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRpcValue& result)
00444 {
00445   boost::mutex::scoped_lock lock(shutdown_mutex_);
00446   if (shutting_down_ || dropped_)
00447   {
00448     return;
00449   }
00450 
00451   {
00452     boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00453     pending_connections_.erase(conn);
00454   }
00455 
00456   TransportUDPPtr udp_transport;
00457 
00458   std::string peer_host = conn->getClient()->getHost();
00459   uint32_t peer_port = conn->getClient()->getPort();
00460   std::stringstream ss;
00461   ss << "http://" << peer_host << ":" << peer_port << "/";
00462   std::string xmlrpc_uri = ss.str();
00463   udp_transport = conn->getUDPTransport();
00464 
00465   XmlRpc::XmlRpcValue proto;
00466   if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto))
00467   {
00468         ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00469               peer_host.c_str(), peer_port, name_.c_str());
00470         closeTransport(udp_transport);
00471         return;
00472   }
00473 
00474   if (proto.size() == 0)
00475   {
00476         ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
00477         closeTransport(udp_transport);
00478         return;
00479   }
00480 
00481   if (proto.getType() != XmlRpcValue::TypeArray)
00482   {
00483         ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
00484         closeTransport(udp_transport);
00485         return;
00486   }
00487   if (proto[0].getType() != XmlRpcValue::TypeString)
00488   {
00489         ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
00490         closeTransport(udp_transport);
00491         return;
00492   }
00493 
00494   std::string proto_name = proto[0];
00495   if (proto_name == "TCPROS")
00496   {
00497     if (proto.size() != 3 ||
00498         proto[1].getType() != XmlRpcValue::TypeString ||
00499         proto[2].getType() != XmlRpcValue::TypeInt)
00500     {
00501         ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \
00502                 "parameters aren't string,int");
00503       return;
00504     }
00505     std::string pub_host = proto[1];
00506     int pub_port = proto[2];
00507     ROSCPP_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00508 
00509     TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet()));
00510     if (transport->connect(pub_host, pub_port))
00511     {
00512       ConnectionPtr connection(new Connection());
00513       TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
00514 
00515       connection->initialize(transport, false, HeaderReceivedFunc());
00516       pub_link->initialize(connection);
00517 
00518       ConnectionManager::instance()->addConnection(connection);
00519 
00520       boost::mutex::scoped_lock lock(publisher_links_mutex_);
00521       addPublisherLink(pub_link);
00522 
00523       ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00524     }
00525     else
00526     {
00527         ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00528     }
00529   }
00530   else if (proto_name == "UDPROS")
00531   {
00532     if (proto.size() != 6 ||
00533         proto[1].getType() != XmlRpcValue::TypeString ||
00534         proto[2].getType() != XmlRpcValue::TypeInt ||
00535         proto[3].getType() != XmlRpcValue::TypeInt ||
00536         proto[4].getType() != XmlRpcValue::TypeInt ||
00537         proto[5].getType() != XmlRpcValue::TypeBase64)
00538     {
00539       ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
00540                        "parameters aren't string,int,int,int,base64");
00541       closeTransport(udp_transport);
00542       return;
00543     }
00544     std::string pub_host = proto[1];
00545     int pub_port = proto[2];
00546     int conn_id = proto[3];
00547     int max_datagram_size = proto[4];
00548     std::vector<char> header_bytes = proto[5];
00549     boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]);
00550     memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00551     Header h;
00552     std::string err;
00553     if (!h.parse(buffer, header_bytes.size(), err))
00554     {
00555       ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00556       closeTransport(udp_transport);
00557       return;
00558     }
00559     ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
00560 
00561     std::string error_msg;
00562     if (h.getValue("error", error_msg))
00563     {
00564       ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
00565       closeTransport(udp_transport);
00566       return;
00567     }
00568 
00569     TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
00570     if (pub_link->setHeader(h))
00571     {
00572       ConnectionPtr connection(new Connection());
00573       connection->initialize(udp_transport, false, NULL);
00574       connection->setHeader(h);
00575       pub_link->initialize(connection);
00576 
00577       ConnectionManager::instance()->addConnection(connection);
00578 
00579       boost::mutex::scoped_lock lock(publisher_links_mutex_);
00580       addPublisherLink(pub_link);
00581 
00582       ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00583     }
00584     else
00585     {
00586       ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00587       closeTransport(udp_transport);
00588       return;
00589     }
00590   }
00591   else
00592   {
00593         ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str());
00594   }
00595 }
00596 
00597 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
00598 {
00599   boost::mutex::scoped_lock lock(callbacks_mutex_);
00600 
00601   uint32_t drops = 0;
00602 
00603   // Cache the deserializers by type info.  If all the subscriptions are the same type this has the same performance as before.  If
00604   // there are subscriptions with different C++ type (but same ROS message type), this now works correctly rather than passing
00605   // garbage to the messages with different C++ types than the first one.
00606   cached_deserializers_.clear();
00607 
00608   ros::Time receipt_time = ros::Time::now();
00609 
00610   for (V_CallbackInfo::iterator cb = callbacks_.begin();
00611        cb != callbacks_.end(); ++cb)
00612   {
00613     const CallbackInfoPtr& info = *cb;
00614 
00615     ROS_ASSERT(info->callback_queue_);
00616 
00617     const std::type_info* ti = &info->helper_->getTypeInfo();
00618 
00619     if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
00620     {
00621       MessageDeserializerPtr deserializer;
00622 
00623       V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
00624       V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
00625       for (; des_it != des_end; ++des_it)
00626       {
00627         if (*des_it->first == *ti)
00628         {
00629           deserializer = des_it->second;
00630           break;
00631         }
00632       }
00633 
00634       if (!deserializer)
00635       {
00636         deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
00637         cached_deserializers_.push_back(std::make_pair(ti, deserializer));
00638       }
00639 
00640       bool was_full = false;
00641       bool nonconst_need_copy = false;
00642       if (callbacks_.size() > 1)
00643       {
00644         nonconst_need_copy = true;
00645       }
00646 
00647       info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
00648 
00649       if (was_full)
00650       {
00651         ++drops;
00652       }
00653       else
00654       {
00655         info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00656       }
00657     }
00658   }
00659 
00660   // If this link is latched, store off the message so we can immediately pass it to new subscribers later
00661   if (link->isLatched())
00662   {
00663     LatchInfo li;
00664     li.connection_header = connection_header;
00665     li.link = link;
00666     li.message = m;
00667     li.receipt_time = receipt_time;
00668     latched_messages_[link] = li;
00669   }
00670 
00671   cached_deserializers_.clear();
00672 
00673   return drops;
00674 }
00675 
00676 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
00677 {
00678   ROS_ASSERT(helper);
00679   ROS_ASSERT(queue);
00680 
00681   // Decay to a real type as soon as we have a subscriber with a real type
00682   {
00683     boost::mutex::scoped_lock lock(md5sum_mutex_);
00684     if (md5sum_ == "*" && md5sum != "*")
00685     {
00686 
00687       md5sum_ = md5sum;
00688     }
00689   }
00690 
00691   if (md5sum != "*" && md5sum != this->md5sum())
00692   {
00693     return false;
00694   }
00695 
00696   {
00697     boost::mutex::scoped_lock lock(callbacks_mutex_);
00698 
00699     CallbackInfoPtr info(new CallbackInfo);
00700     info->helper_ = helper;
00701     info->callback_queue_ = queue;
00702     info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size, allow_concurrent_callbacks));
00703     info->tracked_object_ = tracked_object;
00704     info->has_tracked_object_ = false;
00705     if (tracked_object)
00706     {
00707       info->has_tracked_object_ = true;
00708     }
00709 
00710     if (!helper->isConst())
00711     {
00712       ++nonconst_callbacks_;
00713     }
00714 
00715     callbacks_.push_back(info);
00716     cached_deserializers_.reserve(callbacks_.size());
00717 
00718     // if we have any latched links, we need to immediately schedule callbacks
00719     if (!latched_messages_.empty())
00720     {
00721       boost::mutex::scoped_lock lock(publisher_links_mutex_);
00722 
00723       V_PublisherLink::iterator it = publisher_links_.begin();
00724       V_PublisherLink::iterator end = publisher_links_.end();
00725       for (; it != end;++it)
00726       {
00727         const PublisherLinkPtr& link = *it;
00728         if (link->isLatched())
00729         {
00730           M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
00731           if (des_it != latched_messages_.end())
00732           {
00733             const LatchInfo& latch_info = des_it->second;
00734 
00735             MessageDeserializerPtr des(new MessageDeserializer(helper, latch_info.message, latch_info.connection_header));
00736             bool was_full = false;
00737             info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
00738             if (!was_full)
00739             {
00740               info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00741             }
00742           }
00743         }
00744       }
00745     }
00746   }
00747 
00748   return true;
00749 }
00750 
00751 void Subscription::removeCallback(const SubscriptionCallbackHelperPtr& helper)
00752 {
00753   CallbackInfoPtr info;
00754   {
00755     boost::mutex::scoped_lock cbs_lock(callbacks_mutex_);
00756     for (V_CallbackInfo::iterator it = callbacks_.begin();
00757          it != callbacks_.end(); ++it)
00758     {
00759       if ((*it)->helper_ == helper)
00760       {
00761         info = *it;
00762         callbacks_.erase(it);
00763 
00764         if (!helper->isConst())
00765         {
00766           --nonconst_callbacks_;
00767         }
00768 
00769         break;
00770       }
00771     }
00772   }
00773 
00774   if (info)
00775   {
00776     info->subscription_queue_->clear();
00777     info->callback_queue_->removeByID((uint64_t)info.get());
00778   }
00779 }
00780 
00781 void Subscription::headerReceived(const PublisherLinkPtr& link, const Header& h)
00782 {
00783   boost::mutex::scoped_lock lock(md5sum_mutex_);
00784   if (md5sum_ == "*")
00785   {
00786     md5sum_ = link->getMD5Sum();
00787   }
00788 }
00789 
00790 void Subscription::addPublisherLink(const PublisherLinkPtr& link)
00791 {
00792   publisher_links_.push_back(link);
00793 }
00794 
00795 void Subscription::removePublisherLink(const PublisherLinkPtr& pub_link)
00796 {
00797   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00798 
00799   V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link);
00800   if (it != publisher_links_.end())
00801   {
00802     publisher_links_.erase(it);
00803   }
00804 
00805   if (pub_link->isLatched())
00806   {
00807     latched_messages_.erase(pub_link);
00808   }
00809 }
00810 
00811 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
00812 {
00813   boost::mutex::scoped_lock lock(callbacks_mutex_);
00814   for (V_CallbackInfo::iterator cb = callbacks_.begin();
00815        cb != callbacks_.end(); ++cb)
00816   {
00817     const CallbackInfoPtr& info = *cb;
00818     if (info->helper_->getTypeInfo() == ti)
00819     {
00820       nocopy = true;
00821     }
00822     else
00823     {
00824       ser = true;
00825     }
00826 
00827     if (nocopy && ser)
00828     {
00829       return;
00830     }
00831   }
00832 }
00833 
00834 const std::string Subscription::datatype()
00835 {
00836   return datatype_;
00837 }
00838 
00839 const std::string Subscription::md5sum()
00840 {
00841   boost::mutex::scoped_lock lock(md5sum_mutex_);
00842   return md5sum_;
00843 }
00844 
00845 }


roscpp
Author(s): Morgan Quigley mquigley@cs.stanford.edu, Josh Faust jfaust@willowgarage.com, Brian Gerkey gerkey@willowgarage.com, Troy Straszheim straszheim@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:52