subscriber.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2010, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #ifndef ROSRT_SUBSCRIBER_H
36 #define ROSRT_SUBSCRIBER_H
37 
38 #include <lockfree/object_pool.h>
39 #include "detail/pool_gc.h"
40 
41 #include <ros/atomic.h>
42 #include <ros/ros.h>
43 
44 namespace ros
45 {
46 class CallbackQueueInterface;
47 } // namespace ros
48 
49 namespace rosrt
50 {
51 
52 namespace detail
53 {
55 } // namespace detail
56 
74 template<typename M>
76 {
77 public:
82  : pool_(0)
83  {
84  }
85 
91  Subscriber(uint32_t message_pool_size)
92  : pool_(0)
93  {
94  initialize(message_pool_size);
95  }
96 
105  Subscriber(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
106  : pool_(0)
107  {
108  initialize(message_pool_size);
109  subscribe(nh, topic, transport_hints);
110  }
111 
113  {
114  M const* latest = latest_.exchange(0);
115  if (latest)
116  {
117  pool_->free(latest);
118  }
119 
120  detail::addPoolToGC((void*)pool_, detail::deletePool<M>, detail::poolIsDeletable<M>);
121  }
122 
128  void initialize(uint32_t message_pool_size)
129  {
130  ROS_ASSERT(message_pool_size > 1);
131  ROS_ASSERT(!pool_);
133  pool_->initialize(message_pool_size, M());
134  latest_.store(0);
135  }
136 
146  bool initialize(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
147  {
148  initialize(message_pool_size);
149  return subscribe(nh, topic, transport_hints);
150  }
151 
159  bool subscribe(ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
160  {
162 #ifdef ROS_NEW_SERIALIZATION_API
163  ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1), boost::bind(&lockfree::ObjectPool<M>::allocateShared, pool_));
164 #else
165  ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1));
166 #endif
168  sub_ = nh.subscribe(ops);
169  return (bool)sub_;
170  }
171 
183  {
184  M const* latest = latest_.exchange(0);
185  if (!latest)
186  {
188  }
189 
191  if (!ptr)
192  {
193  pool_->free(latest);
195  }
196 
197  return ptr;
198  }
199 
200 private:
202  {
203  M const* latest = 0;
204  // If our pool doesn't own this message (due to multiple subscribers on the same topic)
205  // make a copy
206  if (!pool_->owns(msg.get()))
207  {
208  M* copy = pool_->allocate();
209 
210  if (!copy)
211  {
212  return;
213  }
214 
215  *copy = *msg;
216  latest = copy;
217  }
218  else
219  {
220  latest = pool_->removeShared(msg);
221  }
222 
223  M const* old = latest_.exchange(latest);
224  if (old)
225  {
226  pool_->free(old);
227  }
228  }
229 
230  ros::atomic<M const*> latest_;
231 
234 };
235 
236 } // namespace rosrt
237 
238 #endif // ROSRT_SUBSCRIBER_H
rosrt::Subscriber::callback
void callback(const boost::shared_ptr< M const > &msg)
Definition: subscriber.h:201
rosrt::Subscriber::Subscriber
Subscriber(uint32_t message_pool_size, ros::NodeHandle &nh, const std::string &topic, const ros::TransportHints &transport_hints=ros::TransportHints())
Constructor with initialization and subscription.
Definition: subscriber.h:105
rosrt::detail::getSubscriberCallbackQueue
ros::CallbackQueueInterface * getSubscriberCallbackQueue()
Definition: subscriber.cpp:125
object_pool.h
boost::shared_ptr
lockfree::ObjectPool< M >
ros
rosrt
Definition: managers.h:38
ros.h
lockfree::ObjectPool::removeShared
T * removeShared(const boost::shared_ptr< T > &t)
ros::TransportHints
ros::SubscribeOptions::callback_queue
CallbackQueueInterface * callback_queue
rosrt::Subscriber::pool_
lockfree::ObjectPool< M > * pool_
Definition: subscriber.h:232
rosrt::Subscriber::Subscriber
Subscriber()
Default constructor. You must call initialize() before doing anything else if you use this constructo...
Definition: subscriber.h:81
rosrt::Subscriber::latest_
ros::atomic< M const * > latest_
Definition: subscriber.h:230
lockfree::ObjectPool::allocate
T * allocate()
lockfree::ObjectPool::owns
bool owns(const boost::shared_ptr< T const > &t)
rosrt::Subscriber::poll
boost::shared_ptr< M const > poll()
Definition: subscriber.h:182
rosrt::detail::addPoolToGC
void addPoolToGC(void *pool, PoolDeleteFunc deleter, PoolDeletableFunc deletable)
Definition: simple_gc.cpp:123
atomic.h
rosrt::Subscriber::~Subscriber
~Subscriber()
Definition: subscriber.h:112
rosrt::Subscriber::Subscriber
Subscriber(uint32_t message_pool_size)
Constructor with initialization. Call subscribe() to subscribe to a topic.
Definition: subscriber.h:91
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
lockfree::ObjectPool::makeShared
boost::shared_ptr< T > makeShared(T *t)
rosrt::Subscriber::initialize
bool initialize(uint32_t message_pool_size, ros::NodeHandle &nh, const std::string &topic, const ros::TransportHints &transport_hints=ros::TransportHints())
Initialize this subscribe. Only use with the default constructor.
Definition: subscriber.h:146
lockfree::ObjectPool::free
void free(T const *t)
rosrt::Subscriber::initialize
void initialize(uint32_t message_pool_size)
Initialize this subscribe. Only use with the default constructor.
Definition: subscriber.h:128
ros::SubscribeOptions
lockfree::ObjectPool::initialize
void initialize(uint32_t count, const T &tmpl)
ROS_ASSERT
#define ROS_ASSERT(cond)
rosrt::Subscriber::subscribe
bool subscribe(ros::NodeHandle &nh, const std::string &topic, const ros::TransportHints &transport_hints=ros::TransportHints())
Subscribe to a topic.
Definition: subscriber.h:159
rosrt::Subscriber::sub_
ros::Subscriber sub_
Definition: subscriber.h:233
pool_gc.h
rosrt::Subscriber
A lock-free subscriber. Allows you to receive ROS messages inside a realtime thread.
Definition: subscriber.h:75
ros::CallbackQueueInterface
ros::NodeHandle
ros::Subscriber


rosrt
Author(s): Josh Faust
autogenerated on Wed Mar 2 2022 00:54:17