subscription.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2008, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include <sstream>
00036 #include <fcntl.h>
00037 #include <cerrno>
00038 #include <cstring>
00039 #include <typeinfo>
00040 
00041 #include "ros/common.h"
00042 #include "ros/io.h"
00043 #include "ros/subscription.h"
00044 #include "ros/publication.h"
00045 #include "ros/transport_publisher_link.h"
00046 #include "ros/intraprocess_publisher_link.h"
00047 #include "ros/intraprocess_subscriber_link.h"
00048 #include "ros/connection.h"
00049 #include "ros/transport/transport_tcp.h"
00050 #include "ros/transport/transport_udp.h"
00051 #include "ros/callback_queue_interface.h"
00052 #include "ros/this_node.h"
00053 #include "ros/network.h"
00054 #include "ros/poll_manager.h"
00055 #include "ros/connection_manager.h"
00056 #include "ros/message_deserializer.h"
00057 #include "ros/subscription_queue.h"
00058 #include "ros/file_log.h"
00059 #include "ros/transport_hints.h"
00060 #include "ros/subscription_callback_helper.h"
00061 
00062 #include <boost/make_shared.hpp>
00063 
00064 using XmlRpc::XmlRpcValue;
00065 
00066 namespace ros
00067 {
00068 
00069 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints)
00070 : name_(name)
00071 , md5sum_(md5sum)
00072 , datatype_(datatype)
00073 , nonconst_callbacks_(0)
00074 , dropped_(false)
00075 , shutting_down_(false)
00076 , transport_hints_(transport_hints)
00077 {
00078 }
00079 
00080 Subscription::~Subscription()
00081 {
00082   pending_connections_.clear();
00083   callbacks_.clear();
00084 }
00085 
00086 void Subscription::shutdown()
00087 {
00088   {
00089     boost::mutex::scoped_lock lock(shutdown_mutex_);
00090     shutting_down_ = true;
00091   }
00092 
00093   drop();
00094 }
00095 
00096 XmlRpcValue Subscription::getStats()
00097 {
00098   XmlRpcValue stats;
00099   stats[0] = name_;
00100   XmlRpcValue conn_data;
00101   conn_data.setSize(0);
00102 
00103   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00104 
00105   uint32_t cidx = 0;
00106   for (V_PublisherLink::iterator c = publisher_links_.begin();
00107        c != publisher_links_.end(); ++c)
00108   {
00109     const PublisherLink::Stats& s = (*c)->getStats();
00110     conn_data[cidx][0] = (*c)->getConnectionID();
00111     conn_data[cidx][1] = (int)s.bytes_received_;
00112     conn_data[cidx][2] = (int)s.messages_received_;
00113     conn_data[cidx][3] = (int)s.drops_;
00114     conn_data[cidx][4] = 0; // figure out something for this. not sure.
00115   }
00116 
00117   stats[1] = conn_data;
00118   return stats;
00119 }
00120 
00121 // [(connection_id, publisher_xmlrpc_uri, direction, transport, topic_name, connected, connection_info_string)*]
00122 // e.g. [(1, 'http://host:54893/', 'i', 'TCPROS', '/chatter', 1, 'TCPROS connection on port 59746 to [host:34318 on socket 11]')]
00123 void Subscription::getInfo(XmlRpc::XmlRpcValue& info)
00124 {
00125   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00126 
00127   for (V_PublisherLink::iterator c = publisher_links_.begin();
00128        c != publisher_links_.end(); ++c)
00129   {
00130     XmlRpcValue curr_info;
00131     curr_info[0] = (int)(*c)->getConnectionID();
00132     curr_info[1] = (*c)->getPublisherXMLRPCURI();
00133     curr_info[2] = "i";
00134     curr_info[3] = (*c)->getTransportType();
00135     curr_info[4] = name_;
00136     curr_info[5] = true; // For length compatibility with rospy
00137     curr_info[6] = (*c)->getTransportInfo();
00138     info[info.size()] = curr_info;
00139   }
00140 }
00141 
00142 uint32_t Subscription::getNumPublishers()
00143 {
00144         boost::mutex::scoped_lock lock(publisher_links_mutex_);
00145         return (uint32_t)publisher_links_.size();
00146 }
00147 
00148 void Subscription::drop()
00149 {
00150   if (!dropped_)
00151   {
00152     dropped_ = true;
00153 
00154     dropAllConnections();
00155   }
00156 }
00157 
00158 void Subscription::dropAllConnections()
00159 {
00160   // Swap our subscribers list with a local one so we can only lock for a short period of time, because a
00161   // side effect of our calling drop() on connections can be re-locking the subscribers mutex
00162   V_PublisherLink localsubscribers;
00163 
00164   {
00165     boost::mutex::scoped_lock lock(publisher_links_mutex_);
00166 
00167     localsubscribers.swap(publisher_links_);
00168   }
00169 
00170   V_PublisherLink::iterator it = localsubscribers.begin();
00171   V_PublisherLink::iterator end = localsubscribers.end();
00172   for (;it != end; ++it)
00173   {
00174     (*it)->drop();
00175   }
00176 }
00177 
00178 void Subscription::addLocalConnection(const PublicationPtr& pub)
00179 {
00180   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00181   if (dropped_)
00182   {
00183     return;
00184   }
00185 
00186   ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str());
00187 
00188   IntraProcessPublisherLinkPtr pub_link(boost::make_shared<IntraProcessPublisherLink>(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_));
00189   IntraProcessSubscriberLinkPtr sub_link(boost::make_shared<IntraProcessSubscriberLink>(pub));
00190   pub_link->setPublisher(sub_link);
00191   sub_link->setSubscriber(pub_link);
00192 
00193   addPublisherLink(pub_link);
00194   pub->addSubscriberLink(sub_link);
00195 }
00196 
00197 bool urisEqual(const std::string& uri1, const std::string& uri2)
00198 {
00199   std::string host1, host2;
00200   uint32_t port1 = 0, port2 = 0;
00201   network::splitURI(uri1, host1, port1);
00202   network::splitURI(uri2, host2, port2);
00203   return port1 == port2 && host1 == host2;
00204 }
00205 
00206 bool Subscription::pubUpdate(const V_string& new_pubs)
00207 {
00208   boost::mutex::scoped_lock lock(shutdown_mutex_);
00209 
00210   if (shutting_down_ || dropped_)
00211   {
00212     return false;
00213   }
00214 
00215   bool retval = true;
00216 
00217   {
00218     std::stringstream ss;
00219 
00220     for (V_string::const_iterator up_i = new_pubs.begin();
00221          up_i != new_pubs.end(); ++up_i)
00222     {
00223       ss << *up_i << ", ";
00224     }
00225 
00226     ss << " already have these connections: ";
00227     for (V_PublisherLink::iterator spc = publisher_links_.begin();
00228          spc!= publisher_links_.end(); ++spc)
00229     {
00230       ss << (*spc)->getPublisherXMLRPCURI() << ", ";
00231     }
00232 
00233     boost::mutex::scoped_lock lock(pending_connections_mutex_);
00234     S_PendingConnection::iterator it = pending_connections_.begin();
00235     S_PendingConnection::iterator end = pending_connections_.end();
00236     for (; it != end; ++it)
00237     {
00238       ss << (*it)->getRemoteURI() << ", ";
00239     }
00240 
00241     ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());
00242   }
00243 
00244   V_string additions;
00245   V_PublisherLink subtractions;
00246   V_PublisherLink to_add;
00247   // could use the STL set operations... but these sets are so small
00248   // it doesn't really matter.
00249   {
00250     boost::mutex::scoped_lock lock(publisher_links_mutex_);
00251 
00252     for (V_PublisherLink::iterator spc = publisher_links_.begin();
00253          spc!= publisher_links_.end(); ++spc)
00254     {
00255       bool found = false;
00256       for (V_string::const_iterator up_i = new_pubs.begin();
00257            !found && up_i != new_pubs.end(); ++up_i)
00258       {
00259         if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
00260         {
00261           found = true;
00262           break;
00263         }
00264       }
00265 
00266       if (!found)
00267       {
00268         subtractions.push_back(*spc);
00269       }
00270     }
00271 
00272     for (V_string::const_iterator up_i  = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
00273     {
00274       bool found = false;
00275       for (V_PublisherLink::iterator spc = publisher_links_.begin();
00276            !found && spc != publisher_links_.end(); ++spc)
00277       {
00278         if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
00279         {
00280           found = true;
00281           break;
00282         }
00283       }
00284 
00285       if (!found)
00286       {
00287         boost::mutex::scoped_lock lock(pending_connections_mutex_);
00288         S_PendingConnection::iterator it = pending_connections_.begin();
00289         S_PendingConnection::iterator end = pending_connections_.end();
00290         for (; it != end; ++it)
00291         {
00292           if (urisEqual(*up_i, (*it)->getRemoteURI()))
00293           {
00294             found = true;
00295             break;
00296           }
00297         }
00298       }
00299 
00300       if (!found)
00301       {
00302         additions.push_back(*up_i);
00303       }
00304     }
00305   }
00306 
00307   for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
00308   {
00309         const PublisherLinkPtr& link = *i;
00310     if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
00311     {
00312       ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
00313                         link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
00314                   link->drop();
00315           }
00316           else
00317           {
00318                   ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
00319           }
00320         }
00321 
00322   for (V_string::iterator i = additions.begin();
00323             i != additions.end(); ++i)
00324   {
00325     // this function should never negotiate a self-subscription
00326     if (XMLRPCManager::instance()->getServerURI() != *i)
00327     {
00328       retval &= negotiateConnection(*i);
00329     }
00330     else
00331     {
00332       ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str());
00333     }
00334   }
00335 
00336   return retval;
00337 }
00338 
00339 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
00340 {
00341   XmlRpcValue tcpros_array, protos_array, params;
00342   XmlRpcValue udpros_array;
00343   TransportUDPPtr udp_transport;
00344   int protos = 0;
00345   V_string transports = transport_hints_.getTransports();
00346   if (transports.empty())
00347   {
00348     transport_hints_.reliable();
00349     transports = transport_hints_.getTransports();
00350   }
00351   for (V_string::const_iterator it = transports.begin();
00352        it != transports.end();
00353        ++it)
00354   {
00355     if (*it == "UDP")
00356     {
00357       int max_datagram_size = transport_hints_.getMaxDatagramSize();
00358       udp_transport = boost::make_shared<TransportUDP>(&PollManager::instance()->getPollSet());
00359       if (!max_datagram_size)
00360         max_datagram_size = udp_transport->getMaxDatagramSize();
00361       udp_transport->createIncoming(0, false);
00362       udpros_array[0] = "UDPROS";
00363       M_string m;
00364       m["topic"] = getName();
00365       m["md5sum"] = md5sum();
00366       m["callerid"] = this_node::getName();
00367       m["type"] = datatype();
00368       boost::shared_array<uint8_t> buffer;
00369       uint32_t len;
00370       Header::write(m, buffer, len);
00371       XmlRpcValue v(buffer.get(), len);
00372       udpros_array[1] = v;
00373       udpros_array[2] = network::getHost();
00374       udpros_array[3] = udp_transport->getServerPort();
00375       udpros_array[4] = max_datagram_size;
00376 
00377       protos_array[protos++] = udpros_array;
00378     }
00379     else if (*it == "TCP")
00380     {
00381       tcpros_array[0] = std::string("TCPROS");
00382       protos_array[protos++] = tcpros_array;
00383     }
00384     else
00385     {
00386       ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str());
00387     }
00388   }
00389   params[0] = this_node::getName();
00390   params[1] = name_;
00391   params[2] = protos_array;
00392   std::string peer_host;
00393   uint32_t peer_port;
00394   if (!network::splitURI(xmlrpc_uri, peer_host, peer_port))
00395   {
00396     ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
00397     return false;
00398   }
00399 
00400   XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(),
00401                                                      peer_port, "/");
00402  // if (!c.execute("requestTopic", params, result) || !g_node->validateXmlrpcResponse("requestTopic", result, proto))
00403 
00404   // Initiate the negotiation.  We'll come back and check on it later.
00405   if (!c->executeNonBlock("requestTopic", params))
00406   {
00407     ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00408               peer_host.c_str(), peer_port, name_.c_str());
00409     delete c;
00410     if (udp_transport)
00411     {
00412       udp_transport->close();
00413     }
00414 
00415     return false;
00416   }
00417 
00418   ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
00419 
00420   // The PendingConnectionPtr takes ownership of c, and will delete it on
00421   // destruction.
00422   PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
00423 
00424   XMLRPCManager::instance()->addASyncConnection(conn);
00425   // Put this connection on the list that we'll look at later.
00426   {
00427     boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00428     pending_connections_.insert(conn);
00429   }
00430 
00431   return true;
00432 }
00433 
00434 void closeTransport(const TransportUDPPtr& trans)
00435 {
00436   if (trans)
00437   {
00438     trans->close();
00439   }
00440 }
00441 
00442 void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRpcValue& result)
00443 {
00444   boost::mutex::scoped_lock lock(shutdown_mutex_);
00445   if (shutting_down_ || dropped_)
00446   {
00447     return;
00448   }
00449 
00450   {
00451     boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00452     pending_connections_.erase(conn);
00453   }
00454 
00455   TransportUDPPtr udp_transport;
00456 
00457   std::string peer_host = conn->getClient()->getHost();
00458   uint32_t peer_port = conn->getClient()->getPort();
00459   std::stringstream ss;
00460   ss << "http://" << peer_host << ":" << peer_port << "/";
00461   std::string xmlrpc_uri = ss.str();
00462   udp_transport = conn->getUDPTransport();
00463 
00464   XmlRpc::XmlRpcValue proto;
00465   if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto))
00466   {
00467         ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00468               peer_host.c_str(), peer_port, name_.c_str());
00469         closeTransport(udp_transport);
00470         return;
00471   }
00472 
00473   if (proto.size() == 0)
00474   {
00475         ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
00476         closeTransport(udp_transport);
00477         return;
00478   }
00479 
00480   if (proto.getType() != XmlRpcValue::TypeArray)
00481   {
00482         ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
00483         closeTransport(udp_transport);
00484         return;
00485   }
00486   if (proto[0].getType() != XmlRpcValue::TypeString)
00487   {
00488         ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
00489         closeTransport(udp_transport);
00490         return;
00491   }
00492 
00493   std::string proto_name = proto[0];
00494   if (proto_name == "TCPROS")
00495   {
00496     if (proto.size() != 3 ||
00497         proto[1].getType() != XmlRpcValue::TypeString ||
00498         proto[2].getType() != XmlRpcValue::TypeInt)
00499     {
00500         ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \
00501                 "parameters aren't string,int");
00502       return;
00503     }
00504     std::string pub_host = proto[1];
00505     int pub_port = proto[2];
00506     ROSCPP_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00507 
00508     TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
00509     if (transport->connect(pub_host, pub_port))
00510     {
00511       ConnectionPtr connection(boost::make_shared<Connection>());
00512       TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
00513 
00514       connection->initialize(transport, false, HeaderReceivedFunc());
00515       pub_link->initialize(connection);
00516 
00517       ConnectionManager::instance()->addConnection(connection);
00518 
00519       boost::mutex::scoped_lock lock(publisher_links_mutex_);
00520       addPublisherLink(pub_link);
00521 
00522       ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00523     }
00524     else
00525     {
00526         ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00527     }
00528   }
00529   else if (proto_name == "UDPROS")
00530   {
00531     if (proto.size() != 6 ||
00532         proto[1].getType() != XmlRpcValue::TypeString ||
00533         proto[2].getType() != XmlRpcValue::TypeInt ||
00534         proto[3].getType() != XmlRpcValue::TypeInt ||
00535         proto[4].getType() != XmlRpcValue::TypeInt ||
00536         proto[5].getType() != XmlRpcValue::TypeBase64)
00537     {
00538       ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
00539                        "parameters aren't string,int,int,int,base64");
00540       closeTransport(udp_transport);
00541       return;
00542     }
00543     std::string pub_host = proto[1];
00544     int pub_port = proto[2];
00545     int conn_id = proto[3];
00546     int max_datagram_size = proto[4];
00547     std::vector<char> header_bytes = proto[5];
00548     boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
00549     memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00550     Header h;
00551     std::string err;
00552     if (!h.parse(buffer, header_bytes.size(), err))
00553     {
00554       ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00555       closeTransport(udp_transport);
00556       return;
00557     }
00558     ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
00559 
00560     std::string error_msg;
00561     if (h.getValue("error", error_msg))
00562     {
00563       ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
00564       closeTransport(udp_transport);
00565       return;
00566     }
00567 
00568     TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
00569     if (pub_link->setHeader(h))
00570     {
00571       ConnectionPtr connection(boost::make_shared<Connection>());
00572       connection->initialize(udp_transport, false, NULL);
00573       connection->setHeader(h);
00574       pub_link->initialize(connection);
00575 
00576       ConnectionManager::instance()->addConnection(connection);
00577 
00578       boost::mutex::scoped_lock lock(publisher_links_mutex_);
00579       addPublisherLink(pub_link);
00580 
00581       ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00582     }
00583     else
00584     {
00585       ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00586       closeTransport(udp_transport);
00587       return;
00588     }
00589   }
00590   else
00591   {
00592         ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str());
00593   }
00594 }
00595 
00596 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
00597 {
00598   boost::mutex::scoped_lock lock(callbacks_mutex_);
00599 
00600   uint32_t drops = 0;
00601 
00602   // Cache the deserializers by type info.  If all the subscriptions are the same type this has the same performance as before.  If
00603   // there are subscriptions with different C++ type (but same ROS message type), this now works correctly rather than passing
00604   // garbage to the messages with different C++ types than the first one.
00605   cached_deserializers_.clear();
00606 
00607   ros::Time receipt_time = ros::Time::now();
00608 
00609   for (V_CallbackInfo::iterator cb = callbacks_.begin();
00610        cb != callbacks_.end(); ++cb)
00611   {
00612     const CallbackInfoPtr& info = *cb;
00613 
00614     ROS_ASSERT(info->callback_queue_);
00615 
00616     const std::type_info* ti = &info->helper_->getTypeInfo();
00617 
00618     if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
00619     {
00620       MessageDeserializerPtr deserializer;
00621 
00622       V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
00623       V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
00624       for (; des_it != des_end; ++des_it)
00625       {
00626         if (*des_it->first == *ti)
00627         {
00628           deserializer = des_it->second;
00629           break;
00630         }
00631       }
00632 
00633       if (!deserializer)
00634       {
00635         deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
00636         cached_deserializers_.push_back(std::make_pair(ti, deserializer));
00637       }
00638 
00639       bool was_full = false;
00640       bool nonconst_need_copy = false;
00641       if (callbacks_.size() > 1)
00642       {
00643         nonconst_need_copy = true;
00644       }
00645 
00646       info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
00647 
00648       if (was_full)
00649       {
00650         ++drops;
00651       }
00652       else
00653       {
00654         info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00655       }
00656     }
00657   }
00658 
00659   // measure statistics
00660   statistics_.callback(connection_header, name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0);
00661 
00662   // If this link is latched, store off the message so we can immediately pass it to new subscribers later
00663   if (link->isLatched())
00664   {
00665     LatchInfo li;
00666     li.connection_header = connection_header;
00667     li.link = link;
00668     li.message = m;
00669     li.receipt_time = receipt_time;
00670     latched_messages_[link] = li;
00671   }
00672 
00673   cached_deserializers_.clear();
00674 
00675   return drops;
00676 }
00677 
00678 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
00679 {
00680   ROS_ASSERT(helper);
00681   ROS_ASSERT(queue);
00682 
00683   statistics_.init(helper);
00684 
00685   // Decay to a real type as soon as we have a subscriber with a real type
00686   {
00687     boost::mutex::scoped_lock lock(md5sum_mutex_);
00688     if (md5sum_ == "*" && md5sum != "*")
00689     {
00690 
00691       md5sum_ = md5sum;
00692     }
00693   }
00694 
00695   if (md5sum != "*" && md5sum != this->md5sum())
00696   {
00697     return false;
00698   }
00699 
00700   {
00701     boost::mutex::scoped_lock lock(callbacks_mutex_);
00702 
00703     CallbackInfoPtr info(boost::make_shared<CallbackInfo>());
00704     info->helper_ = helper;
00705     info->callback_queue_ = queue;
00706     info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(name_, queue_size, allow_concurrent_callbacks);
00707     info->tracked_object_ = tracked_object;
00708     info->has_tracked_object_ = false;
00709     if (tracked_object)
00710     {
00711       info->has_tracked_object_ = true;
00712     }
00713 
00714     if (!helper->isConst())
00715     {
00716       ++nonconst_callbacks_;
00717     }
00718 
00719     callbacks_.push_back(info);
00720     cached_deserializers_.reserve(callbacks_.size());
00721 
00722     // if we have any latched links, we need to immediately schedule callbacks
00723     if (!latched_messages_.empty())
00724     {
00725       boost::mutex::scoped_lock lock(publisher_links_mutex_);
00726 
00727       V_PublisherLink::iterator it = publisher_links_.begin();
00728       V_PublisherLink::iterator end = publisher_links_.end();
00729       for (; it != end;++it)
00730       {
00731         const PublisherLinkPtr& link = *it;
00732         if (link->isLatched())
00733         {
00734           M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
00735           if (des_it != latched_messages_.end())
00736           {
00737             const LatchInfo& latch_info = des_it->second;
00738 
00739             MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, latch_info.message, latch_info.connection_header));
00740             bool was_full = false;
00741             info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
00742             if (!was_full)
00743             {
00744               info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00745             }
00746           }
00747         }
00748       }
00749     }
00750   }
00751 
00752   return true;
00753 }
00754 
00755 void Subscription::removeCallback(const SubscriptionCallbackHelperPtr& helper)
00756 {
00757   CallbackInfoPtr info;
00758   {
00759     boost::mutex::scoped_lock cbs_lock(callbacks_mutex_);
00760     for (V_CallbackInfo::iterator it = callbacks_.begin();
00761          it != callbacks_.end(); ++it)
00762     {
00763       if ((*it)->helper_ == helper)
00764       {
00765         info = *it;
00766         callbacks_.erase(it);
00767 
00768         if (!helper->isConst())
00769         {
00770           --nonconst_callbacks_;
00771         }
00772 
00773         break;
00774       }
00775     }
00776   }
00777 
00778   if (info)
00779   {
00780     info->subscription_queue_->clear();
00781     info->callback_queue_->removeByID((uint64_t)info.get());
00782   }
00783 }
00784 
00785 void Subscription::headerReceived(const PublisherLinkPtr& link, const Header& h)
00786 {
00787   (void)h;
00788   boost::mutex::scoped_lock lock(md5sum_mutex_);
00789   if (md5sum_ == "*")
00790   {
00791     md5sum_ = link->getMD5Sum();
00792   }
00793 }
00794 
00795 void Subscription::addPublisherLink(const PublisherLinkPtr& link)
00796 {
00797   publisher_links_.push_back(link);
00798 }
00799 
00800 void Subscription::removePublisherLink(const PublisherLinkPtr& pub_link)
00801 {
00802   boost::mutex::scoped_lock lock(publisher_links_mutex_);
00803 
00804   V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link);
00805   if (it != publisher_links_.end())
00806   {
00807     publisher_links_.erase(it);
00808   }
00809 
00810   if (pub_link->isLatched())
00811   {
00812     latched_messages_.erase(pub_link);
00813   }
00814 }
00815 
00816 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
00817 {
00818   boost::mutex::scoped_lock lock(callbacks_mutex_);
00819   for (V_CallbackInfo::iterator cb = callbacks_.begin();
00820        cb != callbacks_.end(); ++cb)
00821   {
00822     const CallbackInfoPtr& info = *cb;
00823     if (info->helper_->getTypeInfo() == ti)
00824     {
00825       nocopy = true;
00826     }
00827     else
00828     {
00829       ser = true;
00830     }
00831 
00832     if (nocopy && ser)
00833     {
00834       return;
00835     }
00836   }
00837 }
00838 
00839 const std::string Subscription::datatype()
00840 {
00841   return datatype_;
00842 }
00843 
00844 const std::string Subscription::md5sum()
00845 {
00846   boost::mutex::scoped_lock lock(md5sum_mutex_);
00847   return md5sum_;
00848 }
00849 
00850 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05