Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 #ifndef ROSRT_SUBSCRIBER_H
00036 #define ROSRT_SUBSCRIBER_H
00037 
00038 #include <lockfree/object_pool.h>
00039 #include "detail/pool_gc.h"
00040 
00041 #include <ros/atomic.h>
00042 #include <ros/ros.h>
00043 
00044 namespace ros
00045 {
00046 class CallbackQueueInterface;
00047 } 
00048 
00049 namespace rosrt
00050 {
00051 
00052 namespace detail
00053 {
00054   ros::CallbackQueueInterface* getSubscriberCallbackQueue();
00055 } 
00056 
00074 template<typename M>
00075 class Subscriber
00076 {
00077 public:
00081   Subscriber()
00082   : pool_(0)
00083   {
00084   }
00085 
00091   Subscriber(uint32_t message_pool_size)
00092   : pool_(0)
00093   {
00094     initialize(message_pool_size);
00095   }
00096 
00105   Subscriber(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00106   : pool_(0)
00107   {
00108     initialize(message_pool_size);
00109     subscribe(nh, topic, transport_hints);
00110   }
00111 
00112   ~Subscriber()
00113   {
00114     M const* latest = latest_.exchange(0);
00115     if (latest)
00116     {
00117       pool_->free(latest);
00118     }
00119 
00120     detail::addPoolToGC((void*)pool_, detail::deletePool<M>, detail::poolIsDeletable<M>);
00121   }
00122 
00128   void initialize(uint32_t message_pool_size)
00129   {
00130     ROS_ASSERT(message_pool_size > 1);
00131     ROS_ASSERT(!pool_);
00132     pool_ = new lockfree::ObjectPool<M>();
00133     pool_->initialize(message_pool_size, M());
00134     latest_.store(0);
00135   }
00136 
00146   bool initialize(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00147   {
00148     initialize(message_pool_size);
00149     return subscribe(nh, topic, transport_hints);
00150   }
00151 
00159   bool subscribe(ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00160   {
00161     ros::SubscribeOptions ops;
00162 #ifdef ROS_NEW_SERIALIZATION_API
00163     ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1), boost::bind(&lockfree::ObjectPool<M>::allocateShared, pool_));
00164 #else
00165     ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1));
00166 #endif
00167     ops.callback_queue = detail::getSubscriberCallbackQueue();
00168     sub_ = nh.subscribe(ops);
00169     return (bool)sub_;
00170   }
00171 
00182   boost::shared_ptr<M const> poll()
00183   {
00184     M const* latest = latest_.exchange(0);
00185     if (!latest)
00186     {
00187       return boost::shared_ptr<M const>();
00188     }
00189 
00190     boost::shared_ptr<M const> ptr = pool_->makeShared(latest);
00191     if (!ptr)
00192     {
00193       pool_->free(latest);
00194       return boost::shared_ptr<M const>();
00195     }
00196 
00197     return ptr;
00198   }
00199 
00200 private:
00201   void callback(const boost::shared_ptr<M const>& msg)
00202   {
00203     M const* latest = 0;
00204     
00205     
00206     if (!pool_->owns(msg.get()))
00207     {
00208       M* copy = pool_->allocate();
00209 
00210       if (!copy)
00211       {
00212         return;
00213       }
00214 
00215       *copy = *msg;
00216       latest = copy;
00217     }
00218     else
00219     {
00220       latest = pool_->removeShared(msg);
00221     }
00222 
00223     M const* old = latest_.exchange(latest);
00224     if (old)
00225     {
00226       pool_->free(old);
00227     }
00228   }
00229 
00230   ros::atomic<M const*> latest_;
00231 
00232   lockfree::ObjectPool<M>* pool_;
00233   ros::Subscriber sub_;
00234 };
00235 
00236 } 
00237 
00238 #endif // ROSRT_SUBSCRIBER_H