00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <sstream>
00036 #include <fcntl.h>
00037 #include <cerrno>
00038 #include <cstring>
00039 #include <typeinfo>
00040
00041 #include "ros/common.h"
00042 #include "ros/io.h"
00043 #include "ros/subscription.h"
00044 #include "ros/publication.h"
00045 #include "ros/transport_publisher_link.h"
00046 #include "ros/intraprocess_publisher_link.h"
00047 #include "ros/intraprocess_subscriber_link.h"
00048 #include "ros/connection.h"
00049 #include "ros/transport/transport_tcp.h"
00050 #include "ros/transport/transport_udp.h"
00051 #include "ros/callback_queue_interface.h"
00052 #include "ros/this_node.h"
00053 #include "ros/network.h"
00054 #include "ros/poll_manager.h"
00055 #include "ros/connection_manager.h"
00056 #include "ros/message_deserializer.h"
00057 #include "ros/subscription_queue.h"
00058 #include "ros/file_log.h"
00059 #include "ros/transport_hints.h"
00060 #include "ros/subscription_callback_helper.h"
00061
00062 #include <boost/make_shared.hpp>
00063
00064 using XmlRpc::XmlRpcValue;
00065
00066 namespace ros
00067 {
00068
00069 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints)
00070 : name_(name)
00071 , md5sum_(md5sum)
00072 , datatype_(datatype)
00073 , nonconst_callbacks_(0)
00074 , dropped_(false)
00075 , shutting_down_(false)
00076 , transport_hints_(transport_hints)
00077 {
00078 }
00079
00080 Subscription::~Subscription()
00081 {
00082 pending_connections_.clear();
00083 callbacks_.clear();
00084 }
00085
00086 void Subscription::shutdown()
00087 {
00088 {
00089 boost::mutex::scoped_lock lock(shutdown_mutex_);
00090 shutting_down_ = true;
00091 }
00092
00093 drop();
00094 }
00095
00096 XmlRpcValue Subscription::getStats()
00097 {
00098 XmlRpcValue stats;
00099 stats[0] = name_;
00100 XmlRpcValue conn_data;
00101 conn_data.setSize(0);
00102
00103 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00104
00105 uint32_t cidx = 0;
00106 for (V_PublisherLink::iterator c = publisher_links_.begin();
00107 c != publisher_links_.end(); ++c)
00108 {
00109 const PublisherLink::Stats& s = (*c)->getStats();
00110 conn_data[cidx][0] = (*c)->getConnectionID();
00111 conn_data[cidx][1] = (int)s.bytes_received_;
00112 conn_data[cidx][2] = (int)s.messages_received_;
00113 conn_data[cidx][3] = (int)s.drops_;
00114 conn_data[cidx][4] = 0;
00115 }
00116
00117 stats[1] = conn_data;
00118 return stats;
00119 }
00120
00121
00122
00123
00124
00125
00126 void Subscription::getInfo(XmlRpc::XmlRpcValue& info)
00127 {
00128 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00129
00130 for (V_PublisherLink::iterator c = publisher_links_.begin();
00131 c != publisher_links_.end(); ++c)
00132 {
00133 XmlRpcValue curr_info;
00134 curr_info[0] = (int)(*c)->getConnectionID();
00135 curr_info[1] = (*c)->getPublisherXMLRPCURI();
00136 curr_info[2] = "i";
00137 curr_info[3] = (*c)->getTransportType();
00138 curr_info[4] = name_;
00139 info[info.size()] = curr_info;
00140 }
00141 }
00142
00143 uint32_t Subscription::getNumPublishers()
00144 {
00145 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00146 return (uint32_t)publisher_links_.size();
00147 }
00148
00149 void Subscription::drop()
00150 {
00151 if (!dropped_)
00152 {
00153 dropped_ = true;
00154
00155 dropAllConnections();
00156 }
00157 }
00158
00159 void Subscription::dropAllConnections()
00160 {
00161
00162
00163 V_PublisherLink localsubscribers;
00164
00165 {
00166 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00167
00168 localsubscribers.swap(publisher_links_);
00169 }
00170
00171 V_PublisherLink::iterator it = localsubscribers.begin();
00172 V_PublisherLink::iterator end = localsubscribers.end();
00173 for (;it != end; ++it)
00174 {
00175 (*it)->drop();
00176 }
00177 }
00178
00179 void Subscription::addLocalConnection(const PublicationPtr& pub)
00180 {
00181 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00182 if (dropped_)
00183 {
00184 return;
00185 }
00186
00187 ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str());
00188
00189 IntraProcessPublisherLinkPtr pub_link(new IntraProcessPublisherLink(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_));
00190 IntraProcessSubscriberLinkPtr sub_link(new IntraProcessSubscriberLink(pub));
00191 pub_link->setPublisher(sub_link);
00192 sub_link->setSubscriber(pub_link);
00193
00194 addPublisherLink(pub_link);
00195 pub->addSubscriberLink(sub_link);
00196 }
00197
00198 bool urisEqual(const std::string& uri1, const std::string& uri2)
00199 {
00200 std::string host1, host2;
00201 uint32_t port1 = 0, port2 = 0;
00202 network::splitURI(uri1, host1, port1);
00203 network::splitURI(uri2, host2, port2);
00204 return port1 == port2 && host1 == host2;
00205 }
00206
00207 bool Subscription::pubUpdate(const V_string& new_pubs)
00208 {
00209 boost::mutex::scoped_lock lock(shutdown_mutex_);
00210
00211 if (shutting_down_ || dropped_)
00212 {
00213 return false;
00214 }
00215
00216 bool retval = true;
00217
00218 {
00219 std::stringstream ss;
00220
00221 for (V_string::const_iterator up_i = new_pubs.begin();
00222 up_i != new_pubs.end(); ++up_i)
00223 {
00224 ss << *up_i << ", ";
00225 }
00226
00227 ss << " already have these connections: ";
00228 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00229 spc!= publisher_links_.end(); ++spc)
00230 {
00231 ss << (*spc)->getPublisherXMLRPCURI() << ", ";
00232 }
00233
00234 boost::mutex::scoped_lock lock(pending_connections_mutex_);
00235 S_PendingConnection::iterator it = pending_connections_.begin();
00236 S_PendingConnection::iterator end = pending_connections_.end();
00237 for (; it != end; ++it)
00238 {
00239 ss << (*it)->getRemoteURI() << ", ";
00240 }
00241
00242 ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());
00243 }
00244
00245 V_string additions;
00246 V_PublisherLink subtractions;
00247 V_PublisherLink to_add;
00248
00249
00250 {
00251 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00252
00253 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00254 spc!= publisher_links_.end(); ++spc)
00255 {
00256 bool found = false;
00257 for (V_string::const_iterator up_i = new_pubs.begin();
00258 !found && up_i != new_pubs.end(); ++up_i)
00259 {
00260 if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
00261 {
00262 found = true;
00263 break;
00264 }
00265 }
00266
00267 if (!found)
00268 {
00269 subtractions.push_back(*spc);
00270 }
00271 }
00272
00273 for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
00274 {
00275 bool found = false;
00276 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00277 !found && spc != publisher_links_.end(); ++spc)
00278 {
00279 if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
00280 {
00281 found = true;
00282 break;
00283 }
00284 }
00285
00286 if (!found)
00287 {
00288 boost::mutex::scoped_lock lock(pending_connections_mutex_);
00289 S_PendingConnection::iterator it = pending_connections_.begin();
00290 S_PendingConnection::iterator end = pending_connections_.end();
00291 for (; it != end; ++it)
00292 {
00293 if (urisEqual(*up_i, (*it)->getRemoteURI()))
00294 {
00295 found = true;
00296 break;
00297 }
00298 }
00299 }
00300
00301 if (!found)
00302 {
00303 additions.push_back(*up_i);
00304 }
00305 }
00306 }
00307
00308 for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
00309 {
00310 const PublisherLinkPtr& link = *i;
00311 if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
00312 {
00313 ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
00314 link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
00315 link->drop();
00316 }
00317 else
00318 {
00319 ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
00320 }
00321 }
00322
00323 for (V_string::iterator i = additions.begin();
00324 i != additions.end(); ++i)
00325 {
00326
00327 if (XMLRPCManager::instance()->getServerURI() != *i)
00328 {
00329 retval &= negotiateConnection(*i);
00330 }
00331 else
00332 {
00333 ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str());
00334 }
00335 }
00336
00337 return retval;
00338 }
00339
00340 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
00341 {
00342 XmlRpcValue tcpros_array, protos_array, params;
00343 XmlRpcValue udpros_array;
00344 TransportUDPPtr udp_transport;
00345 int protos = 0;
00346 V_string transports = transport_hints_.getTransports();
00347 if (transports.empty())
00348 {
00349 transport_hints_.reliable();
00350 transports = transport_hints_.getTransports();
00351 }
00352 for (V_string::const_iterator it = transports.begin();
00353 it != transports.end();
00354 ++it)
00355 {
00356 if (*it == "UDP")
00357 {
00358 int max_datagram_size = transport_hints_.getMaxDatagramSize();
00359 udp_transport = TransportUDPPtr(new TransportUDP(&PollManager::instance()->getPollSet()));
00360 if (!max_datagram_size)
00361 max_datagram_size = udp_transport->getMaxDatagramSize();
00362 udp_transport->createIncoming(0, false);
00363 udpros_array[0] = "UDPROS";
00364 M_string m;
00365 m["topic"] = getName();
00366 m["md5sum"] = md5sum();
00367 m["callerid"] = this_node::getName();
00368 m["type"] = datatype();
00369 boost::shared_array<uint8_t> buffer;
00370 uint32_t len;
00371 Header::write(m, buffer, len);
00372 XmlRpcValue v(buffer.get(), len);
00373 udpros_array[1] = v;
00374 udpros_array[2] = network::getHost();
00375 udpros_array[3] = udp_transport->getServerPort();
00376 udpros_array[4] = max_datagram_size;
00377
00378 protos_array[protos++] = udpros_array;
00379 }
00380 else if (*it == "TCP")
00381 {
00382 tcpros_array[0] = std::string("TCPROS");
00383 protos_array[protos++] = tcpros_array;
00384 }
00385 else
00386 {
00387 ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str());
00388 }
00389 }
00390 params[0] = this_node::getName();
00391 params[1] = name_;
00392 params[2] = protos_array;
00393 std::string peer_host;
00394 uint32_t peer_port;
00395 if (!network::splitURI(xmlrpc_uri, peer_host, peer_port))
00396 {
00397 ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
00398 return false;
00399 }
00400
00401 XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(),
00402 peer_port, "/");
00403
00404
00405
00406 if (!c->executeNonBlock("requestTopic", params))
00407 {
00408 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00409 peer_host.c_str(), peer_port, name_.c_str());
00410 delete c;
00411 if (udp_transport)
00412 {
00413 udp_transport->close();
00414 }
00415
00416 return false;
00417 }
00418
00419 ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
00420
00421
00422
00423 PendingConnectionPtr conn(new PendingConnection(c, udp_transport, shared_from_this(), xmlrpc_uri));
00424
00425 XMLRPCManager::instance()->addASyncConnection(conn);
00426
00427 {
00428 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00429 pending_connections_.insert(conn);
00430 }
00431
00432 return true;
00433 }
00434
00435 void closeTransport(const TransportUDPPtr& trans)
00436 {
00437 if (trans)
00438 {
00439 trans->close();
00440 }
00441 }
00442
00443 void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRpcValue& result)
00444 {
00445 boost::mutex::scoped_lock lock(shutdown_mutex_);
00446 if (shutting_down_ || dropped_)
00447 {
00448 return;
00449 }
00450
00451 {
00452 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00453 pending_connections_.erase(conn);
00454 }
00455
00456 TransportUDPPtr udp_transport;
00457
00458 std::string peer_host = conn->getClient()->getHost();
00459 uint32_t peer_port = conn->getClient()->getPort();
00460 std::stringstream ss;
00461 ss << "http://" << peer_host << ":" << peer_port << "/";
00462 std::string xmlrpc_uri = ss.str();
00463 udp_transport = conn->getUDPTransport();
00464
00465 XmlRpc::XmlRpcValue proto;
00466 if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto))
00467 {
00468 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00469 peer_host.c_str(), peer_port, name_.c_str());
00470 closeTransport(udp_transport);
00471 return;
00472 }
00473
00474 if (proto.size() == 0)
00475 {
00476 ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
00477 closeTransport(udp_transport);
00478 return;
00479 }
00480
00481 if (proto.getType() != XmlRpcValue::TypeArray)
00482 {
00483 ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
00484 closeTransport(udp_transport);
00485 return;
00486 }
00487 if (proto[0].getType() != XmlRpcValue::TypeString)
00488 {
00489 ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
00490 closeTransport(udp_transport);
00491 return;
00492 }
00493
00494 std::string proto_name = proto[0];
00495 if (proto_name == "TCPROS")
00496 {
00497 if (proto.size() != 3 ||
00498 proto[1].getType() != XmlRpcValue::TypeString ||
00499 proto[2].getType() != XmlRpcValue::TypeInt)
00500 {
00501 ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \
00502 "parameters aren't string,int");
00503 return;
00504 }
00505 std::string pub_host = proto[1];
00506 int pub_port = proto[2];
00507 ROSCPP_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00508
00509 TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet()));
00510 if (transport->connect(pub_host, pub_port))
00511 {
00512 ConnectionPtr connection(new Connection());
00513 TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
00514
00515 connection->initialize(transport, false, HeaderReceivedFunc());
00516 pub_link->initialize(connection);
00517
00518 ConnectionManager::instance()->addConnection(connection);
00519
00520 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00521 addPublisherLink(pub_link);
00522
00523 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00524 }
00525 else
00526 {
00527 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00528 }
00529 }
00530 else if (proto_name == "UDPROS")
00531 {
00532 if (proto.size() != 6 ||
00533 proto[1].getType() != XmlRpcValue::TypeString ||
00534 proto[2].getType() != XmlRpcValue::TypeInt ||
00535 proto[3].getType() != XmlRpcValue::TypeInt ||
00536 proto[4].getType() != XmlRpcValue::TypeInt ||
00537 proto[5].getType() != XmlRpcValue::TypeBase64)
00538 {
00539 ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
00540 "parameters aren't string,int,int,int,base64");
00541 closeTransport(udp_transport);
00542 return;
00543 }
00544 std::string pub_host = proto[1];
00545 int pub_port = proto[2];
00546 int conn_id = proto[3];
00547 int max_datagram_size = proto[4];
00548 std::vector<char> header_bytes = proto[5];
00549 boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]);
00550 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00551 Header h;
00552 std::string err;
00553 if (!h.parse(buffer, header_bytes.size(), err))
00554 {
00555 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00556 closeTransport(udp_transport);
00557 return;
00558 }
00559 ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
00560
00561 std::string error_msg;
00562 if (h.getValue("error", error_msg))
00563 {
00564 ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
00565 closeTransport(udp_transport);
00566 return;
00567 }
00568
00569 TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
00570 if (pub_link->setHeader(h))
00571 {
00572 ConnectionPtr connection(new Connection());
00573 connection->initialize(udp_transport, false, NULL);
00574 connection->setHeader(h);
00575 pub_link->initialize(connection);
00576
00577 ConnectionManager::instance()->addConnection(connection);
00578
00579 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00580 addPublisherLink(pub_link);
00581
00582 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00583 }
00584 else
00585 {
00586 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00587 closeTransport(udp_transport);
00588 return;
00589 }
00590 }
00591 else
00592 {
00593 ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str());
00594 }
00595 }
00596
00597 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
00598 {
00599 boost::mutex::scoped_lock lock(callbacks_mutex_);
00600
00601 uint32_t drops = 0;
00602
00603
00604
00605
00606 cached_deserializers_.clear();
00607
00608 ros::Time receipt_time = ros::Time::now();
00609
00610 for (V_CallbackInfo::iterator cb = callbacks_.begin();
00611 cb != callbacks_.end(); ++cb)
00612 {
00613 const CallbackInfoPtr& info = *cb;
00614
00615 ROS_ASSERT(info->callback_queue_);
00616
00617 const std::type_info* ti = &info->helper_->getTypeInfo();
00618
00619 if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
00620 {
00621 MessageDeserializerPtr deserializer;
00622
00623 V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
00624 V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
00625 for (; des_it != des_end; ++des_it)
00626 {
00627 if (*des_it->first == *ti)
00628 {
00629 deserializer = des_it->second;
00630 break;
00631 }
00632 }
00633
00634 if (!deserializer)
00635 {
00636 deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
00637 cached_deserializers_.push_back(std::make_pair(ti, deserializer));
00638 }
00639
00640 bool was_full = false;
00641 bool nonconst_need_copy = false;
00642 if (callbacks_.size() > 1)
00643 {
00644 nonconst_need_copy = true;
00645 }
00646
00647 info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
00648
00649 if (was_full)
00650 {
00651 ++drops;
00652 }
00653 else
00654 {
00655 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00656 }
00657 }
00658 }
00659
00660
00661 if (link->isLatched())
00662 {
00663 LatchInfo li;
00664 li.connection_header = connection_header;
00665 li.link = link;
00666 li.message = m;
00667 li.receipt_time = receipt_time;
00668 latched_messages_[link] = li;
00669 }
00670
00671 cached_deserializers_.clear();
00672
00673 return drops;
00674 }
00675
00676 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
00677 {
00678 ROS_ASSERT(helper);
00679 ROS_ASSERT(queue);
00680
00681
00682 {
00683 boost::mutex::scoped_lock lock(md5sum_mutex_);
00684 if (md5sum_ == "*" && md5sum != "*")
00685 {
00686
00687 md5sum_ = md5sum;
00688 }
00689 }
00690
00691 if (md5sum != "*" && md5sum != this->md5sum())
00692 {
00693 return false;
00694 }
00695
00696 {
00697 boost::mutex::scoped_lock lock(callbacks_mutex_);
00698
00699 CallbackInfoPtr info(new CallbackInfo);
00700 info->helper_ = helper;
00701 info->callback_queue_ = queue;
00702 info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size, allow_concurrent_callbacks));
00703 info->tracked_object_ = tracked_object;
00704 info->has_tracked_object_ = false;
00705 if (tracked_object)
00706 {
00707 info->has_tracked_object_ = true;
00708 }
00709
00710 if (!helper->isConst())
00711 {
00712 ++nonconst_callbacks_;
00713 }
00714
00715 callbacks_.push_back(info);
00716 cached_deserializers_.reserve(callbacks_.size());
00717
00718
00719 if (!latched_messages_.empty())
00720 {
00721 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00722
00723 V_PublisherLink::iterator it = publisher_links_.begin();
00724 V_PublisherLink::iterator end = publisher_links_.end();
00725 for (; it != end;++it)
00726 {
00727 const PublisherLinkPtr& link = *it;
00728 if (link->isLatched())
00729 {
00730 M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
00731 if (des_it != latched_messages_.end())
00732 {
00733 const LatchInfo& latch_info = des_it->second;
00734
00735 MessageDeserializerPtr des(new MessageDeserializer(helper, latch_info.message, latch_info.connection_header));
00736 bool was_full = false;
00737 info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
00738 if (!was_full)
00739 {
00740 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00741 }
00742 }
00743 }
00744 }
00745 }
00746 }
00747
00748 return true;
00749 }
00750
00751 void Subscription::removeCallback(const SubscriptionCallbackHelperPtr& helper)
00752 {
00753 CallbackInfoPtr info;
00754 {
00755 boost::mutex::scoped_lock cbs_lock(callbacks_mutex_);
00756 for (V_CallbackInfo::iterator it = callbacks_.begin();
00757 it != callbacks_.end(); ++it)
00758 {
00759 if ((*it)->helper_ == helper)
00760 {
00761 info = *it;
00762 callbacks_.erase(it);
00763
00764 if (!helper->isConst())
00765 {
00766 --nonconst_callbacks_;
00767 }
00768
00769 break;
00770 }
00771 }
00772 }
00773
00774 if (info)
00775 {
00776 info->subscription_queue_->clear();
00777 info->callback_queue_->removeByID((uint64_t)info.get());
00778 }
00779 }
00780
00781 void Subscription::headerReceived(const PublisherLinkPtr& link, const Header& h)
00782 {
00783 boost::mutex::scoped_lock lock(md5sum_mutex_);
00784 if (md5sum_ == "*")
00785 {
00786 md5sum_ = link->getMD5Sum();
00787 }
00788 }
00789
00790 void Subscription::addPublisherLink(const PublisherLinkPtr& link)
00791 {
00792 publisher_links_.push_back(link);
00793 }
00794
00795 void Subscription::removePublisherLink(const PublisherLinkPtr& pub_link)
00796 {
00797 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00798
00799 V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link);
00800 if (it != publisher_links_.end())
00801 {
00802 publisher_links_.erase(it);
00803 }
00804
00805 if (pub_link->isLatched())
00806 {
00807 latched_messages_.erase(pub_link);
00808 }
00809 }
00810
00811 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
00812 {
00813 boost::mutex::scoped_lock lock(callbacks_mutex_);
00814 for (V_CallbackInfo::iterator cb = callbacks_.begin();
00815 cb != callbacks_.end(); ++cb)
00816 {
00817 const CallbackInfoPtr& info = *cb;
00818 if (info->helper_->getTypeInfo() == ti)
00819 {
00820 nocopy = true;
00821 }
00822 else
00823 {
00824 ser = true;
00825 }
00826
00827 if (nocopy && ser)
00828 {
00829 return;
00830 }
00831 }
00832 }
00833
00834 const std::string Subscription::datatype()
00835 {
00836 return datatype_;
00837 }
00838
00839 const std::string Subscription::md5sum()
00840 {
00841 boost::mutex::scoped_lock lock(md5sum_mutex_);
00842 return md5sum_;
00843 }
00844
00845 }