Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef MESSAGE_FILTERS_CACHE_H_
00036 #define MESSAGE_FILTERS_CACHE_H_
00037
00038 #include <deque>
00039 #include "boost/thread.hpp"
00040 #include "boost/shared_ptr.hpp"
00041
00042 #include "ros/time.h"
00043
00044 #include "connection.h"
00045 #include "simple_filter.h"
00046
00047 namespace message_filters
00048 {
00049
00065 template<class M>
00066 class Cache : public SimpleFilter<M>
00067 {
00068 public:
00069 typedef boost::shared_ptr<M const> MConstPtr;
00070 typedef ros::MessageEvent<M const> EventType;
00071
00072 template<class F>
00073 Cache(F& f, unsigned int cache_size = 1)
00074 {
00075 setCacheSize(cache_size) ;
00076 connectInput(f) ;
00077 }
00078
00084 Cache(unsigned int cache_size = 1)
00085 {
00086 setCacheSize(cache_size);
00087 }
00088
00089 template<class F>
00090 void connectInput(F& f)
00091 {
00092 incoming_connection_ = f.registerCallback(typename SimpleFilter<M>::EventCallback(boost::bind(&Cache::callback, this, _1)));
00093 }
00094
00095 ~Cache()
00096 {
00097 incoming_connection_.disconnect();
00098 }
00099
00104 void setCacheSize(unsigned int cache_size)
00105 {
00106 if (cache_size == 0)
00107 {
00108
00109 return ;
00110 }
00111
00112 cache_size_ = cache_size ;
00113 }
00114
00119 void add(const MConstPtr& msg)
00120 {
00121 add(EventType(msg));
00122 }
00123
00128 void add(const EventType& evt)
00129 {
00130 namespace mt = ros::message_traits;
00131
00132
00133 {
00134 boost::mutex::scoped_lock lock(cache_lock_);
00135
00136 while (cache_.size() >= cache_size_)
00137 cache_.pop_front() ;
00138
00139
00140
00141
00142 typename std::deque<EventType >::reverse_iterator rev_it = cache_.rbegin();
00143
00144
00145
00146 ros::Time evt_stamp = mt::TimeStamp<M>::value(*evt.getMessage());
00147 while(rev_it != cache_.rend() && mt::TimeStamp<M>::value(*(*rev_it).getMessage()) > evt_stamp)
00148 rev_it++;
00149
00150
00151 cache_.insert(rev_it.base(), evt);
00152
00153 }
00154
00155 this->signalMessage(evt);
00156 }
00157
00166 std::vector<MConstPtr> getInterval(const ros::Time& start, const ros::Time& end) const
00167 {
00168 namespace mt = ros::message_traits;
00169
00170 boost::mutex::scoped_lock lock(cache_lock_);
00171
00172
00173 unsigned int start_index = 0 ;
00174 while(start_index < cache_.size() &&
00175 mt::TimeStamp<M>::value(*cache_[start_index].getMessage()) < start)
00176 {
00177 start_index++ ;
00178 }
00179
00180
00181 unsigned int end_index = start_index ;
00182 while(end_index < cache_.size() &&
00183 mt::TimeStamp<M>::value(*cache_[end_index].getMessage()) <= end)
00184 {
00185 end_index++ ;
00186 }
00187
00188 std::vector<MConstPtr> interval_elems ;
00189 interval_elems.reserve(end_index - start_index) ;
00190 for (unsigned int i=start_index; i<end_index; i++)
00191 {
00192 interval_elems.push_back(cache_[i].getMessage()) ;
00193 }
00194
00195 return interval_elems ;
00196 }
00197
00198
00205 std::vector<MConstPtr> getSurroundingInterval(const ros::Time& start, const ros::Time& end) const
00206 {
00207 namespace mt = ros::message_traits;
00208
00209 boost::mutex::scoped_lock lock(cache_lock_);
00210
00211 unsigned int start_index = cache_.size()-1;
00212 while(start_index > 0 &&
00213 mt::TimeStamp<M>::value(*cache_[start_index].getMessage()) > start)
00214 {
00215 start_index--;
00216 }
00217 unsigned int end_index = start_index;
00218 while(end_index < cache_.size()-1 &&
00219 mt::TimeStamp<M>::value(*cache_[end_index].getMessage()) < end)
00220 {
00221 end_index++;
00222 }
00223
00224 std::vector<MConstPtr> interval_elems;
00225 interval_elems.reserve(end_index - start_index + 1) ;
00226 for (unsigned int i=start_index; i<=end_index; i++)
00227 {
00228 interval_elems.push_back(cache_[i].getMessage()) ;
00229 }
00230
00231 return interval_elems;
00232 }
00233
00239 MConstPtr getElemBeforeTime(const ros::Time& time) const
00240 {
00241 namespace mt = ros::message_traits;
00242
00243 boost::mutex::scoped_lock lock(cache_lock_);
00244
00245 MConstPtr out ;
00246
00247 unsigned int i=0 ;
00248 int elem_index = -1 ;
00249 while (i<cache_.size() &&
00250 mt::TimeStamp<M>::value(*cache_[i].getMessage()) < time)
00251 {
00252 elem_index = i ;
00253 i++ ;
00254 }
00255
00256 if (elem_index >= 0)
00257 out = cache_[elem_index].getMessage() ;
00258
00259 return out ;
00260 }
00261
00267 MConstPtr getElemAfterTime(const ros::Time& time) const
00268 {
00269 namespace mt = ros::message_traits;
00270
00271 boost::mutex::scoped_lock lock(cache_lock_);
00272
00273 MConstPtr out ;
00274
00275 int i=cache_.size()-1 ;
00276 int elem_index = -1 ;
00277 while (i>=0 &&
00278 mt::TimeStamp<M>::value(*cache_[i].getMessage()) > time)
00279 {
00280 elem_index = i ;
00281 i-- ;
00282 }
00283
00284 if (elem_index >= 0)
00285 out = cache_[elem_index].getMessage() ;
00286 else
00287 out.reset() ;
00288
00289 return out ;
00290 }
00291
00295 ros::Time getLatestTime() const
00296 {
00297 namespace mt = ros::message_traits;
00298
00299 boost::mutex::scoped_lock lock(cache_lock_);
00300
00301 ros::Time latest_time;
00302
00303 if (cache_.size() > 0)
00304 latest_time = mt::TimeStamp<M>::value(*cache_.back().getMessage());
00305
00306 return latest_time ;
00307 }
00308
00312 ros::Time getOldestTime() const
00313 {
00314 namespace mt = ros::message_traits;
00315
00316 boost::mutex::scoped_lock lock(cache_lock_);
00317
00318 ros::Time oldest_time;
00319
00320 if (cache_.size() > 0)
00321 oldest_time = mt::TimeStamp<M>::value(*cache_.front().getMessage());
00322
00323 return oldest_time ;
00324 }
00325
00326
00327 private:
00328 void callback(const EventType& evt)
00329 {
00330 add(evt);
00331 }
00332
00333 mutable boost::mutex cache_lock_ ;
00334 std::deque<EventType> cache_ ;
00335 unsigned int cache_size_ ;
00336
00337 Connection incoming_connection_;
00338 };
00339
00340 }
00341
00342
00343 #endif