$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include <sstream> 00036 #include <fcntl.h> 00037 #include <cerrno> 00038 #include <cstring> 00039 #include <typeinfo> 00040 00041 #include "ros/common.h" 00042 #include "ros/io.h" 00043 #include "ros/subscription.h" 00044 #include "ros/publication.h" 00045 #include "ros/transport_publisher_link.h" 00046 #include "ros/intraprocess_publisher_link.h" 00047 #include "ros/intraprocess_subscriber_link.h" 00048 #include "ros/connection.h" 00049 #include "ros/transport/transport_tcp.h" 00050 #include "ros/transport/transport_udp.h" 00051 #include "ros/callback_queue_interface.h" 00052 #include "ros/this_node.h" 00053 #include "ros/network.h" 00054 #include "ros/poll_manager.h" 00055 #include "ros/connection_manager.h" 00056 #include "ros/message_deserializer.h" 00057 #include "ros/subscription_queue.h" 00058 #include "ros/file_log.h" 00059 #include "ros/transport_hints.h" 00060 #include "ros/subscription_callback_helper.h" 00061 00062 #include <boost/make_shared.hpp> 00063 00064 using XmlRpc::XmlRpcValue; 00065 00066 namespace ros 00067 { 00068 00069 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints) 00070 : name_(name) 00071 , md5sum_(md5sum) 00072 , datatype_(datatype) 00073 , nonconst_callbacks_(0) 00074 , dropped_(false) 00075 , shutting_down_(false) 00076 , transport_hints_(transport_hints) 00077 { 00078 } 00079 00080 Subscription::~Subscription() 00081 { 00082 pending_connections_.clear(); 00083 callbacks_.clear(); 00084 } 00085 00086 void Subscription::shutdown() 00087 { 00088 { 00089 boost::mutex::scoped_lock lock(shutdown_mutex_); 00090 shutting_down_ = true; 00091 } 00092 00093 drop(); 00094 } 00095 00096 XmlRpcValue Subscription::getStats() 00097 { 00098 XmlRpcValue stats; 00099 stats[0] = name_; 00100 XmlRpcValue conn_data; 00101 conn_data.setSize(0); 00102 00103 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00104 00105 uint32_t cidx = 0; 00106 for (V_PublisherLink::iterator c = publisher_links_.begin(); 00107 c != publisher_links_.end(); ++c) 00108 { 00109 const PublisherLink::Stats& s = (*c)->getStats(); 00110 conn_data[cidx][0] = (*c)->getConnectionID(); 00111 conn_data[cidx][1] = (int)s.bytes_received_; 00112 conn_data[cidx][2] = (int)s.messages_received_; 00113 conn_data[cidx][3] = (int)s.drops_; 00114 conn_data[cidx][4] = 0; // figure out something for this. not sure. 00115 } 00116 00117 stats[1] = conn_data; 00118 return stats; 00119 } 00120 00121 // rospy returns values like this: 00122 // (1, 'http://127.0.0.1:62365/', 'i', 'TCPROS', '/chatter') 00123 // 00124 // We're outputting something like this: 00125 // (0, http://127.0.0.1:62438/, i, TCPROS, /chatter) 00126 void Subscription::getInfo(XmlRpc::XmlRpcValue& info) 00127 { 00128 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00129 00130 for (V_PublisherLink::iterator c = publisher_links_.begin(); 00131 c != publisher_links_.end(); ++c) 00132 { 00133 XmlRpcValue curr_info; 00134 curr_info[0] = (int)(*c)->getConnectionID(); 00135 curr_info[1] = (*c)->getPublisherXMLRPCURI(); 00136 curr_info[2] = "i"; 00137 curr_info[3] = (*c)->getTransportType(); 00138 curr_info[4] = name_; 00139 info[info.size()] = curr_info; 00140 } 00141 } 00142 00143 uint32_t Subscription::getNumPublishers() 00144 { 00145 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00146 return (uint32_t)publisher_links_.size(); 00147 } 00148 00149 void Subscription::drop() 00150 { 00151 if (!dropped_) 00152 { 00153 dropped_ = true; 00154 00155 dropAllConnections(); 00156 } 00157 } 00158 00159 void Subscription::dropAllConnections() 00160 { 00161 // Swap our subscribers list with a local one so we can only lock for a short period of time, because a 00162 // side effect of our calling drop() on connections can be re-locking the subscribers mutex 00163 V_PublisherLink localsubscribers; 00164 00165 { 00166 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00167 00168 localsubscribers.swap(publisher_links_); 00169 } 00170 00171 V_PublisherLink::iterator it = localsubscribers.begin(); 00172 V_PublisherLink::iterator end = localsubscribers.end(); 00173 for (;it != end; ++it) 00174 { 00175 (*it)->drop(); 00176 } 00177 } 00178 00179 void Subscription::addLocalConnection(const PublicationPtr& pub) 00180 { 00181 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00182 if (dropped_) 00183 { 00184 return; 00185 } 00186 00187 ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str()); 00188 00189 IntraProcessPublisherLinkPtr pub_link(new IntraProcessPublisherLink(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_)); 00190 IntraProcessSubscriberLinkPtr sub_link(new IntraProcessSubscriberLink(pub)); 00191 pub_link->setPublisher(sub_link); 00192 sub_link->setSubscriber(pub_link); 00193 00194 addPublisherLink(pub_link); 00195 pub->addSubscriberLink(sub_link); 00196 } 00197 00198 bool urisEqual(const std::string& uri1, const std::string& uri2) 00199 { 00200 std::string host1, host2; 00201 uint32_t port1 = 0, port2 = 0; 00202 network::splitURI(uri1, host1, port1); 00203 network::splitURI(uri2, host2, port2); 00204 return port1 == port2 && host1 == host2; 00205 } 00206 00207 bool Subscription::pubUpdate(const V_string& new_pubs) 00208 { 00209 boost::mutex::scoped_lock lock(shutdown_mutex_); 00210 00211 if (shutting_down_ || dropped_) 00212 { 00213 return false; 00214 } 00215 00216 bool retval = true; 00217 00218 { 00219 std::stringstream ss; 00220 00221 for (V_string::const_iterator up_i = new_pubs.begin(); 00222 up_i != new_pubs.end(); ++up_i) 00223 { 00224 ss << *up_i << ", "; 00225 } 00226 00227 ss << " already have these connections: "; 00228 for (V_PublisherLink::iterator spc = publisher_links_.begin(); 00229 spc!= publisher_links_.end(); ++spc) 00230 { 00231 ss << (*spc)->getPublisherXMLRPCURI() << ", "; 00232 } 00233 00234 boost::mutex::scoped_lock lock(pending_connections_mutex_); 00235 S_PendingConnection::iterator it = pending_connections_.begin(); 00236 S_PendingConnection::iterator end = pending_connections_.end(); 00237 for (; it != end; ++it) 00238 { 00239 ss << (*it)->getRemoteURI() << ", "; 00240 } 00241 00242 ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str()); 00243 } 00244 00245 V_string additions; 00246 V_PublisherLink subtractions; 00247 V_PublisherLink to_add; 00248 // could use the STL set operations... but these sets are so small 00249 // it doesn't really matter. 00250 { 00251 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00252 00253 for (V_PublisherLink::iterator spc = publisher_links_.begin(); 00254 spc!= publisher_links_.end(); ++spc) 00255 { 00256 bool found = false; 00257 for (V_string::const_iterator up_i = new_pubs.begin(); 00258 !found && up_i != new_pubs.end(); ++up_i) 00259 { 00260 if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i)) 00261 { 00262 found = true; 00263 break; 00264 } 00265 } 00266 00267 if (!found) 00268 { 00269 subtractions.push_back(*spc); 00270 } 00271 } 00272 00273 for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i) 00274 { 00275 bool found = false; 00276 for (V_PublisherLink::iterator spc = publisher_links_.begin(); 00277 !found && spc != publisher_links_.end(); ++spc) 00278 { 00279 if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI())) 00280 { 00281 found = true; 00282 break; 00283 } 00284 } 00285 00286 if (!found) 00287 { 00288 boost::mutex::scoped_lock lock(pending_connections_mutex_); 00289 S_PendingConnection::iterator it = pending_connections_.begin(); 00290 S_PendingConnection::iterator end = pending_connections_.end(); 00291 for (; it != end; ++it) 00292 { 00293 if (urisEqual(*up_i, (*it)->getRemoteURI())) 00294 { 00295 found = true; 00296 break; 00297 } 00298 } 00299 } 00300 00301 if (!found) 00302 { 00303 additions.push_back(*up_i); 00304 } 00305 } 00306 } 00307 00308 for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i) 00309 { 00310 const PublisherLinkPtr& link = *i; 00311 if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI()) 00312 { 00313 ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]", 00314 link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str()); 00315 link->drop(); 00316 } 00317 else 00318 { 00319 ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str()); 00320 } 00321 } 00322 00323 for (V_string::iterator i = additions.begin(); 00324 i != additions.end(); ++i) 00325 { 00326 // this function should never negotiate a self-subscription 00327 if (XMLRPCManager::instance()->getServerURI() != *i) 00328 { 00329 retval &= negotiateConnection(*i); 00330 } 00331 else 00332 { 00333 ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str()); 00334 } 00335 } 00336 00337 return retval; 00338 } 00339 00340 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri) 00341 { 00342 XmlRpcValue tcpros_array, protos_array, params; 00343 XmlRpcValue udpros_array; 00344 TransportUDPPtr udp_transport; 00345 int protos = 0; 00346 V_string transports = transport_hints_.getTransports(); 00347 if (transports.empty()) 00348 { 00349 transport_hints_.reliable(); 00350 transports = transport_hints_.getTransports(); 00351 } 00352 for (V_string::const_iterator it = transports.begin(); 00353 it != transports.end(); 00354 ++it) 00355 { 00356 if (*it == "UDP") 00357 { 00358 int max_datagram_size = transport_hints_.getMaxDatagramSize(); 00359 udp_transport = TransportUDPPtr(new TransportUDP(&PollManager::instance()->getPollSet())); 00360 if (!max_datagram_size) 00361 max_datagram_size = udp_transport->getMaxDatagramSize(); 00362 udp_transport->createIncoming(0, false); 00363 udpros_array[0] = "UDPROS"; 00364 M_string m; 00365 m["topic"] = getName(); 00366 m["md5sum"] = md5sum(); 00367 m["callerid"] = this_node::getName(); 00368 m["type"] = datatype(); 00369 boost::shared_array<uint8_t> buffer; 00370 uint32_t len; 00371 Header::write(m, buffer, len); 00372 XmlRpcValue v(buffer.get(), len); 00373 udpros_array[1] = v; 00374 udpros_array[2] = network::getHost(); 00375 udpros_array[3] = udp_transport->getServerPort(); 00376 udpros_array[4] = max_datagram_size; 00377 00378 protos_array[protos++] = udpros_array; 00379 } 00380 else if (*it == "TCP") 00381 { 00382 tcpros_array[0] = std::string("TCPROS"); 00383 protos_array[protos++] = tcpros_array; 00384 } 00385 else 00386 { 00387 ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str()); 00388 } 00389 } 00390 params[0] = this_node::getName(); 00391 params[1] = name_; 00392 params[2] = protos_array; 00393 std::string peer_host; 00394 uint32_t peer_port; 00395 if (!network::splitURI(xmlrpc_uri, peer_host, peer_port)) 00396 { 00397 ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str()); 00398 return false; 00399 } 00400 00401 XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(), 00402 peer_port, "/"); 00403 // if (!c.execute("requestTopic", params, result) || !g_node->validateXmlrpcResponse("requestTopic", result, proto)) 00404 00405 // Initiate the negotiation. We'll come back and check on it later. 00406 if (!c->executeNonBlock("requestTopic", params)) 00407 { 00408 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]", 00409 peer_host.c_str(), peer_port, name_.c_str()); 00410 delete c; 00411 if (udp_transport) 00412 { 00413 udp_transport->close(); 00414 } 00415 00416 return false; 00417 } 00418 00419 ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port); 00420 00421 // The PendingConnectionPtr takes ownership of c, and will delete it on 00422 // destruction. 00423 PendingConnectionPtr conn(new PendingConnection(c, udp_transport, shared_from_this(), xmlrpc_uri)); 00424 00425 XMLRPCManager::instance()->addASyncConnection(conn); 00426 // Put this connection on the list that we'll look at later. 00427 { 00428 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_); 00429 pending_connections_.insert(conn); 00430 } 00431 00432 return true; 00433 } 00434 00435 void closeTransport(const TransportUDPPtr& trans) 00436 { 00437 if (trans) 00438 { 00439 trans->close(); 00440 } 00441 } 00442 00443 void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRpcValue& result) 00444 { 00445 boost::mutex::scoped_lock lock(shutdown_mutex_); 00446 if (shutting_down_ || dropped_) 00447 { 00448 return; 00449 } 00450 00451 { 00452 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_); 00453 pending_connections_.erase(conn); 00454 } 00455 00456 TransportUDPPtr udp_transport; 00457 00458 std::string peer_host = conn->getClient()->getHost(); 00459 uint32_t peer_port = conn->getClient()->getPort(); 00460 std::stringstream ss; 00461 ss << "http://" << peer_host << ":" << peer_port << "/"; 00462 std::string xmlrpc_uri = ss.str(); 00463 udp_transport = conn->getUDPTransport(); 00464 00465 XmlRpc::XmlRpcValue proto; 00466 if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto)) 00467 { 00468 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]", 00469 peer_host.c_str(), peer_port, name_.c_str()); 00470 closeTransport(udp_transport); 00471 return; 00472 } 00473 00474 if (proto.size() == 0) 00475 { 00476 ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str()); 00477 closeTransport(udp_transport); 00478 return; 00479 } 00480 00481 if (proto.getType() != XmlRpcValue::TypeArray) 00482 { 00483 ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str()); 00484 closeTransport(udp_transport); 00485 return; 00486 } 00487 if (proto[0].getType() != XmlRpcValue::TypeString) 00488 { 00489 ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element."); 00490 closeTransport(udp_transport); 00491 return; 00492 } 00493 00494 std::string proto_name = proto[0]; 00495 if (proto_name == "TCPROS") 00496 { 00497 if (proto.size() != 3 || 00498 proto[1].getType() != XmlRpcValue::TypeString || 00499 proto[2].getType() != XmlRpcValue::TypeInt) 00500 { 00501 ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \ 00502 "parameters aren't string,int"); 00503 return; 00504 } 00505 std::string pub_host = proto[1]; 00506 int pub_port = proto[2]; 00507 ROSCPP_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port); 00508 00509 TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet())); 00510 if (transport->connect(pub_host, pub_port)) 00511 { 00512 ConnectionPtr connection(new Connection()); 00513 TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_)); 00514 00515 connection->initialize(transport, false, HeaderReceivedFunc()); 00516 pub_link->initialize(connection); 00517 00518 ConnectionManager::instance()->addConnection(connection); 00519 00520 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00521 addPublisherLink(pub_link); 00522 00523 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port); 00524 } 00525 else 00526 { 00527 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port); 00528 } 00529 } 00530 else if (proto_name == "UDPROS") 00531 { 00532 if (proto.size() != 6 || 00533 proto[1].getType() != XmlRpcValue::TypeString || 00534 proto[2].getType() != XmlRpcValue::TypeInt || 00535 proto[3].getType() != XmlRpcValue::TypeInt || 00536 proto[4].getType() != XmlRpcValue::TypeInt || 00537 proto[5].getType() != XmlRpcValue::TypeBase64) 00538 { 00539 ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \ 00540 "parameters aren't string,int,int,int,base64"); 00541 closeTransport(udp_transport); 00542 return; 00543 } 00544 std::string pub_host = proto[1]; 00545 int pub_port = proto[2]; 00546 int conn_id = proto[3]; 00547 int max_datagram_size = proto[4]; 00548 std::vector<char> header_bytes = proto[5]; 00549 boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]); 00550 memcpy(buffer.get(), &header_bytes[0], header_bytes.size()); 00551 Header h; 00552 std::string err; 00553 if (!h.parse(buffer, header_bytes.size(), err)) 00554 { 00555 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str()); 00556 closeTransport(udp_transport); 00557 return; 00558 } 00559 ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size); 00560 00561 std::string error_msg; 00562 if (h.getValue("error", error_msg)) 00563 { 00564 ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str()); 00565 closeTransport(udp_transport); 00566 return; 00567 } 00568 00569 TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_)); 00570 if (pub_link->setHeader(h)) 00571 { 00572 ConnectionPtr connection(new Connection()); 00573 connection->initialize(udp_transport, false, NULL); 00574 connection->setHeader(h); 00575 pub_link->initialize(connection); 00576 00577 ConnectionManager::instance()->addConnection(connection); 00578 00579 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00580 addPublisherLink(pub_link); 00581 00582 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port); 00583 } 00584 else 00585 { 00586 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port); 00587 closeTransport(udp_transport); 00588 return; 00589 } 00590 } 00591 else 00592 { 00593 ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str()); 00594 } 00595 } 00596 00597 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link) 00598 { 00599 boost::mutex::scoped_lock lock(callbacks_mutex_); 00600 00601 uint32_t drops = 0; 00602 00603 // Cache the deserializers by type info. If all the subscriptions are the same type this has the same performance as before. If 00604 // there are subscriptions with different C++ type (but same ROS message type), this now works correctly rather than passing 00605 // garbage to the messages with different C++ types than the first one. 00606 cached_deserializers_.clear(); 00607 00608 ros::Time receipt_time = ros::Time::now(); 00609 00610 for (V_CallbackInfo::iterator cb = callbacks_.begin(); 00611 cb != callbacks_.end(); ++cb) 00612 { 00613 const CallbackInfoPtr& info = *cb; 00614 00615 ROS_ASSERT(info->callback_queue_); 00616 00617 const std::type_info* ti = &info->helper_->getTypeInfo(); 00618 00619 if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info))) 00620 { 00621 MessageDeserializerPtr deserializer; 00622 00623 V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin(); 00624 V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end(); 00625 for (; des_it != des_end; ++des_it) 00626 { 00627 if (*des_it->first == *ti) 00628 { 00629 deserializer = des_it->second; 00630 break; 00631 } 00632 } 00633 00634 if (!deserializer) 00635 { 00636 deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header); 00637 cached_deserializers_.push_back(std::make_pair(ti, deserializer)); 00638 } 00639 00640 bool was_full = false; 00641 bool nonconst_need_copy = false; 00642 if (callbacks_.size() > 1) 00643 { 00644 nonconst_need_copy = true; 00645 } 00646 00647 info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full); 00648 00649 if (was_full) 00650 { 00651 ++drops; 00652 } 00653 else 00654 { 00655 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get()); 00656 } 00657 } 00658 } 00659 00660 // If this link is latched, store off the message so we can immediately pass it to new subscribers later 00661 if (link->isLatched()) 00662 { 00663 LatchInfo li; 00664 li.connection_header = connection_header; 00665 li.link = link; 00666 li.message = m; 00667 li.receipt_time = receipt_time; 00668 latched_messages_[link] = li; 00669 } 00670 00671 cached_deserializers_.clear(); 00672 00673 return drops; 00674 } 00675 00676 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks) 00677 { 00678 ROS_ASSERT(helper); 00679 ROS_ASSERT(queue); 00680 00681 // Decay to a real type as soon as we have a subscriber with a real type 00682 { 00683 boost::mutex::scoped_lock lock(md5sum_mutex_); 00684 if (md5sum_ == "*" && md5sum != "*") 00685 { 00686 00687 md5sum_ = md5sum; 00688 } 00689 } 00690 00691 if (md5sum != "*" && md5sum != this->md5sum()) 00692 { 00693 return false; 00694 } 00695 00696 { 00697 boost::mutex::scoped_lock lock(callbacks_mutex_); 00698 00699 CallbackInfoPtr info(new CallbackInfo); 00700 info->helper_ = helper; 00701 info->callback_queue_ = queue; 00702 info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size, allow_concurrent_callbacks)); 00703 info->tracked_object_ = tracked_object; 00704 info->has_tracked_object_ = false; 00705 if (tracked_object) 00706 { 00707 info->has_tracked_object_ = true; 00708 } 00709 00710 if (!helper->isConst()) 00711 { 00712 ++nonconst_callbacks_; 00713 } 00714 00715 callbacks_.push_back(info); 00716 cached_deserializers_.reserve(callbacks_.size()); 00717 00718 // if we have any latched links, we need to immediately schedule callbacks 00719 if (!latched_messages_.empty()) 00720 { 00721 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00722 00723 V_PublisherLink::iterator it = publisher_links_.begin(); 00724 V_PublisherLink::iterator end = publisher_links_.end(); 00725 for (; it != end;++it) 00726 { 00727 const PublisherLinkPtr& link = *it; 00728 if (link->isLatched()) 00729 { 00730 M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link); 00731 if (des_it != latched_messages_.end()) 00732 { 00733 const LatchInfo& latch_info = des_it->second; 00734 00735 MessageDeserializerPtr des(new MessageDeserializer(helper, latch_info.message, latch_info.connection_header)); 00736 bool was_full = false; 00737 info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full); 00738 if (!was_full) 00739 { 00740 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get()); 00741 } 00742 } 00743 } 00744 } 00745 } 00746 } 00747 00748 return true; 00749 } 00750 00751 void Subscription::removeCallback(const SubscriptionCallbackHelperPtr& helper) 00752 { 00753 boost::mutex::scoped_lock cbs_lock(callbacks_mutex_); 00754 for (V_CallbackInfo::iterator it = callbacks_.begin(); 00755 it != callbacks_.end(); ++it) 00756 { 00757 if ((*it)->helper_ == helper) 00758 { 00759 const CallbackInfoPtr& info = *it; 00760 info->subscription_queue_->clear(); 00761 info->callback_queue_->removeByID((uint64_t)info.get()); 00762 callbacks_.erase(it); 00763 00764 if (!helper->isConst()) 00765 { 00766 --nonconst_callbacks_; 00767 } 00768 00769 break; 00770 } 00771 } 00772 } 00773 00774 void Subscription::headerReceived(const PublisherLinkPtr& link, const Header& h) 00775 { 00776 boost::mutex::scoped_lock lock(md5sum_mutex_); 00777 if (md5sum_ == "*") 00778 { 00779 md5sum_ = link->getMD5Sum(); 00780 } 00781 } 00782 00783 void Subscription::addPublisherLink(const PublisherLinkPtr& link) 00784 { 00785 publisher_links_.push_back(link); 00786 } 00787 00788 void Subscription::removePublisherLink(const PublisherLinkPtr& pub_link) 00789 { 00790 boost::mutex::scoped_lock lock(publisher_links_mutex_); 00791 00792 V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link); 00793 if (it != publisher_links_.end()) 00794 { 00795 publisher_links_.erase(it); 00796 } 00797 00798 if (pub_link->isLatched()) 00799 { 00800 latched_messages_.erase(pub_link); 00801 } 00802 } 00803 00804 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti) 00805 { 00806 boost::mutex::scoped_lock lock(callbacks_mutex_); 00807 for (V_CallbackInfo::iterator cb = callbacks_.begin(); 00808 cb != callbacks_.end(); ++cb) 00809 { 00810 const CallbackInfoPtr& info = *cb; 00811 if (info->helper_->getTypeInfo() == ti) 00812 { 00813 nocopy = true; 00814 } 00815 else 00816 { 00817 ser = true; 00818 } 00819 00820 if (nocopy && ser) 00821 { 00822 return; 00823 } 00824 } 00825 } 00826 00827 const std::string Subscription::datatype() 00828 { 00829 return datatype_; 00830 } 00831 00832 const std::string Subscription::md5sum() 00833 { 00834 boost::mutex::scoped_lock lock(md5sum_mutex_); 00835 return md5sum_; 00836 } 00837 00838 }