subscriber.h
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2010, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of the Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 #ifndef ROSRT_SUBSCRIBER_H
00036 #define ROSRT_SUBSCRIBER_H
00037 
00038 #include <lockfree/object_pool.h>
00039 #include "detail/pool_gc.h"
00040 
00041 #include <ros/atomic.h>
00042 #include <ros/ros.h>
00043 
00044 namespace ros
00045 {
00046 class CallbackQueueInterface;
00047 } // namespace ros
00048 
00049 namespace rosrt
00050 {
00051 
00052 namespace detail
00053 {
00054   ros::CallbackQueueInterface* getSubscriberCallbackQueue();
00055 } // namespace detail
00056 
00074 template<typename M>
00075 class Subscriber
00076 {
00077 public:
00081   Subscriber()
00082   : pool_(0)
00083   {
00084   }
00085 
00091   Subscriber(uint32_t message_pool_size)
00092   : pool_(0)
00093   {
00094     initialize(message_pool_size);
00095   }
00096 
00105   Subscriber(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00106   : pool_(0)
00107   {
00108     initialize(message_pool_size);
00109     subscribe(nh, topic, transport_hints);
00110   }
00111 
00112   ~Subscriber()
00113   {
00114     M const* latest = latest_.exchange(0);
00115     if (latest)
00116     {
00117       pool_->free(latest);
00118     }
00119 
00120     detail::addPoolToGC((void*)pool_, detail::deletePool<M>, detail::poolIsDeletable<M>);
00121   }
00122 
00128   void initialize(uint32_t message_pool_size)
00129   {
00130     ROS_ASSERT(message_pool_size > 1);
00131     ROS_ASSERT(!pool_);
00132     pool_ = new lockfree::ObjectPool<M>();
00133     pool_->initialize(message_pool_size, M());
00134     latest_.store(0);
00135   }
00136 
00146   bool initialize(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00147   {
00148     initialize(message_pool_size);
00149     return subscribe(nh, topic, transport_hints);
00150   }
00151 
00159   bool subscribe(ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints())
00160   {
00161     ros::SubscribeOptions ops;
00162 #ifdef ROS_NEW_SERIALIZATION_API
00163     ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1), boost::bind(&lockfree::ObjectPool<M>::allocateShared, pool_));
00164 #else
00165     ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1));
00166 #endif
00167     ops.callback_queue = detail::getSubscriberCallbackQueue();
00168     sub_ = nh.subscribe(ops);
00169     return (bool)sub_;
00170   }
00171 
00182   boost::shared_ptr<M const> poll()
00183   {
00184     M const* latest = latest_.exchange(0);
00185     if (!latest)
00186     {
00187       return boost::shared_ptr<M const>();
00188     }
00189 
00190     boost::shared_ptr<M const> ptr = pool_->makeShared(latest);
00191     if (!ptr)
00192     {
00193       pool_->free(latest);
00194       return boost::shared_ptr<M const>();
00195     }
00196 
00197     return ptr;
00198   }
00199 
00200 private:
00201   void callback(const boost::shared_ptr<M const>& msg)
00202   {
00203     M const* latest = 0;
00204     // If our pool doesn't own this message (due to multiple subscribers on the same topic)
00205     // make a copy
00206     if (!pool_->owns(msg.get()))
00207     {
00208       M* copy = pool_->allocate();
00209 
00210       if (!copy)
00211       {
00212         return;
00213       }
00214 
00215       *copy = *msg;
00216       latest = copy;
00217     }
00218     else
00219     {
00220       latest = pool_->removeShared(msg);
00221     }
00222 
00223     M const* old = latest_.exchange(latest);
00224     if (old)
00225     {
00226       pool_->free(old);
00227     }
00228   }
00229 
00230   ros::atomic<M const*> latest_;
00231 
00232   lockfree::ObjectPool<M>* pool_;
00233   ros::Subscriber sub_;
00234 };
00235 
00236 } // namespace rosrt
00237 
00238 #endif // ROSRT_SUBSCRIBER_H


rosrt
Author(s): Josh Faust
autogenerated on Sat Jun 8 2019 20:43:39