00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <sstream>
00036 #include <arpa/inet.h>
00037 #include <netdb.h>
00038 #include <fcntl.h>
00039 #include <sys/poll.h>
00040 #include <cerrno>
00041 #include <cstring>
00042 #include <typeinfo>
00043
00044 #include "ros/common.h"
00045 #include "ros/subscription.h"
00046 #include "ros/publication.h"
00047 #include "ros/transport_publisher_link.h"
00048 #include "ros/intraprocess_publisher_link.h"
00049 #include "ros/intraprocess_subscriber_link.h"
00050 #include "ros/connection.h"
00051 #include "ros/transport/transport_tcp.h"
00052 #include "ros/transport/transport_udp.h"
00053 #include "ros/callback_queue_interface.h"
00054 #include "ros/this_node.h"
00055 #include "ros/network.h"
00056 #include "ros/poll_manager.h"
00057 #include "ros/connection_manager.h"
00058 #include "ros/message_deserializer.h"
00059 #include "ros/subscription_queue.h"
00060 #include "ros/file_log.h"
00061 #include "ros/transport_hints.h"
00062 #include "ros/subscription_callback_helper.h"
00063
00064 #include <boost/make_shared.hpp>
00065
00066 using XmlRpc::XmlRpcValue;
00067
00068 namespace ros
00069 {
00070
00071 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints)
00072 : name_(name)
00073 , md5sum_(md5sum)
00074 , datatype_(datatype)
00075 , nonconst_callbacks_(0)
00076 , dropped_(false)
00077 , shutting_down_(false)
00078 , transport_hints_(transport_hints)
00079 {
00080 }
00081
00082 Subscription::~Subscription()
00083 {
00084 pending_connections_.clear();
00085 callbacks_.clear();
00086 }
00087
00088 void Subscription::shutdown()
00089 {
00090 {
00091 boost::mutex::scoped_lock lock(shutdown_mutex_);
00092 shutting_down_ = true;
00093 }
00094
00095 drop();
00096 }
00097
00098 XmlRpcValue Subscription::getStats()
00099 {
00100 XmlRpcValue stats;
00101 stats[0] = name_;
00102 XmlRpcValue conn_data;
00103 conn_data.setSize(0);
00104
00105 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00106
00107 uint32_t cidx = 0;
00108 for (V_PublisherLink::iterator c = publisher_links_.begin();
00109 c != publisher_links_.end(); ++c)
00110 {
00111 const PublisherLink::Stats& s = (*c)->getStats();
00112 conn_data[cidx][0] = (*c)->getConnectionID();
00113 conn_data[cidx][1] = (int)s.bytes_received_;
00114 conn_data[cidx][2] = (int)s.messages_received_;
00115 conn_data[cidx][3] = (int)s.drops_;
00116 conn_data[cidx][4] = 0;
00117 }
00118
00119 stats[1] = conn_data;
00120 return stats;
00121 }
00122
00123
00124
00125
00126
00127
00128 void Subscription::getInfo(XmlRpc::XmlRpcValue& info)
00129 {
00130 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00131
00132 for (V_PublisherLink::iterator c = publisher_links_.begin();
00133 c != publisher_links_.end(); ++c)
00134 {
00135 XmlRpcValue curr_info;
00136 curr_info[0] = (int)(*c)->getConnectionID();
00137 curr_info[1] = (*c)->getPublisherXMLRPCURI();
00138 curr_info[2] = "i";
00139 curr_info[3] = (*c)->getTransportType();
00140 curr_info[4] = name_;
00141 info[info.size()] = curr_info;
00142 }
00143 }
00144
00145 uint32_t Subscription::getNumPublishers()
00146 {
00147 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00148 return (uint32_t)publisher_links_.size();
00149 }
00150
00151 void Subscription::drop()
00152 {
00153 if (!dropped_)
00154 {
00155 dropped_ = true;
00156
00157 dropAllConnections();
00158 }
00159 }
00160
00161 void Subscription::dropAllConnections()
00162 {
00163
00164
00165 V_PublisherLink localsubscribers;
00166
00167 {
00168 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00169
00170 localsubscribers.swap(publisher_links_);
00171 }
00172
00173 V_PublisherLink::iterator it = localsubscribers.begin();
00174 V_PublisherLink::iterator end = localsubscribers.end();
00175 for (;it != end; ++it)
00176 {
00177 (*it)->drop();
00178 }
00179 }
00180
00181 void Subscription::addLocalConnection(const PublicationPtr& pub)
00182 {
00183 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00184 if (dropped_)
00185 {
00186 return;
00187 }
00188
00189 ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str());
00190
00191 IntraProcessPublisherLinkPtr pub_link(new IntraProcessPublisherLink(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_));
00192 IntraProcessSubscriberLinkPtr sub_link(new IntraProcessSubscriberLink(pub));
00193 pub_link->setPublisher(sub_link);
00194 sub_link->setSubscriber(pub_link);
00195
00196 addPublisherLink(pub_link);
00197 pub->addSubscriberLink(sub_link);
00198 }
00199
00200 bool urisEqual(const std::string& uri1, const std::string& uri2)
00201 {
00202 std::string host1, host2;
00203 uint32_t port1 = 0, port2 = 0;
00204 network::splitURI(uri1, host1, port1);
00205 network::splitURI(uri2, host2, port2);
00206 return port1 == port2 && host1 == host2;
00207 }
00208
00209 bool Subscription::pubUpdate(const V_string& new_pubs)
00210 {
00211 boost::mutex::scoped_lock lock(shutdown_mutex_);
00212
00213 if (shutting_down_ || dropped_)
00214 {
00215 return false;
00216 }
00217
00218 bool retval = true;
00219
00220 {
00221 std::stringstream ss;
00222
00223 for (V_string::const_iterator up_i = new_pubs.begin();
00224 up_i != new_pubs.end(); ++up_i)
00225 {
00226 ss << *up_i << ", ";
00227 }
00228
00229 ss << " already have these connections: ";
00230 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00231 spc!= publisher_links_.end(); ++spc)
00232 {
00233 ss << (*spc)->getPublisherXMLRPCURI() << ", ";
00234 }
00235
00236 boost::mutex::scoped_lock lock(pending_connections_mutex_);
00237 S_PendingConnection::iterator it = pending_connections_.begin();
00238 S_PendingConnection::iterator end = pending_connections_.end();
00239 for (; it != end; ++it)
00240 {
00241 ss << (*it)->getRemoteURI() << ", ";
00242 }
00243
00244 ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());
00245 }
00246
00247 V_string additions;
00248 V_PublisherLink subtractions;
00249 V_PublisherLink to_add;
00250
00251
00252 {
00253 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00254
00255 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00256 spc!= publisher_links_.end(); ++spc)
00257 {
00258 bool found = false;
00259 for (V_string::const_iterator up_i = new_pubs.begin();
00260 !found && up_i != new_pubs.end(); ++up_i)
00261 {
00262 if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
00263 {
00264 found = true;
00265 break;
00266 }
00267 }
00268
00269 if (!found)
00270 {
00271 subtractions.push_back(*spc);
00272 }
00273 }
00274
00275 for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
00276 {
00277 bool found = false;
00278 for (V_PublisherLink::iterator spc = publisher_links_.begin();
00279 !found && spc != publisher_links_.end(); ++spc)
00280 {
00281 if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
00282 {
00283 found = true;
00284 break;
00285 }
00286 }
00287
00288 if (!found)
00289 {
00290 boost::mutex::scoped_lock lock(pending_connections_mutex_);
00291 S_PendingConnection::iterator it = pending_connections_.begin();
00292 S_PendingConnection::iterator end = pending_connections_.end();
00293 for (; it != end; ++it)
00294 {
00295 if (urisEqual(*up_i, (*it)->getRemoteURI()))
00296 {
00297 found = true;
00298 break;
00299 }
00300 }
00301 }
00302
00303 if (!found)
00304 {
00305 additions.push_back(*up_i);
00306 }
00307 }
00308 }
00309
00310 for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
00311 {
00312 const PublisherLinkPtr& link = *i;
00313 if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
00314 {
00315 ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
00316 link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
00317 link->drop();
00318 }
00319 else
00320 {
00321 ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
00322 }
00323 }
00324
00325 for (V_string::iterator i = additions.begin();
00326 i != additions.end(); ++i)
00327 {
00328
00329 if (XMLRPCManager::instance()->getServerURI() != *i)
00330 {
00331 retval &= negotiateConnection(*i);
00332 }
00333 else
00334 {
00335 ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str());
00336 }
00337 }
00338
00339 return retval;
00340 }
00341
00342 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
00343 {
00344 XmlRpcValue tcpros_array, protos_array, params;
00345 XmlRpcValue udpros_array;
00346 TransportUDPPtr udp_transport;
00347 int protos = 0;
00348 V_string transports = transport_hints_.getTransports();
00349 if (transports.empty())
00350 {
00351 transport_hints_.reliable();
00352 transports = transport_hints_.getTransports();
00353 }
00354 for (V_string::const_iterator it = transports.begin();
00355 it != transports.end();
00356 ++it)
00357 {
00358 if (*it == "UDP")
00359 {
00360 int max_datagram_size = transport_hints_.getMaxDatagramSize();
00361 udp_transport = TransportUDPPtr(new TransportUDP(&PollManager::instance()->getPollSet()));
00362 if (!max_datagram_size)
00363 max_datagram_size = udp_transport->getMaxDatagramSize();
00364 udp_transport->createIncoming(0, false);
00365 udpros_array[0] = "UDPROS";
00366 M_string m;
00367 m["topic"] = getName();
00368 m["md5sum"] = md5sum();
00369 m["callerid"] = this_node::getName();
00370 m["type"] = datatype();
00371 boost::shared_array<uint8_t> buffer;
00372 uint32_t len;
00373 Header::write(m, buffer, len);
00374 XmlRpcValue v(buffer.get(), len);
00375 udpros_array[1] = v;
00376 udpros_array[2] = network::getHost();
00377 udpros_array[3] = udp_transport->getServerPort();
00378 udpros_array[4] = max_datagram_size;
00379
00380 protos_array[protos++] = udpros_array;
00381 }
00382 else if (*it == "TCP")
00383 {
00384 tcpros_array[0] = std::string("TCPROS");
00385 protos_array[protos++] = tcpros_array;
00386 }
00387 else
00388 {
00389 ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str());
00390 }
00391 }
00392 params[0] = this_node::getName();
00393 params[1] = name_;
00394 params[2] = protos_array;
00395 std::string peer_host;
00396 uint32_t peer_port;
00397 if (!network::splitURI(xmlrpc_uri, peer_host, peer_port))
00398 {
00399 ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
00400 return false;
00401 }
00402
00403 XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(),
00404 peer_port, "/");
00405
00406
00407
00408 if (!c->executeNonBlock("requestTopic", params))
00409 {
00410 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00411 peer_host.c_str(), peer_port, name_.c_str());
00412 delete c;
00413 if (udp_transport)
00414 {
00415 udp_transport->close();
00416 }
00417
00418 return false;
00419 }
00420
00421 ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
00422
00423
00424
00425 PendingConnectionPtr conn(new PendingConnection(c, udp_transport, shared_from_this(), xmlrpc_uri));
00426
00427 XMLRPCManager::instance()->addASyncConnection(conn);
00428
00429 {
00430 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00431 pending_connections_.insert(conn);
00432 }
00433
00434 return true;
00435 }
00436
00437 void closeTransport(const TransportUDPPtr& trans)
00438 {
00439 if (trans)
00440 {
00441 trans->close();
00442 }
00443 }
00444
00445 void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRpcValue& result)
00446 {
00447 boost::mutex::scoped_lock lock(shutdown_mutex_);
00448 if (shutting_down_ || dropped_)
00449 {
00450 return;
00451 }
00452
00453 {
00454 boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
00455 pending_connections_.erase(conn);
00456 }
00457
00458 TransportUDPPtr udp_transport;
00459
00460 std::string peer_host = conn->getClient()->getHost();
00461 uint32_t peer_port = conn->getClient()->getPort();
00462 std::stringstream ss;
00463 ss << "http://" << peer_host << ":" << peer_port << "/";
00464 std::string xmlrpc_uri = ss.str();
00465 udp_transport = conn->getUDPTransport();
00466
00467 XmlRpc::XmlRpcValue proto;
00468 if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto))
00469 {
00470 ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
00471 peer_host.c_str(), peer_port, name_.c_str());
00472 closeTransport(udp_transport);
00473 return;
00474 }
00475
00476 if (proto.size() == 0)
00477 {
00478 ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
00479 closeTransport(udp_transport);
00480 return;
00481 }
00482
00483 if (proto.getType() != XmlRpcValue::TypeArray)
00484 {
00485 ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
00486 closeTransport(udp_transport);
00487 return;
00488 }
00489 if (proto[0].getType() != XmlRpcValue::TypeString)
00490 {
00491 ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
00492 closeTransport(udp_transport);
00493 return;
00494 }
00495
00496 std::string proto_name = proto[0];
00497 if (proto_name == "TCPROS")
00498 {
00499 if (proto.size() != 3 ||
00500 proto[1].getType() != XmlRpcValue::TypeString ||
00501 proto[2].getType() != XmlRpcValue::TypeInt)
00502 {
00503 ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \
00504 "parameters aren't string,int");
00505 return;
00506 }
00507 std::string pub_host = proto[1];
00508 int pub_port = proto[2];
00509 ROSCPP_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00510
00511 TransportTCPPtr transport(new TransportTCP(&PollManager::instance()->getPollSet()));
00512 if (transport->connect(pub_host, pub_port))
00513 {
00514 ConnectionPtr connection(new Connection());
00515 TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
00516
00517 connection->initialize(transport, false, HeaderReceivedFunc());
00518 pub_link->initialize(connection);
00519
00520 ConnectionManager::instance()->addConnection(connection);
00521
00522 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00523 addPublisherLink(pub_link);
00524
00525 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00526 }
00527 else
00528 {
00529 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00530 }
00531 }
00532 else if (proto_name == "UDPROS")
00533 {
00534 if (proto.size() != 6 ||
00535 proto[1].getType() != XmlRpcValue::TypeString ||
00536 proto[2].getType() != XmlRpcValue::TypeInt ||
00537 proto[3].getType() != XmlRpcValue::TypeInt ||
00538 proto[4].getType() != XmlRpcValue::TypeInt ||
00539 proto[5].getType() != XmlRpcValue::TypeBase64)
00540 {
00541 ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
00542 "parameters aren't string,int,int,int,base64");
00543 closeTransport(udp_transport);
00544 return;
00545 }
00546 std::string pub_host = proto[1];
00547 int pub_port = proto[2];
00548 int conn_id = proto[3];
00549 int max_datagram_size = proto[4];
00550 std::vector<char> header_bytes = proto[5];
00551 boost::shared_array<uint8_t> buffer = boost::shared_array<uint8_t>(new uint8_t[header_bytes.size()]);
00552 memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
00553 Header h;
00554 std::string err;
00555 if (!h.parse(buffer, header_bytes.size(), err))
00556 {
00557 ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
00558 closeTransport(udp_transport);
00559 return;
00560 }
00561 ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
00562
00563 std::string error_msg;
00564 if (h.getValue("error", error_msg))
00565 {
00566 ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
00567 closeTransport(udp_transport);
00568 return;
00569 }
00570
00571 TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
00572 if (pub_link->setHeader(h))
00573 {
00574 ConnectionPtr connection(new Connection());
00575 connection->initialize(udp_transport, false, NULL);
00576 connection->setHeader(h);
00577 pub_link->initialize(connection);
00578
00579 ConnectionManager::instance()->addConnection(connection);
00580
00581 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00582 addPublisherLink(pub_link);
00583
00584 ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00585 }
00586 else
00587 {
00588 ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
00589 closeTransport(udp_transport);
00590 return;
00591 }
00592 }
00593 else
00594 {
00595 ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str());
00596 }
00597 }
00598
00599 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
00600 {
00601 boost::mutex::scoped_lock lock(callbacks_mutex_);
00602
00603 uint32_t drops = 0;
00604
00605
00606
00607
00608 cached_deserializers_.clear();
00609
00610 ros::Time receipt_time = ros::Time::now();
00611
00612 for (V_CallbackInfo::iterator cb = callbacks_.begin();
00613 cb != callbacks_.end(); ++cb)
00614 {
00615 const CallbackInfoPtr& info = *cb;
00616
00617 ROS_ASSERT(info->callback_queue_);
00618
00619 const std::type_info* ti = &info->helper_->getTypeInfo();
00620
00621 if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
00622 {
00623 MessageDeserializerPtr deserializer;
00624
00625 V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
00626 V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
00627 for (; des_it != des_end; ++des_it)
00628 {
00629 if (*des_it->first == *ti)
00630 {
00631 deserializer = des_it->second;
00632 break;
00633 }
00634 }
00635
00636 if (!deserializer)
00637 {
00638 deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
00639 cached_deserializers_.push_back(std::make_pair(ti, deserializer));
00640 }
00641
00642 bool was_full = false;
00643 bool nonconst_need_copy = false;
00644 if (callbacks_.size() > 1)
00645 {
00646 nonconst_need_copy = true;
00647 }
00648
00649 info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
00650
00651 if (was_full)
00652 {
00653 ++drops;
00654 }
00655 else
00656 {
00657 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00658 }
00659 }
00660 }
00661
00662
00663 if (link->isLatched())
00664 {
00665 LatchInfo li;
00666 li.connection_header = connection_header;
00667 li.link = link;
00668 li.message = m;
00669 li.receipt_time = receipt_time;
00670 latched_messages_[link] = li;
00671 }
00672
00673 cached_deserializers_.clear();
00674
00675 return drops;
00676 }
00677
00678 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
00679 {
00680 ROS_ASSERT(helper);
00681 ROS_ASSERT(queue);
00682
00683
00684 {
00685 boost::mutex::scoped_lock lock(md5sum_mutex_);
00686 if (md5sum_ == "*" && md5sum != "*")
00687 {
00688
00689 md5sum_ = md5sum;
00690 }
00691 }
00692
00693 if (md5sum != "*" && md5sum != this->md5sum())
00694 {
00695 return false;
00696 }
00697
00698 {
00699 boost::mutex::scoped_lock lock(callbacks_mutex_);
00700
00701 CallbackInfoPtr info(new CallbackInfo);
00702 info->helper_ = helper;
00703 info->callback_queue_ = queue;
00704 info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size, allow_concurrent_callbacks));
00705 info->tracked_object_ = tracked_object;
00706 info->has_tracked_object_ = false;
00707 if (tracked_object)
00708 {
00709 info->has_tracked_object_ = true;
00710 }
00711
00712 if (!helper->isConst())
00713 {
00714 ++nonconst_callbacks_;
00715 }
00716
00717 callbacks_.push_back(info);
00718 cached_deserializers_.reserve(callbacks_.size());
00719
00720
00721 if (!latched_messages_.empty())
00722 {
00723 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00724
00725 V_PublisherLink::iterator it = publisher_links_.begin();
00726 V_PublisherLink::iterator end = publisher_links_.end();
00727 for (; it != end;++it)
00728 {
00729 const PublisherLinkPtr& link = *it;
00730 if (link->isLatched())
00731 {
00732 M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
00733 if (des_it != latched_messages_.end())
00734 {
00735 const LatchInfo& latch_info = des_it->second;
00736
00737 MessageDeserializerPtr des(new MessageDeserializer(helper, latch_info.message, latch_info.connection_header));
00738 bool was_full = false;
00739 info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
00740 if (!was_full)
00741 {
00742 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
00743 }
00744 }
00745 }
00746 }
00747 }
00748 }
00749
00750 return true;
00751 }
00752
00753 void Subscription::removeCallback(const SubscriptionCallbackHelperPtr& helper)
00754 {
00755 boost::mutex::scoped_lock cbs_lock(callbacks_mutex_);
00756 for (V_CallbackInfo::iterator it = callbacks_.begin();
00757 it != callbacks_.end(); ++it)
00758 {
00759 if ((*it)->helper_ == helper)
00760 {
00761 const CallbackInfoPtr& info = *it;
00762 info->subscription_queue_->clear();
00763 info->callback_queue_->removeByID((uint64_t)info.get());
00764 callbacks_.erase(it);
00765
00766 if (!helper->isConst())
00767 {
00768 --nonconst_callbacks_;
00769 }
00770
00771 break;
00772 }
00773 }
00774 }
00775
00776 void Subscription::headerReceived(const PublisherLinkPtr& link, const Header& h)
00777 {
00778 boost::mutex::scoped_lock lock(md5sum_mutex_);
00779 if (md5sum_ == "*")
00780 {
00781 md5sum_ = link->getMD5Sum();
00782 }
00783 }
00784
00785 void Subscription::addPublisherLink(const PublisherLinkPtr& link)
00786 {
00787 publisher_links_.push_back(link);
00788 }
00789
00790 void Subscription::removePublisherLink(const PublisherLinkPtr& pub_link)
00791 {
00792 boost::mutex::scoped_lock lock(publisher_links_mutex_);
00793
00794 V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link);
00795 if (it != publisher_links_.end())
00796 {
00797 publisher_links_.erase(it);
00798 }
00799
00800 if (pub_link->isLatched())
00801 {
00802 latched_messages_.erase(pub_link);
00803 }
00804 }
00805
00806 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
00807 {
00808 boost::mutex::scoped_lock lock(callbacks_mutex_);
00809 for (V_CallbackInfo::iterator cb = callbacks_.begin();
00810 cb != callbacks_.end(); ++cb)
00811 {
00812 const CallbackInfoPtr& info = *cb;
00813 if (info->helper_->getTypeInfo() == ti)
00814 {
00815 nocopy = true;
00816 }
00817 else
00818 {
00819 ser = true;
00820 }
00821
00822 if (nocopy && ser)
00823 {
00824 return;
00825 }
00826 }
00827 }
00828
00829 const std::string Subscription::datatype()
00830 {
00831 return datatype_;
00832 }
00833
00834 const std::string Subscription::md5sum()
00835 {
00836 boost::mutex::scoped_lock lock(md5sum_mutex_);
00837 return md5sum_;
00838 }
00839
00840 }