Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef ROSRT_SUBSCRIBER_H
00036 #define ROSRT_SUBSCRIBER_H
00037
00038 #include <lockfree/object_pool.h>
00039 #include "detail/pool_gc.h"
00040
00041 #include <ros/atomic.h>
00042 #include <ros/ros.h>
00043
00044 namespace ros
00045 {
00046 class CallbackQueueInterface;
00047 }
00048
00049 namespace rosrt
00050 {
00051
00052 namespace detail
00053 {
00054 ros::CallbackQueueInterface* getSubscriberCallbackQueue();
00055 }
00056
00074 template<typename M>
00075 class Subscriber
00076 {
00077 public:
00081 Subscriber()
00082 : pool_(0)
00083 {
00084 }
00085
00091 Subscriber(uint32_t message_pool_size)
00092 : pool_(0)
00093 {
00094 initialize(message_pool_size);
00095 }
00096
00105 Subscriber(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00106 : pool_(0)
00107 {
00108 initialize(message_pool_size);
00109 subscribe(nh, topic, transport_hints);
00110 }
00111
00112 ~Subscriber()
00113 {
00114 M const* latest = latest_.exchange(0);
00115 if (latest)
00116 {
00117 pool_->free(latest);
00118 }
00119
00120 detail::addPoolToGC((void*)pool_, detail::deletePool<M>, detail::poolIsDeletable<M>);
00121 }
00122
00128 void initialize(uint32_t message_pool_size)
00129 {
00130 ROS_ASSERT(message_pool_size > 1);
00131 ROS_ASSERT(!pool_);
00132 pool_ = new lockfree::ObjectPool<M>();
00133 pool_->initialize(message_pool_size, M());
00134 latest_.store(0);
00135 }
00136
00146 bool initialize(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00147 {
00148 initialize(message_pool_size);
00149 return subscribe(nh, topic, transport_hints);
00150 }
00151
00159 bool subscribe(ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00160 {
00161 ros::SubscribeOptions ops;
00162 #ifdef ROS_NEW_SERIALIZATION_API
00163 ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1), boost::bind(&lockfree::ObjectPool<M>::allocateShared, pool_));
00164 #else
00165 ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1));
00166 #endif
00167 ops.callback_queue = detail::getSubscriberCallbackQueue();
00168 sub_ = nh.subscribe(ops);
00169 return (bool)sub_;
00170 }
00171
00182 boost::shared_ptr<M const> poll()
00183 {
00184 M const* latest = latest_.exchange(0);
00185 if (!latest)
00186 {
00187 return boost::shared_ptr<M const>();
00188 }
00189
00190 boost::shared_ptr<M const> ptr = pool_->makeShared(latest);
00191 if (!ptr)
00192 {
00193 pool_->free(latest);
00194 return boost::shared_ptr<M const>();
00195 }
00196
00197 return ptr;
00198 }
00199
00200 private:
00201 void callback(const boost::shared_ptr<M const>& msg)
00202 {
00203 M const* latest = 0;
00204
00205
00206 if (!pool_->owns(msg.get()))
00207 {
00208 M* copy = pool_->allocate();
00209
00210 if (!copy)
00211 {
00212 return;
00213 }
00214
00215 *copy = *msg;
00216 latest = copy;
00217 }
00218 else
00219 {
00220 latest = pool_->removeShared(msg);
00221 }
00222
00223 M const* old = latest_.exchange(latest);
00224 if (old)
00225 {
00226 pool_->free(old);
00227 }
00228 }
00229
00230 ros::atomic<M const*> latest_;
00231
00232 lockfree::ObjectPool<M>* pool_;
00233 ros::Subscriber sub_;
00234 };
00235
00236 }
00237
00238 #endif // ROSRT_SUBSCRIBER_H