00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <sstream>
00036 #include <fcntl.h>
00037 #include <cerrno>
00038 #include <cstring>
00039 #include <typeinfo>
00040
00041 #include "ros/common.h"
00042 #include "ros/io.h"
00043 #include "ros/subscription.h"
00044 #include "ros/publication.h"
00045 #include "ros/transport_publisher_link.h"
00046 #include "ros/intraprocess_publisher_link.h"
00047 #include "ros/intraprocess_subscriber_link.h"
00048 #include "ros/connection.h"
00049 #include "ros/transport/transport_tcp.h"
00050 #include "ros/transport/transport_udp.h"
00051 #include "ros/callback_queue_interface.h"
00052 #include "ros/this_node.h"
00053 #include "ros/network.h"
00054 #include "ros/poll_manager.h"
00055 #include "ros/connection_manager.h"
00056 #include "ros/message_deserializer.h"
00057 #include "ros/subscription_queue.h"
00058 #include "ros/file_log.h"
00059 #include "ros/transport_hints.h"
00060 #include "ros/subscription_callback_helper.h"
00061
00062 #include <boost/make_shared.hpp>
00063
00064 using XmlRpc::XmlRpcValue;
00065
00066 namespace ros
00067 {
00068
00069 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints)
00070 : name_(name)
00071 , md5sum_(md5sum)
00072 , datatype_(datatype)
00073 , nonconst_callbacks_(0)
00074 , dropped_(false)
00075 , shutting_down_(false)
00076 , transport_hints_(transport_hints)
00077 {
00078 }
00079
00080 Subscription::~Subscription()
00081 {
00082 pending_connections_.clear();
00083 callbacks_.clear();
00084 }
00085
00086 void Subscription::shutdown()
00087 {
00088 {
00089 boost::mutex::scoped_lock lock(shutdown_mutex_);
00090 shutting_down_ = true;
00091 }
00092
00093 drop();
00094 }
00095
00096 XmlRpcValue Subscription::getStats()
00097 {
00098 XmlRpcValue stats;
00099 stats[0] = name_;
00100 XmlRpcValue conn_data;
00101 conn_data.setSize(0);
00102
00103 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00104
00105 uint32_t cidx = 0;
00106 for (V_PublisherLink::iterator c = publisher_links_.begin();
00107 c != publisher_links_.end(); ++c)
00108 {
00109 const PublisherLink::Stats& s = (*c)->getStats();
00110 conn_data[cidx][0] = (*c)->getConnectionID();
00111 conn_data[cidx][1] = (int)s.bytes_received_;
00112 conn_data[cidx][2] = (int)s.messages_received_;
00113 conn_data[cidx][3] = (int)s.drops_;
00114 conn_data[cidx][4] = 0;
00115 }
00116
00117 stats[1] = conn_data;
00118 return stats;
00119 }
00120
00121
00122
00123 void Subscription::getInfo(XmlRpc::XmlRpcValue& info)
00124 {
00125 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00126
00127 for (V_PublisherLink::iterator c = publisher_links_.begin();
00128 c != publisher_links_.end(); ++c)
00129 {
00130 XmlRpcValue curr_info;
00131 curr_info[0] = (int)(*c)->getConnectionID();
00132 curr_info[1] = (*c)->getPublisherXMLRPCURI();
00133 curr_info[2] = "i";
00134 curr_info[3] = (*c)->getTransportType();
00135 curr_info[4] = name_;
00136 curr_info[5] = true;
00137 curr_info[6] = (*c)->getTransportInfo();
00138 info[info.size()] = curr_info;
00139 }
00140 }
00141
00142 uint32_t Subscription::getNumPublishers()
00143 {
00144 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00145 return (uint32_t)publisher_links_.size();
00146 }
00147
00148 void Subscription::drop()
00149 {
00150 if (!dropped_)
00151 {
00152 dropped_ = true;
00153
00154 dropAllConnections();
00155 }
00156 }
00157
00158 void Subscription::dropAllConnections()
00159 {
00160
00161
00162 V_PublisherLink localsubscribers;
00163
00164 {
00165 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00166
00167 localsubscribers.swap(publisher_links_);
00168 }
00169
00170 V_PublisherLink::iterator it = localsubscribers.begin();
00171 V_PublisherLink::iterator end = localsubscribers.end();
00172 for (;it != end; ++it)
00173 {
00174 (*it)->drop();
00175 }
00176 }
00177
00178 void Subscription::addLocalConnection(const PublicationPtr& pub)
00179 {
00180 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00181 if (dropped_)
00182 {
00183 return;
00184 }
00185
00186 ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str());
00187
00188 IntraProcessPublisherLinkPtr pub_link(boost::make_shared<IntraProcessPublisherLink>(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_));
00189 IntraProcessSubscriberLinkPtr sub_link(boost::make_shared<IntraProcessSubscriberLink>(pub));
00190 pub_link->setPublisher(sub_link);
00191 sub_link->setSubscriber(pub_link);
00192
00193 addPublisherLink(pub_link);
00194 pub->addSubscriberLink(sub_link);
00195 }
00196
00197 bool urisEqual(const std::string& uri1, const std::string& uri2)
00198 {
00199 std::string host1, host2;
00200 uint32_t port1 = 0, port2 = 0;
00201 network::splitURI(uri1, host1, port1);
00202 network::splitURI(uri2, host2, port2);
00203 return port1 == port2 && host1 == host2;
00204 }
00205
00206 bool Subscription::pubUpdate(const V_string& new_pubs)
00207 {
00208 boost::mutex::scoped_lock lock(shutdown_mutex_);
00209
00210 if (shutting_down_ || dropped_)
00211 {
00212 return false;
00213 }
00214
00215 bool retval = true;
00216
00217 {
00218 std::stringstream ss;
00219
00220 for (V_string::const_iterator up_i = new_pubs.begin();
00221 up_i != new_pubs.end(); ++up_i)
00222 {
00223 ss << *up_i << ", ";
00224 }
00225
00226 ss << " already have these connections: ";
00227 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00228 spc!= publisher_links_.end(); ++spc)
00229 {
00230 ss << (*spc)->getPublisherXMLRPCURI() << ", ";
00231 }
00232
00233 boost::mutex::scoped_lock lock(pending_connections_mutex_);
00234 S_PendingConnection::iterator it = pending_connections_.begin();
00235 S_PendingConnection::iterator end = pending_connections_.end();
00236 for (; it != end; ++it)
00237 {
00238 ss << (*it)->getRemoteURI() << ", ";
00239 }
00240
00241 ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());
00242 }
00243
00244 V_string additions;
00245 V_PublisherLink subtractions;
00246 V_PublisherLink to_add;
00247
00248
00249 {
00250 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00251
00252 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00253 spc!= publisher_links_.end(); ++spc)
00254 {
00255 bool found = false;
00256 for (V_string::const_iterator up_i = new_pubs.begin();
00257 !found && up_i != new_pubs.end(); ++up_i)
00258 {
00259 if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
00260 {
00261 found = true;
00262 break;
00263 }
00264 }
00265
00266 if (!found)
00267 {
00268 subtractions.push_back(*spc);
00269 }
00270 }
00271
00272 for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
00273 {
00274 bool found = false;
00275 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00276 !found && spc != publisher_links_.end(); ++spc)
00277 {
00278 if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
00279 {
00280 found = true;
00281 break;
00282 }
00283 }
00284
00285 if (!found)
00286 {
00287 boost::mutex::scoped_lock lock(pending_connections_mutex_);
00288 S_PendingConnection::iterator it = pending_connections_.begin();
00289 S_PendingConnection::iterator end = pending_connections_.end();
00290 for (; it != end; ++it)
00291 {
00292 if (urisEqual(*up_i, (*it)->getRemoteURI()))
00293 {
00294 found = true;
00295 break;
00296 }
00297 }
00298 }
00299
00300 if (!found)
00301 {
00302 additions.push_back(*up_i);
00303 }
00304 }
00305 }
00306
00307 for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
00308 {
00309 const PublisherLinkPtr& link = *i;
00310 if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
00311 {
00312 ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
00313 link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
00314 link->drop();
00315 }
00316 else
00317 {
00318 ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
00319 }
00320 }
00321
00322 for (V_string::iterator i = additions.begin();
00323 i != additions.end(); ++i)
00324 {
00325
00326 if (XMLRPCManager::instance()->getServerURI() != *i)
00327 {
00328 retval &= negotiateConnection(*i);
00329 }
00330 else
00331 {
00332 ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str());
00333 }
00334 }
00335
00336 return retval;
00337 }
00338
00339 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
00340 {
00341 XmlRpcValue tcpros_array, protos_array, params;
00342 XmlRpcValue udpros_array;
00343 TransportUDPPtr udp_transport;
00344 int protos = 0;
00345 V_string transports = transport_hints_.getTransports();
00346 if (transports.empty())
00347 {
00348 transport_hints_.reliable();
00349 transports = transport_hints_.getTransports();
00350 }
00351 for (V_string::const_iterator it = transports.begin();
00352 it != transports.end();
00353 ++it)
00354 {
00355 if (*it == "UDP")
00356 {
00357 int max_datagram_size = transport_hints_.getMaxDatagramSize();
00358 udp_transport = boost::make_shared<TransportUDP>(&PollManager::instance()->getPollSet());
00359 if (!max_datagram_size)
00360 max_datagram_size = udp_transport->getMaxDatagramSize();
00361 udp_transport->createIncoming(0, false);
00362 udpros_array[0] = "UDPROS";
00363 M_string m;
00364 m["topic"] = getName();
00365 m["md5sum"] = md5sum();
00366 m["callerid"] = this_node::getName();
00367 m["type"] = datatype();
00368 boost::shared_array<uint8_t> buffer;
00369 uint32_t len;
00370 Header::write(m, buffer, len);
00371 XmlRpcValue v(buffer.get(), len);
00372 udpros_array[1] = v;
00373 udpros_array[2] = network::getHost();
00374 udpros_array[3] = udp_transport->getServerPort();
00375 udpros_array[4] = max_datagram_size;
00376
00377 protos_array[protos++] = udpros_array;
00378 }
00379 else if (*it == "TCP")
00380 {
00381 tcpros_array[0] = std::string("TCPROS");
00382 protos_array[protos++] = tcpros_array;
00383 }
00384 else
00385 {
00386 ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str());
00387 }
00388 }
00389 params[0] = this_node::getName();
00390 params[1] = name_;
00391 params[2] = protos_array;
00392 std::string peer_host;
00393 uint32_t peer_port;
00394 if (!network::splitURI(xmlrpc_uri, peer_host, peer_port))
00395 {
00396 ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
00397 return false;
00398 }
00399
00400 XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(),
00401 peer_port, "/");
00402
00403
00404
00405 if (!c->executeNonBlock("requestTopic", params))
00406 {
00407 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00408 peer_host.c_str(), peer_port, name_.c_str());
00409 delete c;
00410 if (udp_transport)
00411 {
00412 udp_transport->close();
00413 }
00414
00415 return false;
00416 }
00417
00418 ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
00419
00420
00421
00422 PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
00423
00424 XMLRPCManager::instance()->addASyncConnection(conn);
00425
00426 {
00427 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00428 pending_connections_.insert(conn);
00429 }
00430
00431 return true;
00432 }
00433
00434 void closeTransport(const TransportUDPPtr& trans)
00435 {
00436 if (trans)
00437 {
00438 trans->close();
00439 }
00440 }
00441
00442 void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRpcValue& result)
00443 {
00444 boost::mutex::scoped_lock lock(shutdown_mutex_);
00445 if (shutting_down_ || dropped_)
00446 {
00447 return;
00448 }
00449
00450 {
00451 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00452 pending_connections_.erase(conn);
00453 }
00454
00455 TransportUDPPtr udp_transport;
00456
00457 std::string peer_host = conn->getClient()->getHost();
00458 uint32_t peer_port = conn->getClient()->getPort();
00459 std::stringstream ss;
00460 ss << "http://" << peer_host << ":" << peer_port << "/";
00461 std::string xmlrpc_uri = ss.str();
00462 udp_transport = conn->getUDPTransport();
00463
00464 XmlRpc::XmlRpcValue proto;
00465 if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto))
00466 {
00467 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00468 peer_host.c_str(), peer_port, name_.c_str());
00469 closeTransport(udp_transport);
00470 return;
00471 }
00472
00473 if (proto.size() == 0)
00474 {
00475 ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
00476 closeTransport(udp_transport);
00477 return;
00478 }
00479
00480 if (proto.getType() != XmlRpcValue::TypeArray)
00481 {
00482 ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
00483 closeTransport(udp_transport);
00484 return;
00485 }
00486 if (proto[0].getType() != XmlRpcValue::TypeString)
00487 {
00488 ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
00489 closeTransport(udp_transport);
00490 return;
00491 }
00492
00493 std::string proto_name = proto[0];
00494 if (proto_name == "TCPROS")
00495 {
00496 if (proto.size() != 3 ||
00497 proto[1].getType() != XmlRpcValue::TypeString ||
00498 proto[2].getType() != XmlRpcValue::TypeInt)
00499 {
00500 ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \
00501 "parameters aren't string,int");
00502 return;
00503 }
00504 std::string pub_host = proto[1];
00505 int pub_port = proto[2];
00506 ROSCPP_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00507
00508 TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
00509 if (transport->connect(pub_host, pub_port))
00510 {
00511 ConnectionPtr connection(boost::make_shared<Connection>());
00512 TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
00513
00514 connection->initialize(transport, false, HeaderReceivedFunc());
00515 pub_link->initialize(connection);
00516
00517 ConnectionManager::instance()->addConnection(connection);
00518
00519 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00520 addPublisherLink(pub_link);
00521
00522 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00523 }
00524 else
00525 {
00526 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00527 }
00528 }
00529 else if (proto_name == "UDPROS")
00530 {
00531 if (proto.size() != 6 ||
00532 proto[1].getType() != XmlRpcValue::TypeString ||
00533 proto[2].getType() != XmlRpcValue::TypeInt ||
00534 proto[3].getType() != XmlRpcValue::TypeInt ||
00535 proto[4].getType() != XmlRpcValue::TypeInt ||
00536 proto[5].getType() != XmlRpcValue::TypeBase64)
00537 {
00538 ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
00539 "parameters aren't string,int,int,int,base64");
00540 closeTransport(udp_transport);
00541 return;
00542 }
00543 std::string pub_host = proto[1];
00544 int pub_port = proto[2];
00545 int conn_id = proto[3];
00546 int max_datagram_size = proto[4];
00547 std::vector<char> header_bytes = proto[5];
00548 boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
00549 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00550 Header h;
00551 std::string err;
00552 if (!h.parse(buffer, header_bytes.size(), err))
00553 {
00554 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00555 closeTransport(udp_transport);
00556 return;
00557 }
00558 ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
00559
00560 std::string error_msg;
00561 if (h.getValue("error", error_msg))
00562 {
00563 ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
00564 closeTransport(udp_transport);
00565 return;
00566 }
00567
00568 TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
00569 if (pub_link->setHeader(h))
00570 {
00571 ConnectionPtr connection(boost::make_shared<Connection>());
00572 connection->initialize(udp_transport, false, NULL);
00573 connection->setHeader(h);
00574 pub_link->initialize(connection);
00575
00576 ConnectionManager::instance()->addConnection(connection);
00577
00578 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00579 addPublisherLink(pub_link);
00580
00581 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00582 }
00583 else
00584 {
00585 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00586 closeTransport(udp_transport);
00587 return;
00588 }
00589 }
00590 else
00591 {
00592 ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str());
00593 }
00594 }
00595
00596 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
00597 {
00598 boost::mutex::scoped_lock lock(callbacks_mutex_);
00599
00600 uint32_t drops = 0;
00601
00602
00603
00604
00605 cached_deserializers_.clear();
00606
00607 ros::Time receipt_time = ros::Time::now();
00608
00609 for (V_CallbackInfo::iterator cb = callbacks_.begin();
00610 cb != callbacks_.end(); ++cb)
00611 {
00612 const CallbackInfoPtr& info = *cb;
00613
00614 ROS_ASSERT(info->callback_queue_);
00615
00616 const std::type_info* ti = &info->helper_->getTypeInfo();
00617
00618 if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
00619 {
00620 MessageDeserializerPtr deserializer;
00621
00622 V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
00623 V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
00624 for (; des_it != des_end; ++des_it)
00625 {
00626 if (*des_it->first == *ti)
00627 {
00628 deserializer = des_it->second;
00629 break;
00630 }
00631 }
00632
00633 if (!deserializer)
00634 {
00635 deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
00636 cached_deserializers_.push_back(std::make_pair(ti, deserializer));
00637 }
00638
00639 bool was_full = false;
00640 bool nonconst_need_copy = false;
00641 if (callbacks_.size() > 1)
00642 {
00643 nonconst_need_copy = true;
00644 }
00645
00646 info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
00647
00648 if (was_full)
00649 {
00650 ++drops;
00651 }
00652 else
00653 {
00654 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00655 }
00656 }
00657 }
00658
00659
00660 statistics_.callback(connection_header, name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0);
00661
00662
00663 if (link->isLatched())
00664 {
00665 LatchInfo li;
00666 li.connection_header = connection_header;
00667 li.link = link;
00668 li.message = m;
00669 li.receipt_time = receipt_time;
00670 latched_messages_[link] = li;
00671 }
00672
00673 cached_deserializers_.clear();
00674
00675 return drops;
00676 }
00677
00678 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
00679 {
00680 ROS_ASSERT(helper);
00681 ROS_ASSERT(queue);
00682
00683 statistics_.init(helper);
00684
00685
00686 {
00687 boost::mutex::scoped_lock lock(md5sum_mutex_);
00688 if (md5sum_ == "*" && md5sum != "*")
00689 {
00690
00691 md5sum_ = md5sum;
00692 }
00693 }
00694
00695 if (md5sum != "*" && md5sum != this->md5sum())
00696 {
00697 return false;
00698 }
00699
00700 {
00701 boost::mutex::scoped_lock lock(callbacks_mutex_);
00702
00703 CallbackInfoPtr info(boost::make_shared<CallbackInfo>());
00704 info->helper_ = helper;
00705 info->callback_queue_ = queue;
00706 info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(name_, queue_size, allow_concurrent_callbacks);
00707 info->tracked_object_ = tracked_object;
00708 info->has_tracked_object_ = false;
00709 if (tracked_object)
00710 {
00711 info->has_tracked_object_ = true;
00712 }
00713
00714 if (!helper->isConst())
00715 {
00716 ++nonconst_callbacks_;
00717 }
00718
00719 callbacks_.push_back(info);
00720 cached_deserializers_.reserve(callbacks_.size());
00721
00722
00723 if (!latched_messages_.empty())
00724 {
00725 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00726
00727 V_PublisherLink::iterator it = publisher_links_.begin();
00728 V_PublisherLink::iterator end = publisher_links_.end();
00729 for (; it != end;++it)
00730 {
00731 const PublisherLinkPtr& link = *it;
00732 if (link->isLatched())
00733 {
00734 M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
00735 if (des_it != latched_messages_.end())
00736 {
00737 const LatchInfo& latch_info = des_it->second;
00738
00739 MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, latch_info.message, latch_info.connection_header));
00740 bool was_full = false;
00741 info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
00742 if (!was_full)
00743 {
00744 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00745 }
00746 }
00747 }
00748 }
00749 }
00750 }
00751
00752 return true;
00753 }
00754
00755 void Subscription::removeCallback(const SubscriptionCallbackHelperPtr& helper)
00756 {
00757 CallbackInfoPtr info;
00758 {
00759 boost::mutex::scoped_lock cbs_lock(callbacks_mutex_);
00760 for (V_CallbackInfo::iterator it = callbacks_.begin();
00761 it != callbacks_.end(); ++it)
00762 {
00763 if ((*it)->helper_ == helper)
00764 {
00765 info = *it;
00766 callbacks_.erase(it);
00767
00768 if (!helper->isConst())
00769 {
00770 --nonconst_callbacks_;
00771 }
00772
00773 break;
00774 }
00775 }
00776 }
00777
00778 if (info)
00779 {
00780 info->subscription_queue_->clear();
00781 info->callback_queue_->removeByID((uint64_t)info.get());
00782 }
00783 }
00784
00785 void Subscription::headerReceived(const PublisherLinkPtr& link, const Header& h)
00786 {
00787 (void)h;
00788 boost::mutex::scoped_lock lock(md5sum_mutex_);
00789 if (md5sum_ == "*")
00790 {
00791 md5sum_ = link->getMD5Sum();
00792 }
00793 }
00794
00795 void Subscription::addPublisherLink(const PublisherLinkPtr& link)
00796 {
00797 publisher_links_.push_back(link);
00798 }
00799
00800 void Subscription::removePublisherLink(const PublisherLinkPtr& pub_link)
00801 {
00802 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00803
00804 V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link);
00805 if (it != publisher_links_.end())
00806 {
00807 publisher_links_.erase(it);
00808 }
00809
00810 if (pub_link->isLatched())
00811 {
00812 latched_messages_.erase(pub_link);
00813 }
00814 }
00815
00816 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
00817 {
00818 boost::mutex::scoped_lock lock(callbacks_mutex_);
00819 for (V_CallbackInfo::iterator cb = callbacks_.begin();
00820 cb != callbacks_.end(); ++cb)
00821 {
00822 const CallbackInfoPtr& info = *cb;
00823 if (info->helper_->getTypeInfo() == ti)
00824 {
00825 nocopy = true;
00826 }
00827 else
00828 {
00829 ser = true;
00830 }
00831
00832 if (nocopy && ser)
00833 {
00834 return;
00835 }
00836 }
00837 }
00838
00839 const std::string Subscription::datatype()
00840 {
00841 return datatype_;
00842 }
00843
00844 const std::string Subscription::md5sum()
00845 {
00846 boost::mutex::scoped_lock lock(md5sum_mutex_);
00847 return md5sum_;
00848 }
00849
00850 }