subscription.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #ifndef ROSCPP_SUBSCRIPTION_H
29 #define ROSCPP_SUBSCRIPTION_H
30 
31 #include <queue>
32 #include "ros/common.h"
33 #include "ros/header.h"
34 #include "ros/forwards.h"
35 #include "ros/transport_hints.h"
36 #include "ros/xmlrpc_manager.h"
37 #include "ros/statistics.h"
38 #include "xmlrpcpp/XmlRpc.h"
39 
40 #include <boost/thread.hpp>
41 #include <boost/shared_ptr.hpp>
42 #include <boost/enable_shared_from_this.hpp>
43 
44 namespace ros
45 {
46 
47 class PublisherLink;
49 
50 class SubscriptionCallback;
52 
55 
58 
61 
// Manages a subscription on a single topic (brief taken from the Doxygen index
// at the end of this page).
//
// NOTE(review): this is a rendered source listing. Every code line keeps its
// original header line number as a prefix, and several declaration lines are
// absent from the listing (the numbering jumps). Gaps that can be identified
// from the "Definition: subscription.h:<n>" entries in the index are flagged
// in the comments below; the remaining jumps presumably held Doxygen comments.
65 class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
66 {
67 public:
68  Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints);
69  virtual ~Subscription();
70 
74  void drop();
78  void shutdown();
83  bool pubUpdate(const std::vector<std::string> &pubs);
88  bool negotiateConnection(const std::string& xmlrpc_uri);
89 
90  void addLocalConnection(const PublicationPtr& pub);
91 
// Returns whether this Subscription has been dropped or not (reads dropped_).
95  bool isDropped() { return dropped_; }
96  XmlRpc::XmlRpcValue getStats();
97  void getInfo(XmlRpc::XmlRpcValue& info);
98 
99  bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
100  void removeCallback(const SubscriptionCallbackHelperPtr& helper);
101 
102  typedef std::map<std::string, std::string> M_string;
103 
108  uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
109 
// Both accessors return a std::string by value (a copy), not a reference.
110  const std::string datatype();
111  const std::string md5sum();
112 
116  void removePublisherLink(const PublisherLinkPtr& pub_link);
117 
// Returns a reference to the stored name_.
118  const std::string& getName() const { return name_; }
// Returns callbacks_.size() converted to uint32_t.
// NOTE(review): this inline body reads callbacks_ without locking
// callbacks_mutex_ - confirm callers tolerate that.
119  uint32_t getNumCallbacks() const { return callbacks_.size(); }
120  uint32_t getNumPublishers();
121 
122  // We'll keep a list of these objects, representing in-progress XMLRPC
123  // connections to other nodes.
124  class ROSCPP_DECL PendingConnection : public ASyncXMLRPCConnection
125  {
126  public:
// Stores the four arguments in the corresponding members; no other work.
127  PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri)
128  : client_(client)
129  , udp_transport_(udp_transport)
130  , parent_(parent)
131  , remote_uri_(remote_uri)
132  {}
133 
// NOTE(review): header line 134 is missing from the listing; the index names
// ~PendingConnection() at that line. The body below deletes client_, so this
// object owns the XmlRpcClient passed to the constructor.
135  {
136  delete client_;
137  }
138 
139  XmlRpc::XmlRpcClient* getClient() const { return client_; }
140  TransportUDPPtr getUDPTransport() const { return udp_transport_; }
141 
// NOTE(review): header lines 142 and 144 are missing from the listing. The
// index names "virtual void addToDispatch(XmlRpc::XmlRpcDispatch *disp)" at
// line 142; the one-line body (144) is not shown - presumably a
// disp->addSource(...) call, TODO confirm against the real header.
143  {
145  }
146 
// NOTE(review): header line 147 is missing from the listing; the index names
// "virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch *disp)" there.
// The body unregisters client_ from the dispatcher.
148  {
149  disp->removeSource(client_);
150  }
151 
// Polls the pending XML-RPC call.
// Returns true when the parent Subscription no longer exists, or when
// client_->executeCheckDone(result) returns true (in which case the result is
// first handed to parent->pendingConnectionDone()). Returns false otherwise.
// NOTE(review): presumably "true" tells the caller this connection is finished
// and can be discarded - confirm against ASyncXMLRPCConnection's contract.
152  virtual bool check()
153  {
// parent_ is a weak pointer; lock() yields an empty pointer once the
// Subscription has been destroyed.
154  SubscriptionPtr parent = parent_.lock();
155  if (!parent)
156  {
157  return true;
158  }
159 
160  XmlRpc::XmlRpcValue result;
161  if (client_->executeCheckDone(result))
162  {
163  parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result);
164  return true;
165  }
166 
167  return false;
168  }
169 
170  const std::string& getRemoteURI() { return remote_uri_; }
171 
172  private:
// NOTE(review): header lines 173-175 are missing from the listing; the index
// names client_ (173), udp_transport_ (174) and parent_ (175) there.
176  std::string remote_uri_;
177  };
// NOTE(review): header line 178 is missing from the listing; the index names
// the typedef PendingConnectionPtr (boost::shared_ptr<PendingConnection>) there.
179 
180  void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result);
181 
182  void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
183 
184  void headerReceived(const PublisherLinkPtr& link, const Header& h);
185 
186 private:
187  Subscription(const Subscription &); // not copyable
188  Subscription &operator =(const Subscription &); // nor assignable
189 
190  void dropAllConnections();
191 
192  void addPublisherLink(const PublisherLinkPtr& link);
193 
// NOTE(review): header line 194 (the CallbackInfo declaration, per the index)
// and its members are missing from the listing. The index lists
// callback_queue_ (196), helper_ (199), subscription_queue_ (200),
// has_tracked_object_ (201) and tracked_object_ (202).
195  {
197 
198  // Only used if callback_queue_ is non-NULL (NodeHandle API)
203  };
// NOTE(review): header line 204 is missing from the listing; the index names
// the typedef CallbackInfoPtr (boost::shared_ptr<CallbackInfo>) there.
205  typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
206 
207  std::string name_;
208  boost::mutex md5sum_mutex_;
209  std::string md5sum_;
210  std::string datatype_;
211  boost::mutex callbacks_mutex_;
// NOTE(review): missing from the listing per the index: callbacks_ (212) and
// nonconst_callbacks_ (213).
214 
215  bool dropped_;
// NOTE(review): missing from the listing per the index: shutting_down_ (216).
217  boost::mutex shutdown_mutex_;
218 
219  typedef std::set<PendingConnectionPtr> S_PendingConnection;
// NOTE(review): missing from the listing per the index: pending_connections_
// (220) and pending_connections_mutex_ (221).
222 
223  typedef std::vector<PublisherLinkPtr> V_PublisherLink;
// NOTE(review): missing from the listing per the index: publisher_links_ (224)
// and publisher_links_mutex_ (225).
226 
// NOTE(review): missing from the listing per the index: transport_hints_ (227).
228 
// NOTE(review): missing from the listing per the index: statistics_ (229).
230 
// NOTE(review): the members of LatchInfo are missing from the listing; the
// index lists message (233), link (234), connection_header (235) and
// receipt_time (236).
231  struct LatchInfo
232  {
237  };
238 
239  typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo;
// NOTE(review): missing from the listing per the index: latched_messages_ (240).
241 
242  typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer;
// NOTE(review): missing from the listing per the index: cached_deserializers_ (243).
244 };
245 
246 }
247 
248 #endif
249 
ros::Subscription::S_PendingConnection
std::set< PendingConnectionPtr > S_PendingConnection
Definition: subscription.h:219
ros::Subscription::V_CallbackInfo
std::vector< CallbackInfoPtr > V_CallbackInfo
Definition: subscription.h:205
ros::SubscriptionCallbackHelperPtr
boost::shared_ptr< SubscriptionCallbackHelper > SubscriptionCallbackHelperPtr
Definition: message_deserializer.h:42
ros::SerializedMessage
ros::Subscription::PendingConnection::getClient
XmlRpc::XmlRpcClient * getClient() const
Definition: subscription.h:139
XmlRpc::XmlRpcDispatch::WritableEvent
WritableEvent
transport_hints.h
md5sum
const char * md5sum()
ros::Subscription::PendingConnection::remote_uri_
std::string remote_uri_
Definition: subscription.h:176
ros::Subscription::LatchInfo
Definition: subscription.h:231
ros::MessageDeserializerPtr
boost::shared_ptr< MessageDeserializer > MessageDeserializerPtr
Definition: message_deserializer.h:61
boost::shared_ptr< PublisherLink >
forwards.h
ros::Subscription::PendingConnection::check
virtual bool check()
Definition: subscription.h:152
ros::Subscription::shutting_down_
bool shutting_down_
Definition: subscription.h:216
ros::Subscription::md5sum_
std::string md5sum_
Definition: subscription.h:209
ros
ros::Subscription::PendingConnectionPtr
boost::shared_ptr< PendingConnection > PendingConnectionPtr
Definition: subscription.h:178
ros::Header
ros::Subscription::LatchInfo::message
SerializedMessage message
Definition: subscription.h:233
ros::Subscription::md5sum_mutex_
boost::mutex md5sum_mutex_
Definition: subscription.h:208
ros::Subscription::LatchInfo::link
PublisherLinkPtr link
Definition: subscription.h:234
ros::Subscription::dropped_
bool dropped_
Definition: subscription.h:215
ros::TransportHints
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
Definition: transport_hints.h:54
ros::shutdown
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
Definition: init.cpp:605
XmlRpc::XmlRpcClient
statistics.h
ros::Subscription::M_PublisherLinkToLatchInfo
std::map< PublisherLinkPtr, LatchInfo > M_PublisherLinkToLatchInfo
Definition: subscription.h:239
ros::Subscription::PendingConnection::udp_transport_
TransportUDPPtr udp_transport_
Definition: subscription.h:174
ros::Subscription::cached_deserializers_
V_TypeAndDeserializer cached_deserializers_
Definition: subscription.h:243
ros::Subscription::PendingConnection
Definition: subscription.h:124
ros::Subscription::V_PublisherLink
std::vector< PublisherLinkPtr > V_PublisherLink
Definition: subscription.h:223
XmlRpc::XmlRpcDispatch::addSource
void addSource(XmlRpcSource *source, unsigned eventMask)
ros::Subscription::CallbackInfo::has_tracked_object_
bool has_tracked_object_
Definition: subscription.h:201
ros::Subscription::PendingConnection::getUDPTransport
TransportUDPPtr getUDPTransport() const
Definition: subscription.h:140
ros::Subscription::shutdown_mutex_
boost::mutex shutdown_mutex_
Definition: subscription.h:217
ros::Subscription::getName
const std::string & getName() const
Definition: subscription.h:118
ros::Subscription::nonconst_callbacks_
uint32_t nonconst_callbacks_
Definition: subscription.h:213
ros::PublisherLinkPtr
boost::shared_ptr< PublisherLink > PublisherLinkPtr
Definition: forwards.h:78
ros::Subscription::PendingConnection::client_
XmlRpc::XmlRpcClient * client_
Definition: subscription.h:173
ros::SubscriptionCallbackPtr
boost::shared_ptr< SubscriptionCallback > SubscriptionCallbackPtr
Definition: subscription.h:50
XmlRpc.h
ros::Subscription::callbacks_
V_CallbackInfo callbacks_
Definition: subscription.h:212
ros::Subscription::name_
std::string name_
Definition: subscription.h:207
xmlrpc_manager.h
ros::Subscription::isDropped
bool isDropped()
Returns whether this Subscription has been dropped or not.
Definition: subscription.h:95
XmlRpc::XmlRpcDispatch::removeSource
void removeSource(XmlRpcSource *source)
ros::SubscriptionWPtr
boost::weak_ptr< Subscription > SubscriptionWPtr
Definition: forwards.h:74
ros::Subscription::CallbackInfo::callback_queue_
CallbackQueueInterface * callback_queue_
Definition: subscription.h:196
ros::Subscription::CallbackInfoPtr
boost::shared_ptr< CallbackInfo > CallbackInfoPtr
Definition: subscription.h:204
ros::Subscription::PendingConnection::PendingConnection
PendingConnection(XmlRpc::XmlRpcClient *client, TransportUDPPtr udp_transport, const SubscriptionWPtr &parent, const std::string &remote_uri)
Definition: subscription.h:127
ros::Subscription::statistics_
StatisticsLogger statistics_
Definition: subscription.h:229
ros::Subscription::CallbackInfo::helper_
SubscriptionCallbackHelperPtr helper_
Definition: subscription.h:199
ros::Subscription::PendingConnection::removeFromDispatch
virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch *disp)
Definition: subscription.h:147
ros::Subscription::PendingConnection::parent_
SubscriptionWPtr parent_
Definition: subscription.h:175
ros::Subscription::LatchInfo::connection_header
boost::shared_ptr< std::map< std::string, std::string > > connection_header
Definition: subscription.h:235
ros::VoidConstWPtr
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
ros::Subscription::PendingConnection::addToDispatch
virtual void addToDispatch(XmlRpc::XmlRpcDispatch *disp)
Definition: subscription.h:142
ros::Subscription::publisher_links_mutex_
boost::mutex publisher_links_mutex_
Definition: subscription.h:225
ros::Subscription::LatchInfo::receipt_time
ros::Time receipt_time
Definition: subscription.h:236
ros::Subscription::PendingConnection::~PendingConnection
~PendingConnection()
Definition: subscription.h:134
ros::Subscription::M_string
std::map< std::string, std::string > M_string
Definition: subscription.h:102
ros::Subscription::callbacks_mutex_
boost::mutex callbacks_mutex_
Definition: subscription.h:211
ros::Subscription::latched_messages_
M_PublisherLinkToLatchInfo latched_messages_
Definition: subscription.h:240
ros::Subscription::pending_connections_
S_PendingConnection pending_connections_
Definition: subscription.h:220
ros::Time
ros::Subscription::getNumCallbacks
uint32_t getNumCallbacks() const
Definition: subscription.h:119
ros::Subscription::PendingConnection::getRemoteURI
const std::string & getRemoteURI()
Definition: subscription.h:170
ros::SubscriptionQueuePtr
boost::shared_ptr< SubscriptionQueue > SubscriptionQueuePtr
Definition: subscription.h:53
ros::SubscriptionQueue
Definition: subscription_queue.h:50
ros::Subscription::CallbackInfo::tracked_object_
VoidConstWPtr tracked_object_
Definition: subscription.h:202
datatype
const char * datatype()
ros::Subscription
Manages a subscription on a single topic.
Definition: subscription.h:65
header.h
ros::MessageDeserializer
Definition: message_deserializer.h:45
XmlRpc::XmlRpcDispatch
ros::StatisticsLogger
This class logs statistics data about a ROS connection and publishes them periodically on a common top...
Definition: statistics.h:49
ros::Subscription::CallbackInfo::subscription_queue_
SubscriptionQueuePtr subscription_queue_
Definition: subscription.h:200
ros::Subscription::datatype_
std::string datatype_
Definition: subscription.h:210
ros::Subscription::CallbackInfo
Definition: subscription.h:194
ros::ASyncXMLRPCConnection
Definition: xmlrpc_manager.h:60
XmlRpc::XmlRpcDispatch::Exception
Exception
ros::Subscription::V_TypeAndDeserializer
std::vector< std::pair< const std::type_info *, MessageDeserializerPtr > > V_TypeAndDeserializer
Definition: subscription.h:242
ros::SubscriptionCallbackHelper
Abstract base class used by subscriptions to deal with concrete message types through a common interf...
Definition: subscription_callback_helper.h:69
ros::Subscription::transport_hints_
TransportHints transport_hints_
Definition: subscription.h:227
ros::Subscription::publisher_links_
V_PublisherLink publisher_links_
Definition: subscription.h:224
ros::Subscription::pending_connections_mutex_
boost::mutex pending_connections_mutex_
Definition: subscription.h:221
XmlRpc::XmlRpcValue
ros::CallbackQueueInterface
Abstract interface for a queue used to handle all callbacks within roscpp.
Definition: callback_queue_interface.h:82


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas, Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:44