Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef MESSAGE_FILTERS_TIME_SEQUENCER_H
00036 #define MESSAGE_FILTERS_TIME_SEQUENCER_H
00037
00038 #include <ros/ros.h>
00039
00040 #include "connection.h"
00041 #include "simple_filter.h"
00042
00043 namespace message_filters
00044 {
00045
00074 template<class M>
00075 class TimeSequencer : public SimpleFilter<M>
00076 {
00077 public:
00078 typedef boost::shared_ptr<M const> MConstPtr;
00079 typedef ros::MessageEvent<M const> EventType;
00080
00089 template<class F>
00090 TimeSequencer(F& f, ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle())
00091 : delay_(delay)
00092 , update_rate_(update_rate)
00093 , queue_size_(queue_size)
00094 , nh_(nh)
00095 {
00096 init();
00097 connectInput(f);
00098 }
00099
00110 TimeSequencer(ros::Duration delay, ros::Duration update_rate, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle())
00111 : delay_(delay)
00112 , update_rate_(update_rate)
00113 , queue_size_(queue_size)
00114 , nh_(nh)
00115 {
00116 init();
00117 }
00118
00122 template<class F>
00123 void connectInput(F& f)
00124 {
00125 incoming_connection_.disconnect();
00126 incoming_connection_ = f.registerCallback(typename SimpleFilter<M>::EventCallback(boost::bind(&TimeSequencer::cb, this, _1)));
00127 }
00128
00129 ~TimeSequencer()
00130 {
00131 update_timer_.stop();
00132 incoming_connection_.disconnect();
00133 }
00134
00135 void add(const EventType& evt)
00136 {
00137 namespace mt = ros::message_traits;
00138
00139 boost::mutex::scoped_lock lock(messages_mutex_);
00140 if (mt::TimeStamp<M>::value(*evt.getMessage()) < last_time_)
00141 {
00142 return;
00143 }
00144
00145 messages_.insert(evt);
00146
00147 if (queue_size_ != 0 && messages_.size() > queue_size_)
00148 {
00149 messages_.erase(*messages_.begin());
00150 }
00151 }
00152
00156 void add(const MConstPtr& msg)
00157 {
00158 EventType evt(msg);
00159 add(evt);
00160 }
00161
00162 private:
00163 class MessageSort
00164 {
00165 public:
00166 bool operator()(const EventType& lhs, const EventType& rhs) const
00167 {
00168 namespace mt = ros::message_traits;
00169 return mt::TimeStamp<M>::value(*lhs.getMessage()) < mt::TimeStamp<M>::value(*rhs.getMessage());
00170 }
00171 };
00172 typedef std::multiset<EventType, MessageSort> S_Message;
00173 typedef std::vector<EventType> V_Message;
00174
00175 void cb(const EventType& evt)
00176 {
00177 add(evt);
00178 }
00179
00180 void dispatch()
00181 {
00182 namespace mt = ros::message_traits;
00183
00184 V_Message to_call;
00185
00186 {
00187 boost::mutex::scoped_lock lock(messages_mutex_);
00188
00189 while (!messages_.empty())
00190 {
00191 const EventType& e = *messages_.begin();
00192 ros::Time stamp = mt::TimeStamp<M>::value(*e.getMessage());
00193 if (stamp + delay_ <= ros::Time::now())
00194 {
00195 last_time_ = stamp;
00196 to_call.push_back(e);
00197 messages_.erase(messages_.begin());
00198 }
00199 else
00200 {
00201 break;
00202 }
00203 }
00204 }
00205
00206 {
00207 typename V_Message::iterator it = to_call.begin();
00208 typename V_Message::iterator end = to_call.end();
00209 for (; it != end; ++it)
00210 {
00211 this->signalMessage(*it);
00212 }
00213 }
00214 }
00215
00216 void update(const ros::TimerEvent&)
00217 {
00218 dispatch();
00219 }
00220
00221 void init()
00222 {
00223 update_timer_ = nh_.createTimer(update_rate_, &TimeSequencer::update, this);
00224 }
00225
00226 ros::Duration delay_;
00227 ros::Duration update_rate_;
00228 uint32_t queue_size_;
00229 ros::NodeHandle nh_;
00230
00231 ros::Timer update_timer_;
00232
00233 Connection incoming_connection_;
00234
00235
00236 S_Message messages_;
00237 boost::mutex messages_mutex_;
00238 ros::Time last_time_;
00239 };
00240
00241 }
00242
00243 #endif