$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2010, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of the Willow Garage nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 *********************************************************************/ 00034 00035 #ifndef ROSRT_SUBSCRIBER_H 00036 #define ROSRT_SUBSCRIBER_H 00037 00038 #include <lockfree/object_pool.h> 00039 #include "detail/pool_gc.h" 00040 00041 #include <ros/atomic.h> 00042 #include <ros/ros.h> 00043 00044 namespace ros 00045 { 00046 class CallbackQueueInterface; 00047 } // namespace ros 00048 00049 namespace rosrt 00050 { 00051 00052 namespace detail 00053 { 00054 ros::CallbackQueueInterface* getSubscriberCallbackQueue(); 00055 } // namespace detail 00056 00074 template<typename M> 00075 class Subscriber 00076 { 00077 public: 00081 Subscriber() 00082 : pool_(0) 00083 { 00084 } 00085 00091 Subscriber(uint32_t message_pool_size) 00092 : pool_(0) 00093 { 00094 initialize(message_pool_size); 00095 } 00096 00105 Subscriber(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints()) 00106 : pool_(0) 00107 { 00108 initialize(message_pool_size); 00109 subscribe(nh, topic, transport_hints); 00110 } 00111 00112 ~Subscriber() 00113 { 00114 M const* latest = latest_.exchange(0); 00115 if (latest) 00116 { 00117 pool_->free(latest); 00118 } 00119 00120 detail::addPoolToGC((void*)pool_, detail::deletePool<M>, detail::poolIsDeletable<M>); 00121 } 00122 00128 void initialize(uint32_t message_pool_size) 00129 { 00130 ROS_ASSERT(message_pool_size > 1); 00131 ROS_ASSERT(!pool_); 00132 pool_ = new lockfree::ObjectPool<M>(); 00133 pool_->initialize(message_pool_size, M()); 00134 latest_.store(0); 00135 } 00136 00146 bool initialize(uint32_t message_pool_size, ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints()) 00147 { 00148 initialize(message_pool_size); 00149 return subscribe(nh, topic, transport_hints); 00150 } 00151 00159 bool subscribe(ros::NodeHandle& nh, const std::string& topic, const ros::TransportHints& transport_hints = ros::TransportHints()) 00160 { 00161 ros::SubscribeOptions ops; 00162 #ifdef ROS_NEW_SERIALIZATION_API 00163 ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1), boost::bind(&lockfree::ObjectPool<M>::allocateShared, pool_)); 00164 #else 00165 ops.template init<M>(topic, 1, boost::bind(&Subscriber::callback, this, _1)); 00166 #endif 00167 ops.callback_queue = detail::getSubscriberCallbackQueue(); 00168 sub_ = nh.subscribe(ops); 00169 return (bool)sub_; 00170 } 00171 00182 boost::shared_ptr<M const> poll() 00183 { 00184 M const* latest = latest_.exchange(0); 00185 if (!latest) 00186 { 00187 return boost::shared_ptr<M const>(); 00188 } 00189 00190 boost::shared_ptr<M const> ptr = pool_->makeShared(latest); 00191 if (!ptr) 00192 { 00193 pool_->free(latest); 00194 return boost::shared_ptr<M const>(); 00195 } 00196 00197 return ptr; 00198 } 00199 00200 private: 00201 void callback(const boost::shared_ptr<M const>& msg) 00202 { 00203 M const* latest = 0; 00204 // If our pool doesn't own this message (due to multiple subscribers on the same topic) 00205 // make a copy 00206 if (!pool_->owns(msg.get())) 00207 { 00208 M* copy = pool_->allocate(); 00209 00210 if (!copy) 00211 { 00212 return; 00213 } 00214 00215 *copy = *msg; 00216 latest = copy; 00217 } 00218 else 00219 { 00220 latest = pool_->removeShared(msg); 00221 } 00222 00223 M const* old = latest_.exchange(latest); 00224 if (old) 00225 { 00226 pool_->free(old); 00227 } 00228 } 00229 00230 ros::atomic<M const*> latest_; 00231 00232 lockfree::ObjectPool<M>* pool_; 00233 ros::Subscriber sub_; 00234 }; 00235 00236 } // namespace rosrt 00237 00238 #endif // ROSRT_SUBSCRIBER_H