cache.h
Go to the documentation of this file.
1 /*********************************************************************
2 * Software License Agreement (BSD License)
3 *
4 * Copyright (c) 2008, Willow Garage, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above
14 * copyright notice, this list of conditions and the following
15 * disclaimer in the documentation and/or other materials provided
16 * with the distribution.
17 * * Neither the name of the Willow Garage nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 *********************************************************************/
34 
35 #ifndef MESSAGE_FILTERS_CACHE_H_
36 #define MESSAGE_FILTERS_CACHE_H_
37 
38 #include <deque>
39 #include "boost/thread.hpp"
40 #include "boost/shared_ptr.hpp"
41 
42 #include "ros/time.h"
43 
44 #include "connection.h"
45 #include "simple_filter.h"
46 
47 namespace message_filters
48 {
49 
65 template<class M>
66 class Cache : public SimpleFilter<M>
67 {
68 public:
71 
72  template<class F>
73  Cache(F& f, unsigned int cache_size = 1)
74  {
75  setCacheSize(cache_size) ;
76  connectInput(f) ;
77  }
78 
84  Cache(unsigned int cache_size = 1)
85  {
86  setCacheSize(cache_size);
87  }
88 
89  template<class F>
90  void connectInput(F& f)
91  {
92  incoming_connection_ = f.registerCallback(typename SimpleFilter<M>::EventCallback(boost::bind(&Cache::callback, this, _1)));
93  }
94 
96  {
98  }
99 
104  void setCacheSize(unsigned int cache_size)
105  {
106  if (cache_size == 0)
107  {
108  //ROS_ERROR("Cannot set max_size to 0") ;
109  return ;
110  }
111 
112  cache_size_ = cache_size ;
113  }
114 
119  void add(const MConstPtr& msg)
120  {
121  add(EventType(msg));
122  }
123 
128  void add(const EventType& evt)
129  {
130  namespace mt = ros::message_traits;
131 
132  //printf(" Cache Size: %u\n", cache_.size()) ;
133  {
134  boost::mutex::scoped_lock lock(cache_lock_);
135 
136  while (cache_.size() >= cache_size_) // Keep popping off old data until we have space for a new msg
137  cache_.pop_front() ; // The front of the deque has the oldest elem, so we can get rid of it
138 
139  // No longer naively pushing msgs to back. Want to make sure they're sorted correctly
140  //cache_.push_back(msg) ; // Add the newest message to the back of the deque
141 
142  typename std::deque<EventType >::reverse_iterator rev_it = cache_.rbegin();
143 
144  // Keep walking backwards along deque until we hit the beginning,
145  // or until we find a timestamp that's smaller than (or equal to) msg's timestamp
146  ros::Time evt_stamp = mt::TimeStamp<M>::value(*evt.getMessage());
147  while(rev_it != cache_.rend() && mt::TimeStamp<M>::value(*(*rev_it).getMessage()) > evt_stamp)
148  rev_it++;
149 
150  // Add msg to the cache
151  cache_.insert(rev_it.base(), evt);
152 
153  }
154 
155  this->signalMessage(evt);
156  }
157 
166  std::vector<MConstPtr> getInterval(const ros::Time& start, const ros::Time& end) const
167  {
168  namespace mt = ros::message_traits;
169 
170  boost::mutex::scoped_lock lock(cache_lock_);
171 
172  // Find the starting index. (Find the first index after [or at] the start of the interval)
173  unsigned int start_index = 0 ;
174  while(start_index < cache_.size() &&
175  mt::TimeStamp<M>::value(*cache_[start_index].getMessage()) < start)
176  {
177  start_index++ ;
178  }
179 
180  // Find the ending index. (Find the first index after the end of interval)
181  unsigned int end_index = start_index ;
182  while(end_index < cache_.size() &&
183  mt::TimeStamp<M>::value(*cache_[end_index].getMessage()) <= end)
184  {
185  end_index++ ;
186  }
187 
188  std::vector<MConstPtr> interval_elems ;
189  interval_elems.reserve(end_index - start_index) ;
190  for (unsigned int i=start_index; i<end_index; i++)
191  {
192  interval_elems.push_back(cache_[i].getMessage()) ;
193  }
194 
195  return interval_elems ;
196  }
197 
198 
205  std::vector<MConstPtr> getSurroundingInterval(const ros::Time& start, const ros::Time& end) const
206  {
207  namespace mt = ros::message_traits;
208 
209  boost::mutex::scoped_lock lock(cache_lock_);
210  // Find the starting index. (Find the first index after [or at] the start of the interval)
211  unsigned int start_index = cache_.size()-1;
212  while(start_index > 0 &&
213  mt::TimeStamp<M>::value(*cache_[start_index].getMessage()) > start)
214  {
215  start_index--;
216  }
217  unsigned int end_index = start_index;
218  while(end_index < cache_.size()-1 &&
219  mt::TimeStamp<M>::value(*cache_[end_index].getMessage()) < end)
220  {
221  end_index++;
222  }
223 
224  std::vector<MConstPtr> interval_elems;
225  interval_elems.reserve(end_index - start_index + 1) ;
226  for (unsigned int i=start_index; i<=end_index; i++)
227  {
228  interval_elems.push_back(cache_[i].getMessage()) ;
229  }
230 
231  return interval_elems;
232  }
233 
239  MConstPtr getElemBeforeTime(const ros::Time& time) const
240  {
241  namespace mt = ros::message_traits;
242 
243  boost::mutex::scoped_lock lock(cache_lock_);
244 
245  MConstPtr out ;
246 
247  unsigned int i=0 ;
248  int elem_index = -1 ;
249  while (i<cache_.size() &&
250  mt::TimeStamp<M>::value(*cache_[i].getMessage()) < time)
251  {
252  elem_index = i ;
253  i++ ;
254  }
255 
256  if (elem_index >= 0)
257  out = cache_[elem_index].getMessage() ;
258 
259  return out ;
260  }
261 
267  MConstPtr getElemAfterTime(const ros::Time& time) const
268  {
269  namespace mt = ros::message_traits;
270 
271  boost::mutex::scoped_lock lock(cache_lock_);
272 
273  MConstPtr out ;
274 
275  int i=cache_.size()-1 ;
276  int elem_index = -1 ;
277  while (i>=0 &&
278  mt::TimeStamp<M>::value(*cache_[i].getMessage()) > time)
279  {
280  elem_index = i ;
281  i-- ;
282  }
283 
284  if (elem_index >= 0)
285  out = cache_[elem_index].getMessage() ;
286  else
287  out.reset() ;
288 
289  return out ;
290  }
291 
296  {
297  namespace mt = ros::message_traits;
298 
299  boost::mutex::scoped_lock lock(cache_lock_);
300 
301  ros::Time latest_time;
302 
303  if (cache_.size() > 0)
304  latest_time = mt::TimeStamp<M>::value(*cache_.back().getMessage());
305 
306  return latest_time ;
307  }
308 
313  {
314  namespace mt = ros::message_traits;
315 
316  boost::mutex::scoped_lock lock(cache_lock_);
317 
318  ros::Time oldest_time;
319 
320  if (cache_.size() > 0)
321  oldest_time = mt::TimeStamp<M>::value(*cache_.front().getMessage());
322 
323  return oldest_time ;
324  }
325 
326 
327 private:
328  void callback(const EventType& evt)
329  {
330  add(evt);
331  }
332 
333  mutable boost::mutex cache_lock_ ;
334  std::deque<EventType> cache_ ;
335  unsigned int cache_size_ ;
336 
338 };
339 
340 }
341 
342 
343 #endif /* MESSAGE_FILTERS_CACHE_H_ */
boost::shared_ptr< M const > MConstPtr
Definition: cache.h:69
Cache(unsigned int cache_size=1)
Definition: cache.h:84
void connectInput(F &f)
Definition: cache.h:90
void signalMessage(const MConstPtr &msg)
Call all registered callbacks, passing them the specified message.
boost::shared_ptr< M > getMessage() const
MConstPtr getElemBeforeTime(const ros::Time &time) const
Grab the newest element that occurs right before the specified time.
Definition: cache.h:239
MConstPtr getElemAfterTime(const ros::Time &time) const
Grab the oldest element that occurs right after the specified time.
Definition: cache.h:267
void disconnect()
disconnects this connection
Definition: connection.cpp:52
void setCacheSize(unsigned int cache_size)
Definition: cache.h:104
unsigned int cache_size_
Maximum number of elements allowed in the cache.
Definition: cache.h:335
ros::MessageEvent< M const > EventType
Definition: cache.h:70
std::vector< MConstPtr > getSurroundingInterval(const ros::Time &start, const ros::Time &end) const
Retrieve the smallest interval of messages that surrounds an interval from start to end...
Definition: cache.h:205
ros::Time getOldestTime() const
Returns the timestamp associated with the oldest packet cache.
Definition: cache.h:312
void callback(const EventType &evt)
Definition: cache.h:328
Connection incoming_connection_
Definition: cache.h:337
std::vector< MConstPtr > getInterval(const ros::Time &start, const ros::Time &end) const
Receive a vector of messages that occur between a start and end time (inclusive). ...
Definition: cache.h:166
ros::Time getLatestTime() const
Returns the timestamp associated with the newest packet cache.
Definition: cache.h:295
Stores a time history of messages.
Definition: cache.h:66
Convenience base-class for simple filters which output a single message.
Definition: simple_filter.h:60
boost::function< void(const EventType &)> EventCallback
Definition: simple_filter.h:66
std::deque< EventType > cache_
Cache for the messages.
Definition: cache.h:334
Encapsulates a connection from one filter to another (or to a user-specified callback) ...
Definition: connection.h:48
void add(const EventType &evt)
Add a message to the cache, and pop off any elements that are too old. This method is registered with...
Definition: cache.h:128
Cache(F &f, unsigned int cache_size=1)
Definition: cache.h:73
void add(const MConstPtr &msg)
Add a message to the cache, and pop off any elements that are too old. This method is registered with...
Definition: cache.h:119
boost::mutex cache_lock_
Lock for cache_.
Definition: cache.h:333


message_filters
Author(s): Josh Faust, Vijay Pradeep
autogenerated on Fri Nov 23 2018 11:19:18