$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2008, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of the Willow Garage nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 *********************************************************************/ 00034 00035 #ifndef MESSAGE_FILTERS_TIME_SEQUENCER_H 00036 #define MESSAGE_FILTERS_TIME_SEQUENCER_H 00037 00038 #include <ros/ros.h> 00039 00040 #include "connection.h" 00041 #include "simple_filter.h" 00042 00043 namespace message_filters 00044 { 00045 00074 template<class M> 00075 class TimeSequencer : public SimpleFilter<M> 00076 { 00077 public: 00078 typedef boost::shared_ptr<M const> MConstPtr; 00079 typedef ros::MessageEvent<M const> EventType; 00080 00089 template<class F> 00090 TimeSequencer(F& f, ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle()) 00091 : delay_(delay) 00092 , update_rate_(update_rate) 00093 , queue_size_(queue_size) 00094 , nh_(nh) 00095 { 00096 init(); 00097 connectInput(f); 00098 } 00099 00110 TimeSequencer(ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle()) 00111 : delay_(delay) 00112 , update_rate_(update_rate) 00113 , queue_size_(queue_size) 00114 , nh_(nh) 00115 { 00116 init(); 00117 } 00118 00122 template<class F> 00123 void connectInput(F& f) 00124 { 00125 incoming_connection_.disconnect(); 00126 incoming_connection_ = f.registerCallback(typename SimpleFilter<M>::EventCallback(boost::bind(&TimeSequencer::cb, this, _1))); 00127 } 00128 00129 ~TimeSequencer() 00130 { 00131 update_timer_.stop(); 00132 incoming_connection_.disconnect(); 00133 } 00134 00135 void add(const EventType& evt) 00136 { 00137 namespace mt = ros::message_traits; 00138 00139 boost::mutex::scoped_lock lock(messages_mutex_); 00140 if (mt::TimeStamp<M>::value(*evt.getMessage()) < last_time_) 00141 { 00142 return; 00143 } 00144 00145 messages_.insert(evt); 00146 00147 if (queue_size_ != 0 && messages_.size() > queue_size_) 00148 { 00149 messages_.erase(*messages_.begin()); 00150 } 00151 } 00152 00156 void add(const MConstPtr& msg) 00157 { 00158 EventType evt(msg); 00159 add(evt); 00160 } 00161 00162 private: 00163 class MessageSort 00164 { 00165 public: 00166 bool operator()(const EventType& lhs, const EventType& rhs) const 00167 { 00168 namespace mt = ros::message_traits; 00169 return mt::TimeStamp<M>::value(*lhs.getMessage()) < mt::TimeStamp<M>::value(*rhs.getMessage()); 00170 } 00171 }; 00172 typedef std::multiset<EventType, MessageSort> S_Message; 00173 typedef std::vector<EventType> V_Message; 00174 00175 void cb(const EventType& evt) 00176 { 00177 add(evt); 00178 } 00179 00180 void dispatch() 00181 { 00182 namespace mt = ros::message_traits; 00183 00184 V_Message to_call; 00185 00186 { 00187 boost::mutex::scoped_lock lock(messages_mutex_); 00188 00189 while (!messages_.empty()) 00190 { 00191 const EventType& e = *messages_.begin(); 00192 ros::Time stamp = mt::TimeStamp<M>::value(*e.getMessage()); 00193 if (stamp + delay_ <= ros::Time::now()) 00194 { 00195 last_time_ = stamp; 00196 to_call.push_back(e); 00197 messages_.erase(messages_.begin()); 00198 } 00199 else 00200 { 00201 break; 00202 } 00203 } 00204 } 00205 00206 { 00207 typename V_Message::iterator it = to_call.begin(); 00208 typename V_Message::iterator end = to_call.end(); 00209 for (; it != end; ++it) 00210 { 00211 this->signalMessage(*it); 00212 } 00213 } 00214 } 00215 00216 void update(const ros::TimerEvent&) 00217 { 00218 dispatch(); 00219 } 00220 00221 void init() 00222 { 00223 update_timer_ = nh_.createTimer(update_rate_, &TimeSequencer::update, this); 00224 } 00225 00226 ros::Duration delay_; 00227 ros::Duration update_rate_; 00228 uint32_t queue_size_; 00229 ros::NodeHandle nh_; 00230 00231 ros::Timer update_timer_; 00232 00233 Connection incoming_connection_; 00234 00235 00236 S_Message messages_; 00237 boost::mutex messages_mutex_; 00238 ros::Time last_time_; 00239 }; 00240 00241 } 00242 00243 #endif