subscription.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2008, Morgan Quigley and Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #ifndef ROSCPP_SUBSCRIPTION_H
29 #define ROSCPP_SUBSCRIPTION_H
30 
31 #include <queue>
32 #include "ros/common.h"
33 #include "ros/header.h"
34 #include "ros/forwards.h"
35 #include "ros/transport_hints.h"
36 #include "ros/xmlrpc_manager.h"
37 #include "ros/statistics.h"
38 #include "xmlrpcpp/XmlRpc.h"
39 
40 #include <boost/thread.hpp>
41 #include <boost/shared_ptr.hpp>
42 #include <boost/enable_shared_from_this.hpp>
43 
44 namespace ros
45 {
46 
47 class PublisherLink;
49 
50 class SubscriptionCallback;
52 
55 
58 
61 
65 class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
66 {
67 public:
68  Subscription(const std::string &name, const std::string& md5sum, const std::string& datatype, const TransportHints& transport_hints);
69  virtual ~Subscription();
70 
74  void drop();
78  void shutdown();
83  bool pubUpdate(const std::vector<std::string> &pubs);
88  bool negotiateConnection(const std::string& xmlrpc_uri);
89 
90  void addLocalConnection(const PublicationPtr& pub);
91 
95  bool isDropped() { return dropped_; }
96  XmlRpc::XmlRpcValue getStats();
97  void getInfo(XmlRpc::XmlRpcValue& info);
98 
99  bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
100  void removeCallback(const SubscriptionCallbackHelperPtr& helper);
101 
102  typedef std::map<std::string, std::string> M_string;
103 
108  uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
109 
110  const std::string datatype();
111  const std::string md5sum();
112 
116  void removePublisherLink(const PublisherLinkPtr& pub_link);
117 
118  const std::string& getName() const { return name_; }
119  uint32_t getNumCallbacks() const { return callbacks_.size(); }
120  uint32_t getNumPublishers();
121 
122  // We'll keep a list of these objects, representing in-progress XMLRPC
123  // connections to other nodes.
124  class ROSCPP_DECL PendingConnection : public ASyncXMLRPCConnection
125  {
126  public:
127  PendingConnection(XmlRpc::XmlRpcClient* client, TransportUDPPtr udp_transport, const SubscriptionWPtr& parent, const std::string& remote_uri)
128  : client_(client)
129  , udp_transport_(udp_transport)
130  , parent_(parent)
131  , remote_uri_(remote_uri)
132  {}
133 
135  {
136  delete client_;
137  }
138 
139  XmlRpc::XmlRpcClient* getClient() const { return client_; }
140  TransportUDPPtr getUDPTransport() const { return udp_transport_; }
141 
143  {
145  }
146 
148  {
149  disp->removeSource(client_);
150  }
151 
152  virtual bool check()
153  {
154  SubscriptionPtr parent = parent_.lock();
155  if (!parent)
156  {
157  return true;
158  }
159 
160  XmlRpc::XmlRpcValue result;
161  if (client_->executeCheckDone(result))
162  {
163  parent->pendingConnectionDone(boost::dynamic_pointer_cast<PendingConnection>(shared_from_this()), result);
164  return true;
165  }
166 
167  return false;
168  }
169 
170  const std::string& getRemoteURI() { return remote_uri_; }
171 
172  private:
176  std::string remote_uri_;
177  };
179 
180  void pendingConnectionDone(const PendingConnectionPtr& pending_conn, XmlRpc::XmlRpcValue& result);
181 
182  void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
183 
184  void headerReceived(const PublisherLinkPtr& link, const Header& h);
185 
186 private:
187  Subscription(const Subscription &); // not copyable
188  Subscription &operator =(const Subscription &); // nor assignable
189 
190  void dropAllConnections();
191 
192  void addPublisherLink(const PublisherLinkPtr& link);
193 
195  {
197 
198  // Only used if callback_queue_ is non-NULL (NodeHandle API)
199  SubscriptionCallbackHelperPtr helper_;
200  SubscriptionQueuePtr subscription_queue_;
203  };
205  typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
206 
207  std::string name_;
208  boost::mutex md5sum_mutex_;
209  std::string md5sum_;
210  std::string datatype_;
211  boost::mutex callbacks_mutex_;
212  V_CallbackInfo callbacks_;
214 
215  bool dropped_;
217  boost::mutex shutdown_mutex_;
218 
219  typedef std::set<PendingConnectionPtr> S_PendingConnection;
220  S_PendingConnection pending_connections_;
222 
223  typedef std::vector<PublisherLinkPtr> V_PublisherLink;
224  V_PublisherLink publisher_links_;
226 
228 
230 
231  struct LatchInfo
232  {
234  PublisherLinkPtr link;
237  };
238 
239  typedef std::map<PublisherLinkPtr, LatchInfo> M_PublisherLinkToLatchInfo;
240  M_PublisherLinkToLatchInfo latched_messages_;
241 
242  typedef std::vector<std::pair<const std::type_info*, MessageDeserializerPtr> > V_TypeAndDeserializer;
243  V_TypeAndDeserializer cached_deserializers_;
244 };
245 
246 }
247 
248 #endif
249 
Manages a subscription on a single topic.
Definition: subscription.h:65
const std::string & getRemoteURI()
Definition: subscription.h:170
S_PendingConnection pending_connections_
Definition: subscription.h:220
SubscriptionQueuePtr subscription_queue_
Definition: subscription.h:200
boost::mutex publisher_links_mutex_
Definition: subscription.h:225
Abstract base class used by subscriptions to deal with concrete message types through a common interf...
TransportHints transport_hints_
Definition: subscription.h:227
boost::shared_ptr< SubscriptionCallback > SubscriptionCallbackPtr
Definition: subscription.h:50
boost::shared_ptr< CallbackInfo > CallbackInfoPtr
Definition: subscription.h:204
std::vector< PublisherLinkPtr > V_PublisherLink
Definition: subscription.h:223
void removeSource(XmlRpcSource *source)
M_PublisherLinkToLatchInfo latched_messages_
Definition: subscription.h:240
Abstract interface for a queue used to handle all callbacks within roscpp.
V_PublisherLink publisher_links_
Definition: subscription.h:224
boost::mutex pending_connections_mutex_
Definition: subscription.h:221
uint32_t nonconst_callbacks_
Definition: subscription.h:213
virtual void addToDispatch(XmlRpc::XmlRpcDispatch *disp)
Definition: subscription.h:142
boost::shared_ptr< SubscriptionQueue > SubscriptionQueuePtr
Definition: subscription.h:53
boost::shared_ptr< SubscriptionCallbackHelper > SubscriptionCallbackHelperPtr
Provides a way of specifying network transport hints to ros::NodeHandle::subscribe() and someday ros:...
boost::weak_ptr< Subscription > SubscriptionWPtr
Definition: forwards.h:74
boost::shared_ptr< PublisherLink > PublisherLinkPtr
Definition: forwards.h:78
std::string md5sum_
Definition: subscription.h:209
virtual void removeFromDispatch(XmlRpc::XmlRpcDispatch *disp)
Definition: subscription.h:147
V_TypeAndDeserializer cached_deserializers_
Definition: subscription.h:243
std::map< std::string, std::string > M_string
Definition: subscription.h:102
const char * datatype()
SerializedMessage message
Definition: subscription.h:233
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
StatisticsLogger statistics_
Definition: subscription.h:229
boost::mutex callbacks_mutex_
Definition: subscription.h:211
uint32_t getNumCallbacks() const
Definition: subscription.h:119
std::set< PendingConnectionPtr > S_PendingConnection
Definition: subscription.h:219
std::vector< CallbackInfoPtr > V_CallbackInfo
Definition: subscription.h:205
std::map< PublisherLinkPtr, LatchInfo > M_PublisherLinkToLatchInfo
Definition: subscription.h:239
boost::mutex md5sum_mutex_
Definition: subscription.h:208
std::vector< std::pair< const std::type_info *, MessageDeserializerPtr > > V_TypeAndDeserializer
Definition: subscription.h:242
std::string name_
Definition: subscription.h:207
This class logs statistics data about a ROS connection and publishs them periodically on a common top...
Definition: statistics.h:49
CallbackQueueInterface * callback_queue_
Definition: subscription.h:196
TransportUDPPtr getUDPTransport() const
Definition: subscription.h:140
SubscriptionCallbackHelperPtr helper_
Definition: subscription.h:199
bool isDropped()
Returns whether this Subscription has been dropped or not.
Definition: subscription.h:95
XmlRpc::XmlRpcClient * getClient() const
Definition: subscription.h:139
void addSource(XmlRpcSource *source, unsigned eventMask)
std::string datatype_
Definition: subscription.h:210
XmlRpc::XmlRpcClient * client_
Definition: subscription.h:173
ROSCPP_DECL void shutdown()
Disconnects everything and unregisters from the master. It is generally not necessary to call this fu...
Definition: init.cpp:594
const char * md5sum()
boost::mutex shutdown_mutex_
Definition: subscription.h:217
const std::string & getName() const
Definition: subscription.h:118
boost::shared_ptr< MessageDeserializer > MessageDeserializerPtr
boost::shared_ptr< PendingConnection > PendingConnectionPtr
Definition: subscription.h:178
V_CallbackInfo callbacks_
Definition: subscription.h:212
boost::shared_ptr< std::map< std::string, std::string > > connection_header
Definition: subscription.h:235
PendingConnection(XmlRpc::XmlRpcClient *client, TransportUDPPtr udp_transport, const SubscriptionWPtr &parent, const std::string &remote_uri)
Definition: subscription.h:127


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27