timer_manager.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #ifndef ROSCPP_TIMER_MANAGER_H
29 #define ROSCPP_TIMER_MANAGER_H
30 
31 #include "ros/forwards.h"
32 #include "ros/time.h"
33 #include "ros/file_log.h"
34 
35 #include <boost/thread/thread.hpp>
36 #include <boost/thread/mutex.hpp>
37 #include <boost/thread/recursive_mutex.hpp>
38 
39 #include "ros/assert.h"
42 
43 #include <vector>
44 #include <list>
45 
46 namespace ros
47 {
48 
49 namespace {
50  template<class T>
51  class TimerManagerTraits
52  {
53  public:
54  typedef boost::chrono::system_clock::time_point time_point;
55  typedef boost::chrono::system_clock::duration duration;
56  };
57 
58  template<>
59  class TimerManagerTraits<SteadyTime>
60  {
61  public:
62  typedef boost::chrono::steady_clock::time_point time_point;
63  typedef boost::chrono::steady_clock::duration duration;
64  };
65 }
66 
67 template<class T, class D, class E>
68 class TimerManager
69 {
70 private:
71  struct TimerInfo
72  {
73  int32_t handle;
74  D period;
75 
76  boost::function<void(const E&)> callback;
78 
80 
83 
86 
87  bool removed;
88 
91 
92  // TODO: atomicize
93  boost::mutex waiting_mutex;
95 
96  bool oneshot;
97 
98  // debugging info
99  uint32_t total_calls;
100  };
102  typedef boost::weak_ptr<TimerInfo> TimerInfoWPtr;
103  typedef std::vector<TimerInfoPtr> V_TimerInfo;
104 
105  typedef std::list<int32_t> L_int32;
106 
107 public:
108  TimerManager();
109  ~TimerManager();
110 
111  int32_t add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, const VoidConstPtr& tracked_object, bool oneshot);
112  void remove(int32_t handle);
113 
114  bool hasPending(int32_t handle);
115  void setPeriod(int32_t handle, const D& period, bool reset=true);
116 
118  {
119  static TimerManager<T, D, E> global;
120  return global;
121  }
122 
123 private:
124  void threadFunc();
125 
126  bool waitingCompare(int32_t lhs, int32_t rhs);
127  TimerInfoPtr findTimer(int32_t handle);
128  void schedule(const TimerInfoPtr& info);
129  void updateNext(const TimerInfoPtr& info, const T& current_time);
130 
131  V_TimerInfo timers_;
132  boost::mutex timers_mutex_;
134  volatile bool new_timer_;
135 
136  boost::mutex waiting_mutex_;
137  L_int32 waiting_;
138 
139  uint32_t id_counter_;
140  boost::mutex id_mutex_;
141 
143 
144  boost::thread thread_;
145 
146  bool quit_;
147 
149  {
150  public:
151  TimerQueueCallback(TimerManager<T, D, E>* parent, const TimerInfoPtr& info, T last_expected, T last_real, T current_expected, T last_expired, T current_expired)
152  : parent_(parent)
153  , info_(info)
154  , last_expected_(last_expected)
155  , last_real_(last_real)
156  , current_expected_(current_expected)
157  , last_expired_(last_expired)
158  , current_expired_(current_expired)
159  , called_(false)
160  {
161  boost::mutex::scoped_lock lock(info->waiting_mutex);
162  ++info->waiting_callbacks;
163  }
164 
166  {
167  TimerInfoPtr info = info_.lock();
168  if (info)
169  {
170  boost::mutex::scoped_lock lock(info->waiting_mutex);
171  --info->waiting_callbacks;
172  }
173  }
174 
176  {
177  TimerInfoPtr info = info_.lock();
178  if (!info)
179  {
180  return Invalid;
181  }
182 
183  {
184  ++info->total_calls;
185  called_ = true;
186 
187  VoidConstPtr tracked;
188  if (info->has_tracked_object)
189  {
190  tracked = info->tracked_object.lock();
191  if (!tracked)
192  {
193  return Invalid;
194  }
195  }
196 
197  E event;
198  event.last_expected = last_expected_;
199  event.last_real = last_real_;
200  event.last_expired = last_expired_;
201  event.current_expected = current_expected_;
202  event.current_real = T::now();
203  event.current_expired = current_expired_;
204  event.profile.last_duration = info->last_cb_duration;
205 
206  SteadyTime cb_start = SteadyTime::now();
207  info->callback(event);
208  SteadyTime cb_end = SteadyTime::now();
209  info->last_cb_duration = cb_end - cb_start;
210 
211  info->last_real = event.current_real;
212  info->last_expired = event.current_expired;
213 
214  parent_->schedule(info);
215  }
216 
217  return Success;
218  }
219 
220  private:
222  TimerInfoWPtr info_;
228 
229  bool called_;
230  };
231 };
232 
233 template<class T, class D, class E>
235  new_timer_(false), id_counter_(0), thread_started_(false), quit_(false)
236 {}
237 
238 template<class T, class D, class E>
240 {
241  quit_ = true;
242  {
243  boost::mutex::scoped_lock lock(timers_mutex_);
244  timers_cond_.notify_all();
245  }
246  if (thread_started_)
247  {
248  thread_.join();
249  }
250 }
251 
252 template<class T, class D, class E>
253 bool TimerManager<T, D, E>::waitingCompare(int32_t lhs, int32_t rhs)
254 {
255  TimerInfoPtr infol = findTimer(lhs);
256  TimerInfoPtr infor = findTimer(rhs);
257  if (!infol || !infor)
258  {
259  return infol < infor;
260  }
261 
262  return infol->next_expected < infor->next_expected;
263 }
264 
265 template<class T, class D, class E>
267 {
268  typename V_TimerInfo::iterator it = timers_.begin();
269  typename V_TimerInfo::iterator end = timers_.end();
270  for (; it != end; ++it)
271  {
272  if ((*it)->handle == handle)
273  {
274  return *it;
275  }
276  }
277 
278  return TimerInfoPtr();
279 }
280 
281 template<class T, class D, class E>
283 {
284  boost::mutex::scoped_lock lock(timers_mutex_);
285  TimerInfoPtr info = findTimer(handle);
286 
287  if (!info)
288  {
289  return false;
290  }
291 
292  if (info->has_tracked_object)
293  {
294  VoidConstPtr tracked = info->tracked_object.lock();
295  if (!tracked)
296  {
297  return false;
298  }
299  }
300 
301  boost::mutex::scoped_lock lock2(info->waiting_mutex);
302  return info->next_expected <= T::now() || info->waiting_callbacks != 0;
303 }
304 
305 template<class T, class D, class E>
306 int32_t TimerManager<T, D, E>::add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue,
307  const VoidConstPtr& tracked_object, bool oneshot)
308 {
309  TimerInfoPtr info(boost::make_shared<TimerInfo>());
310  info->period = period;
311  info->callback = callback;
312  info->callback_queue = callback_queue;
313  info->last_expected = T::now();
314  info->next_expected = info->last_expected + period;
315  info->removed = false;
316  info->has_tracked_object = false;
317  info->waiting_callbacks = 0;
318  info->total_calls = 0;
319  info->oneshot = oneshot;
320  if (tracked_object)
321  {
322  info->tracked_object = tracked_object;
323  info->has_tracked_object = true;
324  }
325 
326  {
327  boost::mutex::scoped_lock lock(id_mutex_);
328  info->handle = id_counter_++;
329  }
330 
331  {
332  boost::mutex::scoped_lock lock(timers_mutex_);
333  timers_.push_back(info);
334 
335  if (!thread_started_)
336  {
337  thread_ = boost::thread(boost::bind(&TimerManager::threadFunc, this));
338  thread_started_ = true;
339  }
340 
341  {
342  boost::mutex::scoped_lock lock(waiting_mutex_);
343  waiting_.push_back(info->handle);
344  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
345  }
346 
347  new_timer_ = true;
348  timers_cond_.notify_all();
349  }
350 
351  return info->handle;
352 }
353 
354 template<class T, class D, class E>
355 void TimerManager<T, D, E>::remove(int32_t handle)
356 {
357  CallbackQueueInterface* callback_queue = 0;
358  uint64_t remove_id = 0;
359 
360  {
361  boost::mutex::scoped_lock lock(timers_mutex_);
362 
363  typename V_TimerInfo::iterator it = timers_.begin();
364  typename V_TimerInfo::iterator end = timers_.end();
365  for (; it != end; ++it)
366  {
367  const TimerInfoPtr& info = *it;
368  if (info->handle == handle)
369  {
370  info->removed = true;
371  callback_queue = info->callback_queue;
372  remove_id = (uint64_t)info.get();
373  timers_.erase(it);
374  break;
375  }
376  }
377 
378  {
379  boost::mutex::scoped_lock lock2(waiting_mutex_);
380  // Remove from the waiting list if it's in it
381  L_int32::iterator it = std::find(waiting_.begin(), waiting_.end(), handle);
382  if (it != waiting_.end())
383  {
384  waiting_.erase(it);
385  }
386  }
387  }
388 
389  if (callback_queue)
390  {
391  callback_queue->removeByID(remove_id);
392  }
393 }
394 
395 template<class T, class D, class E>
396 void TimerManager<T, D, E>::schedule(const TimerInfoPtr& info)
397 {
398  boost::mutex::scoped_lock lock(timers_mutex_);
399 
400  if (info->removed)
401  {
402  return;
403  }
404 
405  updateNext(info, T::now());
406  {
407  boost::mutex::scoped_lock lock(waiting_mutex_);
408 
409  waiting_.push_back(info->handle);
410  // waitingCompare requires a lock on the timers_mutex_
411  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
412  }
413 
414  new_timer_ = true;
415  timers_cond_.notify_one();
416 }
417 
418 template<class T, class D, class E>
419 void TimerManager<T, D, E>::updateNext(const TimerInfoPtr& info, const T& current_time)
420 {
421  if (info->oneshot)
422  {
423  info->next_expected = T(INT_MAX, 999999999);
424  }
425  else
426  {
427  // Protect against someone having called setPeriod()
428  // If the next expected time is already past the current time
429  // don't update it
430  if (info->next_expected <= current_time)
431  {
432  info->last_expected = info->next_expected;
433  info->next_expected += info->period;
434  }
435 
436  // detect time jumping forward, as well as callbacks that are too slow
437  if (info->next_expected + info->period < current_time)
438  {
439  ROS_DEBUG("Time jumped forward by [%f] for timer of period [%f], resetting timer (current=%f, next_expected=%f)", (current_time - info->next_expected).toSec(), info->period.toSec(), current_time.toSec(), info->next_expected.toSec());
440  info->next_expected = current_time;
441  }
442  }
443 }
444 
445 template<class T, class D, class E>
446 void TimerManager<T, D, E>::setPeriod(int32_t handle, const D& period, bool reset)
447 {
448  boost::mutex::scoped_lock lock(timers_mutex_);
449  TimerInfoPtr info = findTimer(handle);
450 
451  if (!info)
452  {
453  return;
454  }
455 
456  {
457  boost::mutex::scoped_lock lock(waiting_mutex_);
458 
459  if(reset)
460  {
461  info->next_expected = T::now() + period;
462  }
463 
464  // else if some time has elapsed since last cb (called outside of cb)
465  else if( (T::now() - info->last_real) < info->period)
466  {
467  // if elapsed time is greater than the new period
468  // do the callback now
469  if( (T::now() - info->last_real) > period)
470  {
471  info->next_expected = T::now();
472  }
473 
474  // else, account for elapsed time by using last_real+period
475  else
476  {
477  info->next_expected = info->last_real + period;
478  }
479  }
480 
481  // Else if called in a callback, last_real has not been updated yet => (now - last_real) > period
482  // In this case, let next_expected be updated only in updateNext
483 
484  info->period = period;
485  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
486  }
487 
488  new_timer_ = true;
489  timers_cond_.notify_one();
490 }
491 
492 template<class T, class D, class E>
494 {
495  T current;
496  while (!quit_)
497  {
498  T sleep_end;
499 
500  boost::mutex::scoped_lock lock(timers_mutex_);
501 
502  // detect time jumping backwards
503  if (T::now() < current)
504  {
505  ROSCPP_LOG_DEBUG("Time jumped backward, resetting timers");
506 
507  current = T::now();
508 
509  typename V_TimerInfo::iterator it = timers_.begin();
510  typename V_TimerInfo::iterator end = timers_.end();
511  for (; it != end; ++it)
512  {
513  const TimerInfoPtr& info = *it;
514 
515  // Timer may have been added after the time jump, so also check if time has jumped past its last call time
516  if (current < info->last_expected)
517  {
518  info->last_expected = current;
519  info->next_expected = current + info->period;
520  }
521  }
522  }
523 
524  current = T::now();
525 
526  {
527  boost::mutex::scoped_lock waitlock(waiting_mutex_);
528 
529  if (waiting_.empty())
530  {
531  sleep_end = current + D(0.1);
532  }
533  else
534  {
535  TimerInfoPtr info = findTimer(waiting_.front());
536 
537  while (!waiting_.empty() && info && info->next_expected <= current)
538  {
539  current = T::now();
540 
541  //ROS_DEBUG("Scheduling timer callback for timer [%d] of period [%f], [%f] off expected", info->handle, info->period.toSec(), (current - info->next_expected).toSec());
542  CallbackInterfacePtr cb(boost::make_shared<TimerQueueCallback>(this, info, info->last_expected, info->last_real, info->next_expected, info->last_expired, current));
543  info->callback_queue->addCallback(cb, (uint64_t)info.get());
544 
545  waiting_.pop_front();
546 
547  if (waiting_.empty())
548  {
549  break;
550  }
551 
552  info = findTimer(waiting_.front());
553  }
554 
555  if (info)
556  {
557  sleep_end = info->next_expected;
558  }
559  }
560  }
561 
562  while (!new_timer_ && T::now() < sleep_end && !quit_)
563  {
564  // detect backwards jumps in time
565 
566  if (T::now() < current)
567  {
568  ROSCPP_LOG_DEBUG("Time jumped backwards, breaking out of sleep");
569  break;
570  }
571 
572  current = T::now();
573 
574  if (current >= sleep_end)
575  {
576  break;
577  }
578 
579  // If we're on simulation time we need to check now() against sleep_end more often than on system time,
580  // since simulation time may be running faster than real time.
581  if (!T::isSystemTime())
582  {
583  timers_cond_.wait_for(lock, boost::chrono::milliseconds(1));
584  }
585  else
586  {
587  // On system time we can simply sleep for the rest of the wait time, since anything else requiring processing will
588  // signal the condition variable
589  typename TimerManagerTraits<T>::time_point end_tp(
590  boost::chrono::duration_cast<typename TimerManagerTraits<T>::duration>(
591  boost::chrono::nanoseconds(sleep_end.toNSec())
592  )
593  );
594  timers_cond_.wait_until(lock, end_tp);
595  }
596  }
597 
598  new_timer_ = false;
599  }
600 }
601 
602 }
603 
604 #endif
CallResult
Possible results for the call() method.
boost::function< void(const E &)> callback
Definition: timer_manager.h:76
void remove(int32_t handle)
std::list< int32_t > L_int32
boost::thread thread_
Abstract interface for items which can be added to a CallbackQueueInterface.
Abstract interface for a queue used to handle all callbacks within roscpp.
TimerManager< T, D, E > * parent_
V_TimerInfo timers_
bool hasPending(int32_t handle)
virtual void removeByID(uint64_t owner_id)=0
Remove all callbacks associated with an owner id.
int32_t add(const D &period, const boost::function< void(const E &)> &callback, CallbackQueueInterface *callback_queue, const VoidConstPtr &tracked_object, bool oneshot)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
static SteadyTime now()
boost::condition_variable condition_variable_monotonic
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
void threadFunc(boost::barrier *b)
volatile bool new_timer_
void updateNext(const TimerInfoPtr &info, const T &current_time)
CallbackQueueInterface * callback_queue
Definition: timer_manager.h:77
TimerQueueCallback(TimerManager< T, D, E > *parent, const TimerInfoPtr &info, T last_expected, T last_real, T current_expected, T last_expired, T current_expired)
CallResult call()
Call this callback.
bool waitingCompare(int32_t lhs, int32_t rhs)
boost::weak_ptr< TimerInfo > TimerInfoWPtr
boost::mutex timers_mutex_
boost::shared_ptr< TimerInfo > TimerInfoPtr
std::vector< TimerInfoPtr > V_TimerInfo
static TimerManager & global()
TimerInfoPtr findTimer(int32_t handle)
ros::internal::condition_variable_monotonic timers_cond_
boost::mutex id_mutex_
boost::mutex waiting_mutex_
void schedule(const TimerInfoPtr &info)
#define ROS_DEBUG(...)
void setPeriod(int32_t handle, const D &period, bool reset=true)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Feb 28 2022 23:33:27