subscription.cpp
Go to the documentation of this file.
1 /*
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2008, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of Willow Garage, Inc. nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include <sstream>
36 #include <fcntl.h>
37 #include <cerrno>
38 #include <cstring>
39 #include <typeinfo>
40 
41 #include "ros/common.h"
42 #include "ros/io.h"
43 #include "ros/subscription.h"
44 #include "ros/publication.h"
48 #include "ros/connection.h"
52 #include "ros/this_node.h"
53 #include "ros/network.h"
54 #include "ros/poll_manager.h"
55 #include "ros/connection_manager.h"
57 #include "ros/subscription_queue.h"
58 #include "ros/file_log.h"
59 #include "ros/transport_hints.h"
61 
62 #include <boost/make_shared.hpp>
63 
65 
66 namespace ros
67 {
68 
69 Subscription::Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints)
70 : name_(name)
71 , md5sum_(md5sum)
72 , datatype_(datatype)
73 , nonconst_callbacks_(0)
74 , dropped_(false)
75 , shutting_down_(false)
76 , transport_hints_(transport_hints)
77 {
78 }
79 
81 {
82  pending_connections_.clear();
83  callbacks_.clear();
84 }
85 
87 {
88  {
89  boost::mutex::scoped_lock lock(shutdown_mutex_);
90  shutting_down_ = true;
91  }
92 
93  drop();
94 }
95 
97 {
98  XmlRpcValue stats;
99  stats[0] = name_;
100  XmlRpcValue conn_data;
101  conn_data.setSize(0);
102 
103  boost::mutex::scoped_lock lock(publisher_links_mutex_);
104 
105  uint32_t cidx = 0;
106  for (V_PublisherLink::iterator c = publisher_links_.begin();
107  c != publisher_links_.end(); ++c)
108  {
109  const PublisherLink::Stats& s = (*c)->getStats();
110  conn_data[cidx][0] = (*c)->getConnectionID();
111  conn_data[cidx][1] = (int)s.bytes_received_;
112  conn_data[cidx][2] = (int)s.messages_received_;
113  conn_data[cidx][3] = (int)s.drops_;
114  conn_data[cidx][4] = 0; // figure out something for this. not sure.
115  }
116 
117  stats[1] = conn_data;
118  return stats;
119 }
120 
121 // [(connection_id, publisher_xmlrpc_uri, direction, transport, topic_name, connected, connection_info_string)*]
122 // e.g. [(1, 'http://host:54893/', 'i', 'TCPROS', '/chatter', 1, 'TCPROS connection on port 59746 to [host:34318 on socket 11]')]
124 {
125  boost::mutex::scoped_lock lock(publisher_links_mutex_);
126 
127  for (V_PublisherLink::iterator c = publisher_links_.begin();
128  c != publisher_links_.end(); ++c)
129  {
130  XmlRpcValue curr_info;
131  curr_info[0] = (int)(*c)->getConnectionID();
132  curr_info[1] = (*c)->getPublisherXMLRPCURI();
133  curr_info[2] = "i";
134  curr_info[3] = (*c)->getTransportType();
135  curr_info[4] = name_;
136  curr_info[5] = true; // For length compatibility with rospy
137  curr_info[6] = (*c)->getTransportInfo();
138  info[info.size()] = curr_info;
139  }
140 }
141 
143 {
144  boost::mutex::scoped_lock lock(publisher_links_mutex_);
145  return (uint32_t)publisher_links_.size();
146 }
147 
149 {
150  if (!dropped_)
151  {
152  dropped_ = true;
153 
155  }
156 }
157 
159 {
160  // Swap our subscribers list with a local one so we can only lock for a short period of time, because a
161  // side effect of our calling drop() on connections can be re-locking the subscribers mutex
162  V_PublisherLink localsubscribers;
163 
164  {
165  boost::mutex::scoped_lock lock(publisher_links_mutex_);
166 
167  localsubscribers.swap(publisher_links_);
168  }
169 
170  V_PublisherLink::iterator it = localsubscribers.begin();
171  V_PublisherLink::iterator end = localsubscribers.end();
172  for (;it != end; ++it)
173  {
174  (*it)->drop();
175  }
176 }
177 
179 {
180  boost::mutex::scoped_lock lock(publisher_links_mutex_);
181  if (dropped_)
182  {
183  return;
184  }
185 
186  ROSCPP_LOG_DEBUG("Creating intraprocess link for topic [%s]", name_.c_str());
187 
188  IntraProcessPublisherLinkPtr pub_link(boost::make_shared<IntraProcessPublisherLink>(shared_from_this(), XMLRPCManager::instance()->getServerURI(), transport_hints_));
189  IntraProcessSubscriberLinkPtr sub_link(boost::make_shared<IntraProcessSubscriberLink>(pub));
190  pub_link->setPublisher(sub_link);
191  sub_link->setSubscriber(pub_link);
192 
193  addPublisherLink(pub_link);
194  pub->addSubscriberLink(sub_link);
195 }
196 
197 bool urisEqual(const std::string& uri1, const std::string& uri2)
198 {
199  std::string host1, host2;
200  uint32_t port1 = 0, port2 = 0;
201  network::splitURI(uri1, host1, port1);
202  network::splitURI(uri2, host2, port2);
203  return port1 == port2 && host1 == host2;
204 }
205 
206 bool Subscription::pubUpdate(const V_string& new_pubs)
207 {
208  boost::mutex::scoped_lock lock(shutdown_mutex_);
209 
210  if (shutting_down_ || dropped_)
211  {
212  return false;
213  }
214 
215  bool retval = true;
216 
217  {
218  std::stringstream ss;
219 
220  for (V_string::const_iterator up_i = new_pubs.begin();
221  up_i != new_pubs.end(); ++up_i)
222  {
223  ss << *up_i << ", ";
224  }
225 
226  ss << " already have these connections: ";
227  {
228  boost::mutex::scoped_lock lock(publisher_links_mutex_);
229  for (V_PublisherLink::iterator spc = publisher_links_.begin();
230  spc!= publisher_links_.end(); ++spc)
231  {
232  ss << (*spc)->getPublisherXMLRPCURI() << ", ";
233  }
234  }
235 
236  boost::mutex::scoped_lock lock(pending_connections_mutex_);
237  S_PendingConnection::iterator it = pending_connections_.begin();
238  S_PendingConnection::iterator end = pending_connections_.end();
239  for (; it != end; ++it)
240  {
241  ss << (*it)->getRemoteURI() << ", ";
242  }
243 
244  ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());
245  }
246 
247  V_string additions;
248  V_PublisherLink subtractions;
249  V_PublisherLink to_add;
250  // could use the STL set operations... but these sets are so small
251  // it doesn't really matter.
252  {
253  boost::mutex::scoped_lock lock(publisher_links_mutex_);
254 
255  for (V_PublisherLink::iterator spc = publisher_links_.begin();
256  spc!= publisher_links_.end(); ++spc)
257  {
258  bool found = false;
259  for (V_string::const_iterator up_i = new_pubs.begin();
260  !found && up_i != new_pubs.end(); ++up_i)
261  {
262  if (urisEqual((*spc)->getPublisherXMLRPCURI(), *up_i))
263  {
264  found = true;
265  break;
266  }
267  }
268 
269  if (!found)
270  {
271  subtractions.push_back(*spc);
272  }
273  }
274 
275  for (V_string::const_iterator up_i = new_pubs.begin(); up_i != new_pubs.end(); ++up_i)
276  {
277  bool found = false;
278  for (V_PublisherLink::iterator spc = publisher_links_.begin();
279  !found && spc != publisher_links_.end(); ++spc)
280  {
281  if (urisEqual(*up_i, (*spc)->getPublisherXMLRPCURI()))
282  {
283  found = true;
284  break;
285  }
286  }
287 
288  if (!found)
289  {
290  boost::mutex::scoped_lock lock(pending_connections_mutex_);
291  S_PendingConnection::iterator it = pending_connections_.begin();
292  S_PendingConnection::iterator end = pending_connections_.end();
293  for (; it != end; ++it)
294  {
295  if (urisEqual(*up_i, (*it)->getRemoteURI()))
296  {
297  found = true;
298  break;
299  }
300  }
301  }
302 
303  if (!found)
304  {
305  additions.push_back(*up_i);
306  }
307  }
308  }
309 
310  for (V_PublisherLink::iterator i = subtractions.begin(); i != subtractions.end(); ++i)
311  {
312  const PublisherLinkPtr& link = *i;
313  if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
314  {
315  ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
316  link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
317  link->drop();
318  }
319  else
320  {
321  ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
322  }
323  }
324 
325  for (V_string::iterator i = additions.begin();
326  i != additions.end(); ++i)
327  {
328  // this function should never negotiate a self-subscription
329  if (XMLRPCManager::instance()->getServerURI() != *i)
330  {
331  retval &= negotiateConnection(*i);
332  }
333  else
334  {
335  ROSCPP_LOG_DEBUG("Skipping myself (%s, %s)", name_.c_str(), XMLRPCManager::instance()->getServerURI().c_str());
336  }
337  }
338 
339  return retval;
340 }
341 
342 bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
343 {
344  XmlRpcValue tcpros_array, protos_array, params;
345  XmlRpcValue udpros_array;
346  TransportUDPPtr udp_transport;
347  int protos = 0;
348  V_string transports = transport_hints_.getTransports();
349  if (transports.empty())
350  {
352  transports = transport_hints_.getTransports();
353  }
354  for (V_string::const_iterator it = transports.begin();
355  it != transports.end();
356  ++it)
357  {
358  if (*it == "UDP")
359  {
360  int max_datagram_size = transport_hints_.getMaxDatagramSize();
361  udp_transport = boost::make_shared<TransportUDP>(&PollManager::instance()->getPollSet());
362  if (!max_datagram_size)
363  max_datagram_size = udp_transport->getMaxDatagramSize();
364  udp_transport->createIncoming(0, false);
365  udpros_array[0] = "UDPROS";
366  M_string m;
367  m["topic"] = getName();
368  m["md5sum"] = md5sum();
369  m["callerid"] = this_node::getName();
370  m["type"] = datatype();
372  uint32_t len;
373  Header::write(m, buffer, len);
374  XmlRpcValue v(buffer.get(), len);
375  udpros_array[1] = v;
376  udpros_array[2] = network::getHost();
377  udpros_array[3] = udp_transport->getServerPort();
378  udpros_array[4] = max_datagram_size;
379 
380  protos_array[protos++] = udpros_array;
381  }
382  else if (*it == "TCP")
383  {
384  tcpros_array[0] = std::string("TCPROS");
385  protos_array[protos++] = tcpros_array;
386  }
387  else
388  {
389  ROS_WARN("Unsupported transport type hinted: %s, skipping", it->c_str());
390  }
391  }
392  params[0] = this_node::getName();
393  params[1] = name_;
394  params[2] = protos_array;
395  std::string peer_host;
396  uint32_t peer_port;
397  if (!network::splitURI(xmlrpc_uri, peer_host, peer_port))
398  {
399  ROS_ERROR("Bad xml-rpc URI: [%s]", xmlrpc_uri.c_str());
400  return false;
401  }
402 
403  XmlRpc::XmlRpcClient* c = new XmlRpc::XmlRpcClient(peer_host.c_str(),
404  peer_port, "/");
405  // if (!c.execute("requestTopic", params, result) || !g_node->validateXmlrpcResponse("requestTopic", result, proto))
406 
407  // Initiate the negotiation. We'll come back and check on it later.
408  if (!c->executeNonBlock("requestTopic", params))
409  {
410  ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
411  peer_host.c_str(), peer_port, name_.c_str());
412  delete c;
413  if (udp_transport)
414  {
415  udp_transport->close();
416  }
417 
418  return false;
419  }
420 
421  ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
422 
423  // The PendingConnectionPtr takes ownership of c, and will delete it on
424  // destruction.
425  PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
426 
427  XMLRPCManager::instance()->addASyncConnection(conn);
428  // Put this connection on the list that we'll look at later.
429  {
430  boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
431  pending_connections_.insert(conn);
432  }
433 
434  return true;
435 }
436 
437 void closeTransport(const TransportUDPPtr& trans)
438 {
439  if (trans)
440  {
441  trans->close();
442  }
443 }
444 
446 {
447  boost::mutex::scoped_lock lock(shutdown_mutex_);
448  if (shutting_down_ || dropped_)
449  {
450  return;
451  }
452 
453  {
454  boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
455  pending_connections_.erase(conn);
456  }
457 
458  TransportUDPPtr udp_transport;
459 
460  std::string peer_host = conn->getClient()->getHost();
461  uint32_t peer_port = conn->getClient()->getPort();
462  std::stringstream ss;
463  ss << "http://" << peer_host << ":" << peer_port << "/";
464  std::string xmlrpc_uri = ss.str();
465  udp_transport = conn->getUDPTransport();
466 
467  XmlRpc::XmlRpcValue proto;
468  if(!XMLRPCManager::instance()->validateXmlrpcResponse("requestTopic", result, proto))
469  {
470  ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
471  peer_host.c_str(), peer_port, name_.c_str());
472  closeTransport(udp_transport);
473  return;
474  }
475 
476  if (proto.size() == 0)
477  {
478  ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
479  closeTransport(udp_transport);
480  return;
481  }
482 
483  if (proto.getType() != XmlRpcValue::TypeArray)
484  {
485  ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
486  closeTransport(udp_transport);
487  return;
488  }
489  if (proto[0].getType() != XmlRpcValue::TypeString)
490  {
491  ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
492  closeTransport(udp_transport);
493  return;
494  }
495 
496  std::string proto_name = proto[0];
497  if (proto_name == "TCPROS")
498  {
499  if (proto.size() != 3 ||
500  proto[1].getType() != XmlRpcValue::TypeString ||
501  proto[2].getType() != XmlRpcValue::TypeInt)
502  {
503  ROSCPP_LOG_DEBUG("publisher implements TCPROS, but the " \
504  "parameters aren't string,int");
505  return;
506  }
507  std::string pub_host = proto[1];
508  int pub_port = proto[2];
509  ROSCPP_CONN_LOG_DEBUG("Connecting via tcpros to topic [%s] at host [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
510 
511  TransportTCPPtr transport(boost::make_shared<TransportTCP>(&PollManager::instance()->getPollSet()));
512  if (transport->connect(pub_host, pub_port))
513  {
514  ConnectionPtr connection(boost::make_shared<Connection>());
515  TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
516 
517  connection->initialize(transport, false, HeaderReceivedFunc());
518  pub_link->initialize(connection);
519 
520  ConnectionManager::instance()->addConnection(connection);
521 
522  boost::mutex::scoped_lock lock(publisher_links_mutex_);
523  addPublisherLink(pub_link);
524 
525  ROSCPP_CONN_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
526  }
527  else
528  {
529  ROSCPP_CONN_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
530  }
531  }
532  else if (proto_name == "UDPROS")
533  {
534  if (proto.size() != 6 ||
535  proto[1].getType() != XmlRpcValue::TypeString ||
536  proto[2].getType() != XmlRpcValue::TypeInt ||
537  proto[3].getType() != XmlRpcValue::TypeInt ||
538  proto[4].getType() != XmlRpcValue::TypeInt ||
539  proto[5].getType() != XmlRpcValue::TypeBase64)
540  {
541  ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
542  "parameters aren't string,int,int,int,base64");
543  closeTransport(udp_transport);
544  return;
545  }
546  std::string pub_host = proto[1];
547  int pub_port = proto[2];
548  int conn_id = proto[3];
549  int max_datagram_size = proto[4];
550  std::vector<char> header_bytes = proto[5];
551  boost::shared_array<uint8_t> buffer(new uint8_t[header_bytes.size()]);
552  memcpy(buffer.get(), &header_bytes[0], header_bytes.size());
553  Header h;
554  std::string err;
555  if (!h.parse(buffer, header_bytes.size(), err))
556  {
557  ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
558  closeTransport(udp_transport);
559  return;
560  }
561  ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
562 
563  std::string error_msg;
564  if (h.getValue("error", error_msg))
565  {
566  ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
567  closeTransport(udp_transport);
568  return;
569  }
570 
571  TransportPublisherLinkPtr pub_link(boost::make_shared<TransportPublisherLink>(shared_from_this(), xmlrpc_uri, transport_hints_));
572  if (pub_link->setHeader(h))
573  {
574  ConnectionPtr connection(boost::make_shared<Connection>());
575  connection->initialize(udp_transport, false, NULL);
576  connection->setHeader(h);
577  pub_link->initialize(connection);
578 
579  ConnectionManager::instance()->addConnection(connection);
580 
581  boost::mutex::scoped_lock lock(publisher_links_mutex_);
582  addPublisherLink(pub_link);
583 
584  ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
585  }
586  else
587  {
588  ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
589  closeTransport(udp_transport);
590  return;
591  }
592  }
593  else
594  {
595  ROSCPP_LOG_DEBUG("Publisher offered unsupported transport [%s]", proto_name.c_str());
596  }
597 }
598 
599 uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
600 {
601  boost::mutex::scoped_lock lock(callbacks_mutex_);
602 
603  uint32_t drops = 0;
604 
605  // Cache the deserializers by type info. If all the subscriptions are the same type this has the same performance as before. If
606  // there are subscriptions with different C++ type (but same ROS message type), this now works correctly rather than passing
607  // garbage to the messages with different C++ types than the first one.
608  cached_deserializers_.clear();
609 
610  ros::Time receipt_time = ros::Time::now();
611 
612  for (V_CallbackInfo::iterator cb = callbacks_.begin();
613  cb != callbacks_.end(); ++cb)
614  {
615  const CallbackInfoPtr& info = *cb;
616 
617  ROS_ASSERT(info->callback_queue_);
618 
619  const std::type_info* ti = &info->helper_->getTypeInfo();
620 
621  if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
622  {
623  MessageDeserializerPtr deserializer;
624 
625  V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
626  V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
627  for (; des_it != des_end; ++des_it)
628  {
629  if (*des_it->first == *ti)
630  {
631  deserializer = des_it->second;
632  break;
633  }
634  }
635 
636  if (!deserializer)
637  {
638  deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
639  cached_deserializers_.push_back(std::make_pair(ti, deserializer));
640  }
641 
642  bool was_full = false;
643  bool nonconst_need_copy = false;
644  if (callbacks_.size() > 1)
645  {
646  nonconst_need_copy = true;
647  }
648 
649  info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
650 
651  if (was_full)
652  {
653  ++drops;
654  }
655  else
656  {
657  info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
658  }
659  }
660  }
661 
662  // measure statistics
663  statistics_.callback(connection_header, name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0);
664 
665  // If this link is latched, store off the message so we can immediately pass it to new subscribers later
666  if (link->isLatched())
667  {
668  LatchInfo li;
669  li.connection_header = connection_header;
670  li.link = link;
671  li.message = m;
672  li.receipt_time = receipt_time;
673  latched_messages_[link] = li;
674  }
675 
676  cached_deserializers_.clear();
677 
678  return drops;
679 }
680 
681 bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
682 {
683  ROS_ASSERT(helper);
684  ROS_ASSERT(queue);
685 
686  statistics_.init(helper);
687 
688  // Decay to a real type as soon as we have a subscriber with a real type
689  {
690  boost::mutex::scoped_lock lock(md5sum_mutex_);
691  if (md5sum_ == "*" && md5sum != "*")
692  {
693 
694  md5sum_ = md5sum;
695  }
696  }
697 
698  if (md5sum != "*" && md5sum != this->md5sum())
699  {
700  return false;
701  }
702 
703  {
704  boost::mutex::scoped_lock lock(callbacks_mutex_);
705 
706  CallbackInfoPtr info(boost::make_shared<CallbackInfo>());
707  info->helper_ = helper;
708  info->callback_queue_ = queue;
709  info->subscription_queue_ = boost::make_shared<SubscriptionQueue>(name_, queue_size, allow_concurrent_callbacks);
710  info->tracked_object_ = tracked_object;
711  info->has_tracked_object_ = false;
712  if (tracked_object)
713  {
714  info->has_tracked_object_ = true;
715  }
716 
717  if (!helper->isConst())
718  {
720  }
721 
722  callbacks_.push_back(info);
723  cached_deserializers_.reserve(callbacks_.size());
724 
725  // if we have any latched links, we need to immediately schedule callbacks
726  if (!latched_messages_.empty())
727  {
728  boost::mutex::scoped_lock lock(publisher_links_mutex_);
729 
730  V_PublisherLink::iterator it = publisher_links_.begin();
731  V_PublisherLink::iterator end = publisher_links_.end();
732  for (; it != end;++it)
733  {
734  const PublisherLinkPtr& link = *it;
735  if (link->isLatched())
736  {
737  M_PublisherLinkToLatchInfo::iterator des_it = latched_messages_.find(link);
738  if (des_it != latched_messages_.end())
739  {
740  const LatchInfo& latch_info = des_it->second;
741 
742  MessageDeserializerPtr des(boost::make_shared<MessageDeserializer>(helper, latch_info.message, latch_info.connection_header));
743  bool was_full = false;
744  info->subscription_queue_->push(info->helper_, des, info->has_tracked_object_, info->tracked_object_, true, latch_info.receipt_time, &was_full);
745  if (!was_full)
746  {
747  info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
748  }
749  }
750  }
751  }
752  }
753  }
754 
755  return true;
756 }
757 
759 {
760  CallbackInfoPtr info;
761  {
762  boost::mutex::scoped_lock cbs_lock(callbacks_mutex_);
763  for (V_CallbackInfo::iterator it = callbacks_.begin();
764  it != callbacks_.end(); ++it)
765  {
766  if ((*it)->helper_ == helper)
767  {
768  info = *it;
769  callbacks_.erase(it);
770 
771  if (!helper->isConst())
772  {
774  }
775 
776  break;
777  }
778  }
779  }
780 
781  if (info)
782  {
783  info->subscription_queue_->clear();
784  info->callback_queue_->removeByID((uint64_t)info.get());
785  }
786 }
787 
789 {
790  (void)h;
791  boost::mutex::scoped_lock lock(md5sum_mutex_);
792  if (md5sum_ == "*")
793  {
794  md5sum_ = link->getMD5Sum();
795  }
796 }
797 
799 {
800  publisher_links_.push_back(link);
801 }
802 
804 {
805  boost::mutex::scoped_lock lock(publisher_links_mutex_);
806 
807  V_PublisherLink::iterator it = std::find(publisher_links_.begin(), publisher_links_.end(), pub_link);
808  if (it != publisher_links_.end())
809  {
810  publisher_links_.erase(it);
811  }
812 
813  if (pub_link->isLatched())
814  {
815  latched_messages_.erase(pub_link);
816  }
817 }
818 
819 void Subscription::getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti)
820 {
821  boost::mutex::scoped_lock lock(callbacks_mutex_);
822  for (V_CallbackInfo::iterator cb = callbacks_.begin();
823  cb != callbacks_.end(); ++cb)
824  {
825  const CallbackInfoPtr& info = *cb;
826  if (info->helper_->getTypeInfo() == ti)
827  {
828  nocopy = true;
829  }
830  else
831  {
832  ser = true;
833  }
834 
835  if (nocopy && ser)
836  {
837  return;
838  }
839  }
840 }
841 
842 const std::string Subscription::datatype()
843 {
844  return datatype_;
845 }
846 
847 const std::string Subscription::md5sum()
848 {
849  boost::mutex::scoped_lock lock(md5sum_mutex_);
850  return md5sum_;
851 }
852 
853 }
void callback(const boost::shared_ptr< M_string > &connection_header, const std::string &topic, const std::string &callerid, const SerializedMessage &m, const uint64_t &bytes_sent, const ros::Time &received_time, bool dropped)
Definition: statistics.cpp:53
S_PendingConnection pending_connections_
Definition: subscription.h:220
boost::mutex publisher_links_mutex_
Definition: subscription.h:225
void init(const SubscriptionCallbackHelperPtr &helper)
Definition: statistics.cpp:44
TransportHints & reliable()
Specifies a reliable transport. Currently this means TCP.
TransportHints transport_hints_
Definition: subscription.h:227
int size() const
XmlRpcServer s
std::vector< PublisherLinkPtr > V_PublisherLink
Definition: subscription.h:223
ROSCPP_DECL bool splitURI(const std::string &uri, std::string &host, uint32_t &port)
Definition: network.cpp:55
void pendingConnectionDone(const PendingConnectionPtr &pending_conn, XmlRpc::XmlRpcValue &result)
ROSCPP_DECL const std::string & getName()
Returns the name of the current node.
Definition: this_node.cpp:81
M_PublisherLinkToLatchInfo latched_messages_
Definition: subscription.h:240
bool addCallback(const SubscriptionCallbackHelperPtr &helper, const std::string &md5sum, CallbackQueueInterface *queue, int32_t queue_size, const VoidConstPtr &tracked_object, bool allow_concurrent_callbacks)
const std::string & getName() const
Definition: subscription.h:118
XmlRpc::XmlRpcValue getStats()
Abstract interface for a queue used to handle all callbacks within roscpp.
V_PublisherLink publisher_links_
Definition: subscription.h:224
boost::mutex pending_connections_mutex_
Definition: subscription.h:221
Type const & getType() const
uint32_t nonconst_callbacks_
Definition: subscription.h:213
#define ROS_WARN(...)
boost::function< bool(const ConnectionPtr &, const Header &)> HeaderReceivedFunc
Definition: connection.h:62
void getInfo(XmlRpc::XmlRpcValue &info)
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
std::string md5sum_
Definition: subscription.h:209
void headerReceived(const PublisherLinkPtr &link, const Header &h)
V_TypeAndDeserializer cached_deserializers_
Definition: subscription.h:243
void removeCallback(const SubscriptionCallbackHelperPtr &helper)
bool negotiateConnection(const std::string &xmlrpc_uri)
Negotiates a connection with a publisher.
std::map< std::string, std::string > M_string
Definition: subscription.h:102
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
void getPublishTypes(bool &ser, bool &nocopy, const std::type_info &ti)
std::vector< std::string > V_string
ROSCPP_DECL const std::string & getHost()
Definition: network.cpp:50
void setSize(int size)
static const ConnectionManagerPtr & instance()
SerializedMessage message
Definition: subscription.h:233
uint32_t handleMessage(const SerializedMessage &m, bool ser, bool nocopy, const boost::shared_ptr< M_string > &connection_header, const PublisherLinkPtr &link)
Called to notify that a new message has arrived from a publisher. Schedules the callback for invokati...
const std::type_info * type_info
StatisticsLogger statistics_
Definition: subscription.h:229
static void write(const M_string &key_vals, boost::shared_array< uint8_t > &buffer, uint32_t &size)
boost::mutex callbacks_mutex_
Definition: subscription.h:211
int getMaxDatagramSize()
Returns the maximum datagram size specified on this TransportHints, or 0 if no size was specified...
const std::string md5sum()
void removePublisherLink(const PublisherLinkPtr &pub_link)
Removes a subscriber from our list.
void addPublisherLink(const PublisherLinkPtr &link)
boost::mutex md5sum_mutex_
Definition: subscription.h:208
Subscription(const std::string &name, const std::string &md5sum, const std::string &datatype, const TransportHints &transport_hints)
void shutdown()
Terminate all our PublisherLinks and join our callback thread if it exists.
std::string name_
Definition: subscription.h:207
static const PollManagerPtr & instance()
bool pubUpdate(const std::vector< std::string > &pubs)
Handle a publisher update list received from the master. Creates/drops PublisherLinks based on the li...
const V_string & getTransports()
Returns a vector of transports, ordered by preference.
#define ROSCPP_CONN_LOG_DEBUG(...)
Definition: file_log.h:36
std::string datatype_
Definition: subscription.h:210
static const XMLRPCManagerPtr & instance()
void addLocalConnection(const PublicationPtr &pub)
static Time now()
bool urisEqual(const std::string &uri1, const std::string &uri2)
bool executeNonBlock(const char *method, XmlRpcValue const &params)
const std::string datatype()
boost::mutex shutdown_mutex_
Definition: subscription.h:217
void closeTransport(const TransportUDPPtr &trans)
#define ROS_ASSERT(cond)
#define ROS_ERROR(...)
virtual ~Subscription()
V_CallbackInfo callbacks_
Definition: subscription.h:212
uint32_t getNumPublishers()
void drop()
Terminate all our PublisherLinks.
boost::shared_ptr< std::map< std::string, std::string > > connection_header
Definition: subscription.h:235


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:26