subscription.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include <sstream>
36 #include <fcntl.h>
37 #include <cerrno>
38 #include <cstring>
39 #include <typeinfo>
40 
41 #include "ros/common.h"
42 #include "ros/io.h"
43 #include "ros/subscription.h"
44 #include "ros/publication.h"
48 #include "ros/connection.h"
52 #include "ros/this_node.h"
53 #include "ros/network.h"
54 #include "ros/poll_manager.h"
55 #include "ros/connection_manager.h"
57 #include "ros/subscription_queue.h"
58 #include "ros/file_log.h"
59 #include "ros/transport_hints.h"
61 
62 #include <boost/make_shared.hpp>
63 
65 
66 namespace ros
67 {
68 
69 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints)
70 : name_(name)
71 , md5sum_(md5sum)
72 , datatype_(datatype)
73 , nonconst_callbacks_(0)
74 , dropped_(false)
75 , shutting_down_(false)
76 , transport_hints_(transport_hints)
77 {
78 }
79 
81 {
82  pending_connections_.clear();
83  callbacks_.clear();
84 }
85 
87 {
88  {
89  boost::mutex::scoped_lock lock(shutdown_mutex_);
90  shutting_down_ = true;
91  }
92 
93  drop();
94 }
95 
97 {
98  XmlRpcValue stats;
99  stats[0] = name_;
100  XmlRpcValue conn_data;
101  conn_data.setSize(0);
102 
103  boost::mutex::scoped_lock lock(publisher_links_mutex_);
104 
105  uint32_t cidx = 0;
106  for (V_PublisherLink::iterator c = publisher_links_.begin();
107  c != publisher_links_.end(); ++c)
108  {
109  const PublisherLink::Stats& s = (*c)->getStats();
110  conn_data[cidx][0] = (*c)->getConnectionID();
111  conn_data[cidx][1] = (int)s.bytes_received_;
112  conn_data[cidx][2] = (int)s.messages_received_;
113  conn_data[cidx][3] = (int)s.drops_;
114  conn_data[cidx][4] = 0; // figure out something for this. not sure.
115  }
116 
117  stats[1] = conn_data;
118  return stats;
119 }
120 
121 // [(connection_id, publisher_xmlrpc_uri, direction, transport, topic_name, connected, connection_info_string)*]
122 // e.g. [(1, 'http://host:54893/', 'i', 'TCPROS', '/chatter', 1, 'TCPROS connection on port 59746 to [host:34318 on socket 11]')]
124 {
125  boost::mutex::scoped_lock lock(publisher_links_mutex_);
126 
127  for (V_PublisherLink::iterator c = publisher_links_.begin();
128  c != publisher_links_.end(); ++c)
129  {
130  XmlRpcValue curr_info;
131  curr_info[0] = (int)(*c)->getConnectionID();
132  curr_info[1] = (*c)->getPublisherXMLRPCURI();
133  curr_info[2] = "i";
134  curr_info[3] = (*c)->getTransportType();
135  curr_info[4] = name_;
136  curr_info[5] = true; // For length compatibility with rospy
137  curr_info[6] = (*c)->getTransportInfo();
138  info[info.size()] = curr_info;
139  }
140 }
141 
143 {
144  boost::mutex::scoped_lock lock(publisher_links_mutex_);
145  uint32_t num_connected_publishers = 0;
146  for (V_PublisherLink::iterator c = publisher_links_.begin();
147  c != publisher_links_.end(); ++c)
148  {
149  // Only count a connection with a received header.
150  // Discern this by a non-zero length callerid.
151  if ((*c)->getCallerID().size() > 0)
152  {
153  num_connected_publishers++;
154  }
155  }
156  return num_connected_publishers;
157 }
158 
160 {
161  if (!dropped_)
162  {
163  dropped_ = true;
164 
166  }
167 }
168 
170 {
171  // Swap our subscribers list with a local one so we can only lock for a short period of time, because a
172  // side effect of our calling drop() on connections can be re-locking the subscribers mutex
173  V_PublisherLink localsubscribers;
174 
175  {
176  boost::mutex::scoped_lock lock(publisher_links_mutex_);
177 
178  localsubscribers.swap(publisher_links_);
179  }
180 
181  V_PublisherLink::iterator it = localsubscribers.begin();
182  V_PublisherLink::iterator end = localsubscribers.end();
183  for (;it != end; ++it)
184  {
185  (*it)->drop();
186  }
187 }
188 
190 {
191  boost::mutex::scoped_lock lock(publisher_links_mutex_);
192  if (dropped_)
193  {
194  return;
195  }
196 
197  ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str());
198 
199  IntraProcessPublisherLinkPtr pub_link(boost::make_shared<IntraProcessPublisherLink>(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_));
200  IntraProcessSubscriberLinkPtr sub_link(boost::make_shared<IntraProcessSubscriberLink>(pub));
201  pub_link->setPublisher(sub_link);
202  sub_link->setSubscriber(pub_link);
203 
204  addPublisherLink(pub_link);
205  pub->addSubscriberLink(sub_link);
206 }
207 
208 bool urisEqual(const std::string& uri1, const std::string& uri2)
209 {
210  std::string host1, host2;
211  uint32_t port1 = 0, port2 = 0;
212  network::splitURI(uri1, host1, port1);
213  network::splitURI(uri2, host2, port2);
214  return port1 == port2 && host1 == host2;
215 }
216 
217 bool Subscription::pubUpdate(const V_string& new_pubs)
218 {
219  boost::mutex::scoped_lock lock(shutdown_mutex_);
220 
221  if (shutting_down_ || dropped_)
222  {
223  return false;
224  }
225 
226  bool retval = true;
227 
228  {
229  std::stringstream ss;
230 
231  for (V_string::const_iterator up_i = new_pubs.begin();
232  up_i != new_pubs.end(); ++up_i)
233  {
234  ss << *up_i << ", ";
235  }
236 
237  ss << " already have these connections: ";
238  {
239  boost::mutex::scoped_lock lock(publisher_links_mutex_);
240  for (V_PublisherLink::iterator spc = publisher_links_.begin();
241  spc!= publisher_links_.end(); ++spc)
242  {
243  ss << (*spc)->getPublisherXMLRPCURI() << ", ";
244  }
245  }
246 
247  boost::mutex::scoped_lock lock(pending_connections_mutex_);
248  S_PendingConnection::iterator it = pending_connections_.begin();
249  S_PendingConnection::iterator end = pending_connections_.end();
250  for (; it != end; ++it)
251  {
252  ss << (*it)->getRemoteURI() << ", ";
253  }
254 
255  ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());
256  }
257 
258  V_string additions;
259  V_PublisherLink subtractions;
260  V_PublisherLink to_add;
261  // could use the STL set operations... but these sets are so small
262  // it doesn't really matter.
263  {
264  boost::mutex::scoped_lock lock(publisher_links_mutex_);
265 
266  for (V_PublisherLink::iterator spc = publisher_links_.begin();
267  spc!= publisher_links_.end(); ++spc)
268  {
269  bool found = false;
270  for (V_string::const_iterator up_i = new_pubs.begin();
271  !found && up_i != new_pubs.end(); ++up_i)
272  {
273  if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
274  {
275  found = true;
276  break;
277  }
278  }
279 
280  if (!found)
281  {
282  subtractions.push_back(*spc);
283  }
284  }
285 
286  for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
287  {
288  bool found = false;
289  for (V_PublisherLink::iterator spc = publisher_links_.begin();
290  !found && spc != publisher_links_.end(); ++spc)
291  {
292  if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
293  {
294  found = true;
295  break;
296  }
297  }
298 
299  if (!found)
300  {
301  boost::mutex::scoped_lock lock(pending_connections_mutex_);
302  S_PendingConnection::iterator it = pending_connections_.begin();
303  S_PendingConnection::iterator end = pending_connections_.end();
304  for (; it != end; ++it)
305  {
306  if (urisEqual(*up_i, (*it)->getRemoteURI()))
307  {
308  found = true;
309  break;
310  }
311  }
312  }
313 
314  if (!found)
315  {
316  additions.push_back(*up_i);
317  }
318  }
319  }
320 
321  for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
322  {
323  const PublisherLinkPtr& link = *i;
324  if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
325  {
326  ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
327  link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
328  link->drop();
329  }
330  else
331  {
332  ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
333  }
334  }
335 
336  for (V_string::iterator i = additions.begin();
337  i != additions.end(); ++i)
338  {
339  // this function should never negotiate a self-subscription
340  if (XMLRPCManager::instance()->getServerURI() != *i)
341  {
342  retval &= negotiateConnection(*i);
343  }
344  else
345  {
346  ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str());
347  }
348  }
349 
350  return retval;
351 }
352 
353 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
354 {
355  XmlRpcValue tcpros_array, protos_array, params;
356  XmlRpcValue udpros_array;
357  TransportUDPPtr udp_transport;
358  int protos = 0;
359  V_string transports = transport_hints_.getTransports();
360  if (transports.empty())
361  {
363  transports = transport_hints_.getTransports();
364  }
365  for (V_string::const_iterator it = transports.begin();
366  it != transports.end();
367  ++it)
368  {
369  if (*it == "UDP")
370  {
371  int max_datagram_size = transport_hints_.getMaxDatagramSize();
372  udp_transport = boost::make_shared<TransportUDP>(&PollManager::instance()->getPollSet());
373  if (!max_datagram_size)
374  max_datagram_size = udp_transport->getMaxDatagramSize();
375  udp_transport->createIncoming(0, false);
376  udpros_array[0] = "UDPROS";
377  M_string m;
378  m["topic"] = getName();
379  m["md5sum"] = md5sum();
380  m["callerid"] = this_node::getName();
381  m["type"] = datatype();
383  uint32_t len;
384  Header::write(m, buffer, len);
385  XmlRpcValue v(buffer.get(), len);
386  udpros_array[1] = v;
387  udpros_array[2] = network::getHost();
388  udpros_array[3] = udp_transport->getServerPort();
389  udpros_array[4] = max_datagram_size;
390 
391  protos_array[protos++] = udpros_array;
392  }
393  else if (*it == "TCP")
394  {
395  tcpros_array[0] = std::string("TCPROS");
396  protos_array[protos++] = tcpros_array;
397  }
398  else
399  {
400  ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str());
401  }
402  }
403  params[0] = this_node::getName();
404  params[1] = name_;
405  params[2] = protos_array;
406  std::string peer_host;
407  uint32_t peer_port;
408  if (!network::splitURI(xmlrpc_uri, peer_host, peer_port))
409  {
410  ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
411  return false;
412  }
413 
414  XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(),
415  peer_port, "/");
416  // if (!c.execute("requestTopic", params, result) || !g_node->validateXmlrpcResponse("requestTopic", result, proto))
417 
418  // Initiate the negotiation. We'll come back and check on it later.
419  if (!c->executeNonBlock("requestTopic", params))
420  {
421  ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
422  peer_host.c_str(), peer_port, name_.c_str());
423  delete c;
424  if (udp_transport)
425  {
426  udp_transport->close();
427  }
428 
429  return false;
430  }
431 
432  ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
433 
434  // The PendingConnectionPtr takes ownership of c, and will delete it on
435  // destruction.
436  PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
437 
438  XMLRPCManager::instance()->addASyncConnection(conn);
439  // Put this connection on the list that we'll look at later.
440  {
441  boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
442  pending_connections_.insert(conn);
443  }
444 
445  return true;
446 }
447 
448 void closeTransport(const TransportUDPPtr& trans)
449 {
450  if (trans)
451  {
452  trans->close();
453  }
454 }
455 
457 {
458  boost::mutex::scoped_lock lock(shutdown_mutex_);
459  if (shutting_down_ || dropped_)
460  {
461  return;
462  }
463 
464  {
465  boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
466  pending_connections_.erase(conn);
467  }
468 
469  TransportUDPPtr udp_transport;
470 
471  std::string peer_host = conn->getClient()->getHost();
472  uint32_t peer_port = conn->getClient()->getPort();
473  std::stringstream ss;
474  ss << "http://" << peer_host << ":" << peer_port << "/";
475  std::string xmlrpc_uri = ss.str();
476  udp_transport = conn->getUDPTransport();
477 
478  XmlRpc::XmlRpcValue proto;
479  if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto))
480  {
481  ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
482  peer_host.c_str(), peer_port, name_.c_str());
483  closeTransport(udp_transport);
484  return;
485  }
486 
487  if (proto.size() == 0)
488  {
489  ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
490  closeTransport(udp_transport);
491  return;
492  }
493 
494  if (proto.getType() != XmlRpcValue::TypeArray)
495  {
496  ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
497  closeTransport(udp_transport);
498  return;
499  }
500  if (proto[0].getType() != XmlRpcValue::TypeString)
501  {
502  ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
503  closeTransport(udp_transport);
504  return;
505  }
506 
507  std::string proto_name = proto[0];
508  if (proto_name == "TCPROS")
509  {
510  if (proto.size() != 3 ||
511  proto[1].getType() != XmlRpcValue::TypeString ||
512  proto[2].getType() != XmlRpcValue::TypeInt)
513  {
514  ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \
515  "parameters aren't string,int");
516  return;
517  }
518  std::string pub_host = proto[1];
519  int pub_port = proto[2];
520  ROSCPP_CONN_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
521 
522  TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
523  if (transport->connect(pub_host, pub_port))
524  {
525  ConnectionPtr connection(boost::make_shared<Connection>());
526  TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
527 
528  connection->initialize(transport, false, HeaderReceivedFunc());
529  pub_link->initialize(connection);
530 
531  ConnectionManager::instance()->addConnection(connection);
532 
533  boost::mutex::scoped_lock lock(publisher_links_mutex_);
534  addPublisherLink(pub_link);
535 
536  ROSCPP_CONN_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
537  }
538  else
539  {
540  ROSCPP_CONN_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
541  }
542  }
543  else if (proto_name == "UDPROS")
544  {
545  if (proto.size() != 6 ||
546  proto[1].getType() != XmlRpcValue::TypeString ||
547  proto[2].getType() != XmlRpcValue::TypeInt ||
548  proto[3].getType() != XmlRpcValue::TypeInt ||
549  proto[4].getType() != XmlRpcValue::TypeInt ||
550  proto[5].getType() != XmlRpcValue::TypeBase64)
551  {
552  ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
553  "parameters aren't string,int,int,int,base64");
554  closeTransport(udp_transport);
555  return;
556  }
557  std::string pub_host = proto[1];
558  int pub_port = proto[2];
559  int conn_id = proto[3];
560  int max_datagram_size = proto[4];
561  std::vector<char> header_bytes = proto[5];
562  boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
563  memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
564  Header h;
565  std::string err;
566  if (!h.parse(buffer, header_bytes.size(), err))
567  {
568  ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
569  closeTransport(udp_transport);
570  return;
571  }
572  ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
573 
574  std::string error_msg;
575  if (h.getValue("error", error_msg))
576  {
577  ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
578  closeTransport(udp_transport);
579  return;
580  }
581 
582  TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
583  if (pub_link->setHeader(h))
584  {
585  ConnectionPtr connection(boost::make_shared<Connection>());
586  connection->initialize(udp_transport, false, NULL);
587  connection->setHeader(h);
588  pub_link->initialize(connection);
589 
590  ConnectionManager::instance()->addConnection(connection);
591 
592  boost::mutex::scoped_lock lock(publisher_links_mutex_);
593  addPublisherLink(pub_link);
594 
595  ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
596  }
597  else
598  {
599  ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
600  closeTransport(udp_transport);
601  return;
602  }
603  }
604  else
605  {
606  ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str());
607  }
608 }
609 
610 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
611 {
612  boost::mutex::scoped_lock lock(callbacks_mutex_);
613 
614  uint32_t drops = 0;
615 
616  // Cache the deserializers by type info. If all the subscriptions are the same type this has the same performance as before. If
617  // there are subscriptions with different C++ type (but same ROS message type), this now works correctly rather than passing
618  // garbage to the messages with different C++ types than the first one.
619  cached_deserializers_.clear();
620 
621  ros::Time receipt_time = ros::Time::now();
622 
623  for (V_CallbackInfo::iterator cb = callbacks_.begin();
624  cb != callbacks_.end(); ++cb)
625  {
626  const CallbackInfoPtr& info = *cb;
627 
628  ROS_ASSERT(info->callback_queue_);
629 
630  const std::type_info* ti = &info->helper_->getTypeInfo();
631 
632  if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
633  {
634  MessageDeserializerPtr deserializer;
635 
636  V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
637  V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
638  for (; des_it != des_end; ++des_it)
639  {
640  if (*des_it->first == *ti)
641  {
642  deserializer = des_it->second;
643  break;
644  }
645  }
646 
647  if (!deserializer)
648  {
649  deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
650  cached_deserializers_.push_back(std::make_pair(ti, deserializer));
651  }
652 
653  bool was_full = false;
654  bool nonconst_need_copy = false;
655  if (callbacks_.size() > 1)
656  {
657  nonconst_need_copy = true;
658  }
659 
660  info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
661 
662  if (was_full)
663  {
664  ++drops;
665  }
666  else
667  {
668  info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
669  }
670  }
671  }
672 
673  // measure statistics
674  statistics_.callback(connection_header, name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0, link->getConnectionID());
675 
676  // If this link is latched, store off the message so we can immediately pass it to new subscribers later
677  if (link->isLatched())
678  {
679  LatchInfo li;
680  li.connection_header = connection_header;
681  li.link = link;
682  li.message = m;
683  li.receipt_time = receipt_time;
684  latched_messages_[link] = li;
685  }
686 
687  cached_deserializers_.clear();
688 
689  return drops;
690 }
691 
692 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
693 {
694  ROS_ASSERT(helper);
695  ROS_ASSERT(queue);
696 
697  statistics_.init(helper);
698 
699  // Decay to a real type as soon as we have a subscriber with a real type
700  {
701  boost::mutex::scoped_lock lock(md5sum_mutex_);
702  if (md5sum_ == "*" && md5sum != "*")
703  {
704 
705  md5sum_ = md5sum;
706  }
707  }
708 
709  if (md5sum != "*" && md5sum != this->md5sum())
710  {
711  return false;
712  }
713 
714  {
715  boost::mutex::scoped_lock lock(callbacks_mutex_);
716 
717  CallbackInfoPtr info(boost::make_shared<CallbackInfo>());
718  info->helper_ = helper;
719  info->callback_queue_ = queue;
720  info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(name_, queue_size, allow_concurrent_callbacks);
721  info->tracked_object_ = tracked_object;
722  info->has_tracked_object_ = false;
723  if (tracked_object)
724  {
725  info->has_tracked_object_ = true;
726  }
727 
728  if (!helper->isConst())
729  {
731  }
732 
733  callbacks_.push_back(info);
734  cached_deserializers_.reserve(callbacks_.size());
735 
736  // if we have any latched links, we need to immediately schedule callbacks
737  if (!latched_messages_.empty())
738  {
739  boost::mutex::scoped_lock lock(publisher_links_mutex_);
740 
741  V_PublisherLink::iterator it = publisher_links_.begin();
742  V_PublisherLink::iterator end = publisher_links_.end();
743  for (; it != end;++it)
744  {
745  const PublisherLinkPtr& link = *it;
746  if (link->isLatched())
747  {
748  M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
749  if (des_it != latched_messages_.end())
750  {
751  const LatchInfo& latch_info = des_it->second;
752 
753  MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, latch_info.message, latch_info.connection_header));
754  bool was_full = false;
755  info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
756  if (!was_full)
757  {
758  info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
759  }
760  }
761  }
762  }
763  }
764  }
765 
766  return true;
767 }
768 
770 {
771  CallbackInfoPtr info;
772  {
773  boost::mutex::scoped_lock cbs_lock(callbacks_mutex_);
774  for (V_CallbackInfo::iterator it = callbacks_.begin();
775  it != callbacks_.end(); ++it)
776  {
777  if ((*it)->helper_ == helper)
778  {
779  info = *it;
780  callbacks_.erase(it);
781 
782  if (!helper->isConst())
783  {
785  }
786 
787  break;
788  }
789  }
790  }
791 
792  if (info)
793  {
794  info->subscription_queue_->clear();
795  info->callback_queue_->removeByID((uint64_t)info.get());
796  }
797 }
798 
800 {
801  (void)h;
802  boost::mutex::scoped_lock lock(md5sum_mutex_);
803  if (md5sum_ == "*")
804  {
805  md5sum_ = link->getMD5Sum();
806  }
807 }
808 
810 {
811  publisher_links_.push_back(link);
812 }
813 
815 {
816  boost::mutex::scoped_lock lock(publisher_links_mutex_);
817 
818  V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link);
819  if (it != publisher_links_.end())
820  {
821  publisher_links_.erase(it);
822  }
823 
824  if (pub_link->isLatched())
825  {
826  latched_messages_.erase(pub_link);
827  }
828 }
829 
830 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
831 {
832  boost::mutex::scoped_lock lock(callbacks_mutex_);
833  for (V_CallbackInfo::iterator cb = callbacks_.begin();
834  cb != callbacks_.end(); ++cb)
835  {
836  const CallbackInfoPtr& info = *cb;
837  if (info->helper_->getTypeInfo() == ti)
838  {
839  nocopy = true;
840  }
841  else
842  {
843  ser = true;
844  }
845 
846  if (nocopy && ser)
847  {
848  return;
849  }
850  }
851 }
852 
853 const std::string Subscription::datatype()
854 {
855  return datatype_;
856 }
857 
858 const std::string Subscription::md5sum()
859 {
860  boost::mutex::scoped_lock lock(md5sum_mutex_);
861  return md5sum_;
862 }
863 
864 }
XmlRpc::XmlRpcValue::size
int size() const
connection_manager.h
ros::Subscription::addCallback
bool addCallback(const SubscriptionCallbackHelperPtr &helper, const std::string &md5sum, CallbackQueueInterface *queue, int32_t queue_size, const VoidConstPtr &tracked_object, bool allow_concurrent_callbacks)
Definition: subscription.cpp:692
ros::Subscription::shutdown
void shutdown()
Terminate all our PublisherLinks and join our callback thread if it exists.
Definition: subscription.cpp:86
message_deserializer.h
this_node.h
ros::SerializedMessage
ros::StatisticsLogger::init
void init(const SubscriptionCallbackHelperPtr &helper)
Definition: statistics.cpp:44
callback_queue_interface.h
transport_hints.h
md5sum
const char * md5sum()
ROSCPP_CONN_LOG_DEBUG
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
ros::Subscription::LatchInfo
Definition: subscription.h:231
boost::shared_ptr
ros::Subscription::negotiateConnection
bool negotiateConnection(const std::string &xmlrpc_uri)
Negotiates a connection with a publisher.
Definition: subscription.cpp:353
ros::Subscription::pendingConnectionDone
void pendingConnectionDone(const PendingConnectionPtr &pending_conn, XmlRpc::XmlRpcValue &result)
Definition: subscription.cpp:456
ros::Subscription::shutting_down_
bool shutting_down_
Definition: subscription.h:216
ros::Subscription::md5sum_
std::string md5sum_
Definition: subscription.h:209
ros::Subscription::getPublishTypes
void getPublishTypes(bool &ser, bool &nocopy, const std::type_info &ti)
Definition: subscription.cpp:830
s
XmlRpcServer s
ros
XmlRpc::XmlRpcClient::executeNonBlock
bool executeNonBlock(const char *method, XmlRpcValue const &params)
subscription_queue.h
ros::Header
ros::Subscription::LatchInfo::message
SerializedMessage message
Definition: subscription.h:233
ros::Subscription::md5sum_mutex_
boost::mutex md5sum_mutex_
Definition: subscription.h:208
ros::Subscription::pubUpdate
bool pubUpdate(const std::vector< std::string > &pubs)
Handle a publisher update list received from the master. Creates/drops PublisherLinks based on the li...
Definition: subscription.cpp:217
ros::Subscription::LatchInfo::link
PublisherLinkPtr link
Definition: subscription.h:234
ros::Subscription::handleMessage
uint32_t handleMessage(const SerializedMessage &m, bool ser, bool nocopy, const boost::shared_ptr< M_string > &connection_header, const PublisherLinkPtr &link)
Called to notify that a new message has arrived from a publisher. Schedules the callback for invokati...
Definition: subscription.cpp:610
ros::Subscription::dropped_
bool dropped_
Definition: subscription.h:215
ros::TransportHints
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
Definition: transport_hints.h:54
ros::TransportHints::getTransports
const V_string & getTransports()
Returns a vector of transports, ordered by preference.
Definition: transport_hints.h:156
ros::Subscription::getInfo
void getInfo(XmlRpc::XmlRpcValue &info)
Definition: subscription.cpp:123
XmlRpc::XmlRpcClient
subscription_callback_helper.h
subscription.h
ros::Header::write
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
ros::network::getHost
const ROSCPP_DECL std::string & getHost()
Definition: network.cpp:51
ros::Subscription::cached_deserializers_
V_TypeAndDeserializer cached_deserializers_
Definition: subscription.h:243
ros::TransportHints::reliable
TransportHints & reliable()
Specifies a reliable transport. Currently this means TCP.
Definition: transport_hints.h:60
ros::StatisticsLogger::callback
void callback(const boost::shared_ptr< M_string > &connection_header, const std::string &topic, const std::string &callerid, const SerializedMessage &m, const uint64_t &bytes_sent, const ros::Time &received_time, bool dropped, int connection_id)
Definition: statistics.cpp:53
ros::urisEqual
bool urisEqual(const std::string &uri1, const std::string &uri2)
Definition: subscription.cpp:208
ros::Subscription::Subscription
Subscription(const std::string &name, const std::string &md5sum, const std::string &datatype, const TransportHints &transport_hints)
Definition: subscription.cpp:69
ros::Subscription::V_PublisherLink
std::vector< PublisherLinkPtr > V_PublisherLink
Definition: subscription.h:223
ros::Subscription::md5sum
const std::string md5sum()
Definition: subscription.cpp:858
ros::Subscription::addLocalConnection
void addLocalConnection(const PublicationPtr &pub)
Definition: subscription.cpp:189
ros::Subscription::shutdown_mutex_
boost::mutex shutdown_mutex_
Definition: subscription.h:217
ros::Subscription::getName
const std::string & getName() const
Definition: subscription.h:118
ros::Subscription::getNumPublishers
uint32_t getNumPublishers()
Definition: subscription.cpp:142
transport_udp.h
ros::Subscription::nonconst_callbacks_
uint32_t nonconst_callbacks_
Definition: subscription.h:213
ros::closeTransport
void closeTransport(const TransportUDPPtr &trans)
Definition: subscription.cpp:448
ros::Subscription::~Subscription
virtual ~Subscription()
Definition: subscription.cpp:80
transport_tcp.h
ros::Header::parse
bool parse(const boost::shared_array< uint8_t > &buffer, uint32_t size, std::string &error_msg)
boost::shared_array< uint8_t >
ros::Subscription::callbacks_
V_CallbackInfo callbacks_
Definition: subscription.h:212
ros::TransportHints::getMaxDatagramSize
int getMaxDatagramSize()
Returns the maximum datagram size specified on this TransportHints, or 0 if no size was specified.
Definition: transport_hints.h:123
ROS_WARN
#define ROS_WARN(...)
ros::Subscription::name_
std::string name_
Definition: subscription.h:207
ros::XMLRPCManager::instance
static const XMLRPCManagerPtr & instance()
Definition: xmlrpc_manager.cpp:98
ros::PollManager::instance
static const PollManagerPtr & instance()
Definition: poll_manager.cpp:36
ros::Subscription::drop
void drop()
Terminate all our PublisherLinks.
Definition: subscription.cpp:159
ros::Subscription::statistics_
StatisticsLogger statistics_
Definition: subscription.h:229
XmlRpc::XmlRpcValue::getType
const Type & getType() const
ros::SerializedMessage::type_info
const std::type_info * type_info
ros::Subscription::datatype
const std::string datatype()
Definition: subscription.cpp:853
ros::Subscription::LatchInfo::connection_header
boost::shared_ptr< std::map< std::string, std::string > > connection_header
Definition: subscription.h:235
ros::Subscription::publisher_links_mutex_
boost::mutex publisher_links_mutex_
Definition: subscription.h:225
ros::this_node::getName
const ROSCPP_DECL std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:74
connection.h
ros::Subscription::LatchInfo::receipt_time
ros::Time receipt_time
Definition: subscription.h:236
ros::Subscription::M_string
std::map< std::string, std::string > M_string
Definition: subscription.h:102
ros::Subscription::callbacks_mutex_
boost::mutex callbacks_mutex_
Definition: subscription.h:211
ros::Subscription::getStats
XmlRpc::XmlRpcValue getStats()
Definition: subscription.cpp:96
ros::Subscription::latched_messages_
M_PublisherLinkToLatchInfo latched_messages_
Definition: subscription.h:240
ros::Subscription::pending_connections_
S_PendingConnection pending_connections_
Definition: subscription.h:220
ros::Time
poll_manager.h
ros::Header::getValue
bool getValue(const std::string &key, std::string &value) const
publication.h
ros::Subscription::dropAllConnections
void dropAllConnections()
Definition: subscription.cpp:169
io.h
ros::HeaderReceivedFunc
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
Definition: connection.h:62
ROS_ERROR
#define ROS_ERROR(...)
network.h
ros::ConnectionManager::instance
static const ConnectionManagerPtr & instance()
Definition: connection_manager.cpp:43
XmlRpc::XmlRpcValue::setSize
void setSize(int size)
datatype
const char * datatype()
ros::Subscription::removeCallback
void removeCallback(const SubscriptionCallbackHelperPtr &helper)
Definition: subscription.cpp:769
ros::Subscription::headerReceived
void headerReceived(const PublisherLinkPtr &link, const Header &h)
Definition: subscription.cpp:799
ros::V_string
std::vector< std::string > V_string
ros::Subscription::removePublisherLink
void removePublisherLink(const PublisherLinkPtr &pub_link)
Removes a subscriber from our list.
Definition: subscription.cpp:814
ros::Subscription::datatype_
std::string datatype_
Definition: subscription.h:210
ROS_ASSERT
#define ROS_ASSERT(cond)
ros::Subscription::transport_hints_
TransportHints transport_hints_
Definition: subscription.h:227
ros::network::splitURI
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
Definition: network.cpp:56
ROSCPP_LOG_DEBUG
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
ros::Subscription::publisher_links_
V_PublisherLink publisher_links_
Definition: subscription.h:224
ros::Subscription::pending_connections_mutex_
boost::mutex pending_connections_mutex_
Definition: subscription.h:221
XmlRpc::XmlRpcValue
ros::CallbackQueueInterface
Abstract interface for a queue used to handle all callbacks within roscpp.
Definition: callback_queue_interface.h:82
ros::Subscription::addPublisherLink
void addPublisherLink(const PublisherLinkPtr &link)
Definition: subscription.cpp:809
file_log.h
ros::Time::now
static Time now()


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Sat Sep 14 2024 02:59:36