timer_manager.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #ifndef ROSCPP_TIMER_MANAGER_H
29 #define ROSCPP_TIMER_MANAGER_H
30 
31 #include "ros/forwards.h"
32 #include "ros/time.h"
33 #include "ros/file_log.h"
34 
35 #include <boost/thread/thread.hpp>
36 #include <boost/thread/mutex.hpp>
37 #include <boost/thread/recursive_mutex.hpp>
38 
39 #include "ros/assert.h"
42 
43 #include <vector>
44 #include <list>
45 
46 namespace ros
47 {
48 
49 namespace {
50  template<class T>
51  class TimerManagerTraits
52  {
53  public:
54  typedef boost::chrono::system_clock::time_point time_point;
55  };
56 
57  template<>
58  class TimerManagerTraits<SteadyTime>
59  {
60  public:
61  typedef boost::chrono::steady_clock::time_point time_point;
62  };
63 }
64 
65 template<class T, class D, class E>
66 class TimerManager
67 {
68 private:
69  struct TimerInfo
70  {
71  int32_t handle;
72  D period;
73 
74  boost::function<void(const E&)> callback;
76 
78 
81 
83 
84  bool removed;
85 
88 
89  // TODO: atomicize
90  boost::mutex waiting_mutex;
92 
93  bool oneshot;
94 
95  // debugging info
96  uint32_t total_calls;
97  };
99  typedef boost::weak_ptr<TimerInfo> TimerInfoWPtr;
100  typedef std::vector<TimerInfoPtr> V_TimerInfo;
101 
102  typedef std::list<int32_t> L_int32;
103 
104 public:
105  TimerManager();
106  ~TimerManager();
107 
108  int32_t add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, const VoidConstPtr& tracked_object, bool oneshot);
109  void remove(int32_t handle);
110 
111  bool hasPending(int32_t handle);
112  void setPeriod(int32_t handle, const D& period, bool reset=true);
113 
115  {
116  static TimerManager<T, D, E> global;
117  return global;
118  }
119 
120 private:
121  void threadFunc();
122 
123  bool waitingCompare(int32_t lhs, int32_t rhs);
124  TimerInfoPtr findTimer(int32_t handle);
125  void schedule(const TimerInfoPtr& info);
126  void updateNext(const TimerInfoPtr& info, const T& current_time);
127 
128  V_TimerInfo timers_;
129  boost::mutex timers_mutex_;
131  volatile bool new_timer_;
132 
133  boost::mutex waiting_mutex_;
134  L_int32 waiting_;
135 
136  uint32_t id_counter_;
137  boost::mutex id_mutex_;
138 
140 
141  boost::thread thread_;
142 
143  bool quit_;
144 
146  {
147  public:
148  TimerQueueCallback(TimerManager<T, D, E>* parent, const TimerInfoPtr& info, T last_expected, T last_real, T current_expected)
149  : parent_(parent)
150  , info_(info)
151  , last_expected_(last_expected)
152  , last_real_(last_real)
153  , current_expected_(current_expected)
154  , called_(false)
155  {
156  boost::mutex::scoped_lock lock(info->waiting_mutex);
157  ++info->waiting_callbacks;
158  }
159 
161  {
162  TimerInfoPtr info = info_.lock();
163  if (info)
164  {
165  boost::mutex::scoped_lock lock(info->waiting_mutex);
166  --info->waiting_callbacks;
167  }
168  }
169 
171  {
172  TimerInfoPtr info = info_.lock();
173  if (!info)
174  {
175  return Invalid;
176  }
177 
178  {
179  ++info->total_calls;
180  called_ = true;
181 
182  VoidConstPtr tracked;
183  if (info->has_tracked_object)
184  {
185  tracked = info->tracked_object.lock();
186  if (!tracked)
187  {
188  return Invalid;
189  }
190  }
191 
192  E event;
193  event.last_expected = last_expected_;
194  event.last_real = last_real_;
195  event.current_expected = current_expected_;
196  event.current_real = T::now();
197  event.profile.last_duration = info->last_cb_duration;
198 
199  SteadyTime cb_start = SteadyTime::now();
200  info->callback(event);
201  SteadyTime cb_end = SteadyTime::now();
202  info->last_cb_duration = cb_end - cb_start;
203 
204  info->last_real = event.current_real;
205 
206  parent_->schedule(info);
207  }
208 
209  return Success;
210  }
211 
212  private:
214  TimerInfoWPtr info_;
218 
219  bool called_;
220  };
221 };
222 
223 template<class T, class D, class E>
225  new_timer_(false), id_counter_(0), thread_started_(false), quit_(false)
226 {}
227 
228 template<class T, class D, class E>
230 {
231  quit_ = true;
232  {
233  boost::mutex::scoped_lock lock(timers_mutex_);
234  timers_cond_.notify_all();
235  }
236  if (thread_started_)
237  {
238  thread_.join();
239  }
240 }
241 
242 template<class T, class D, class E>
243 bool TimerManager<T, D, E>::waitingCompare(int32_t lhs, int32_t rhs)
244 {
245  TimerInfoPtr infol = findTimer(lhs);
246  TimerInfoPtr infor = findTimer(rhs);
247  if (!infol || !infor)
248  {
249  return infol < infor;
250  }
251 
252  return infol->next_expected < infor->next_expected;
253 }
254 
255 template<class T, class D, class E>
257 {
258  typename V_TimerInfo::iterator it = timers_.begin();
259  typename V_TimerInfo::iterator end = timers_.end();
260  for (; it != end; ++it)
261  {
262  if ((*it)->handle == handle)
263  {
264  return *it;
265  }
266  }
267 
268  return TimerInfoPtr();
269 }
270 
271 template<class T, class D, class E>
273 {
274  boost::mutex::scoped_lock lock(timers_mutex_);
275  TimerInfoPtr info = findTimer(handle);
276 
277  if (!info)
278  {
279  return false;
280  }
281 
282  if (info->has_tracked_object)
283  {
284  VoidConstPtr tracked = info->tracked_object.lock();
285  if (!tracked)
286  {
287  return false;
288  }
289  }
290 
291  boost::mutex::scoped_lock lock2(info->waiting_mutex);
292  return info->next_expected <= T::now() || info->waiting_callbacks != 0;
293 }
294 
295 template<class T, class D, class E>
296 int32_t TimerManager<T, D, E>::add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue,
297  const VoidConstPtr& tracked_object, bool oneshot)
298 {
299  TimerInfoPtr info(boost::make_shared<TimerInfo>());
300  info->period = period;
301  info->callback = callback;
302  info->callback_queue = callback_queue;
303  info->last_expected = T::now();
304  info->next_expected = info->last_expected + period;
305  info->removed = false;
306  info->has_tracked_object = false;
307  info->waiting_callbacks = 0;
308  info->total_calls = 0;
309  info->oneshot = oneshot;
310  if (tracked_object)
311  {
312  info->tracked_object = tracked_object;
313  info->has_tracked_object = true;
314  }
315 
316  {
317  boost::mutex::scoped_lock lock(id_mutex_);
318  info->handle = id_counter_++;
319  }
320 
321  {
322  boost::mutex::scoped_lock lock(timers_mutex_);
323  timers_.push_back(info);
324 
325  if (!thread_started_)
326  {
327  thread_ = boost::thread(boost::bind(&TimerManager::threadFunc, this));
328  thread_started_ = true;
329  }
330 
331  {
332  boost::mutex::scoped_lock lock(waiting_mutex_);
333  waiting_.push_back(info->handle);
334  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
335  }
336 
337  new_timer_ = true;
338  timers_cond_.notify_all();
339  }
340 
341  return info->handle;
342 }
343 
344 template<class T, class D, class E>
345 void TimerManager<T, D, E>::remove(int32_t handle)
346 {
347  CallbackQueueInterface* callback_queue = 0;
348  uint64_t remove_id = 0;
349 
350  {
351  boost::mutex::scoped_lock lock(timers_mutex_);
352 
353  typename V_TimerInfo::iterator it = timers_.begin();
354  typename V_TimerInfo::iterator end = timers_.end();
355  for (; it != end; ++it)
356  {
357  const TimerInfoPtr& info = *it;
358  if (info->handle == handle)
359  {
360  info->removed = true;
361  callback_queue = info->callback_queue;
362  remove_id = (uint64_t)info.get();
363  timers_.erase(it);
364  break;
365  }
366  }
367 
368  {
369  boost::mutex::scoped_lock lock2(waiting_mutex_);
370  // Remove from the waiting list if it's in it
371  L_int32::iterator it = std::find(waiting_.begin(), waiting_.end(), handle);
372  if (it != waiting_.end())
373  {
374  waiting_.erase(it);
375  }
376  }
377  }
378 
379  if (callback_queue)
380  {
381  callback_queue->removeByID(remove_id);
382  }
383 }
384 
385 template<class T, class D, class E>
386 void TimerManager<T, D, E>::schedule(const TimerInfoPtr& info)
387 {
388  boost::mutex::scoped_lock lock(timers_mutex_);
389 
390  if (info->removed)
391  {
392  return;
393  }
394 
395  updateNext(info, T::now());
396  {
397  boost::mutex::scoped_lock lock(waiting_mutex_);
398 
399  waiting_.push_back(info->handle);
400  // waitingCompare requires a lock on the timers_mutex_
401  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
402  }
403 
404  new_timer_ = true;
405  timers_cond_.notify_one();
406 }
407 
408 template<class T, class D, class E>
409 void TimerManager<T, D, E>::updateNext(const TimerInfoPtr& info, const T& current_time)
410 {
411  if (info->oneshot)
412  {
413  info->next_expected = T(INT_MAX, 999999999);
414  }
415  else
416  {
417  // Protect against someone having called setPeriod()
418  // If the next expected time is already past the current time
419  // don't update it
420  if (info->next_expected <= current_time)
421  {
422  info->last_expected = info->next_expected;
423  info->next_expected += info->period;
424  }
425 
426  // detect time jumping forward, as well as callbacks that are too slow
427  if (info->next_expected + info->period < current_time)
428  {
429  ROS_DEBUG("Time jumped forward by [%f] for timer of period [%f], resetting timer (current=%f, next_expected=%f)", (current_time - info->next_expected).toSec(), info->period.toSec(), current_time.toSec(), info->next_expected.toSec());
430  info->next_expected = current_time;
431  }
432  }
433 }
434 
435 template<class T, class D, class E>
436 void TimerManager<T, D, E>::setPeriod(int32_t handle, const D& period, bool reset)
437 {
438  boost::mutex::scoped_lock lock(timers_mutex_);
439  TimerInfoPtr info = findTimer(handle);
440 
441  if (!info)
442  {
443  return;
444  }
445 
446  {
447  boost::mutex::scoped_lock lock(waiting_mutex_);
448 
449  if(reset)
450  {
451  info->next_expected = T::now() + period;
452  }
453 
454  // else if some time has elapsed since last cb (called outside of cb)
455  else if( (T::now() - info->last_real) < info->period)
456  {
457  // if elapsed time is greater than the new period
458  // do the callback now
459  if( (T::now() - info->last_real) > period)
460  {
461  info->next_expected = T::now();
462  }
463 
464  // else, account for elapsed time by using last_real+period
465  else
466  {
467  info->next_expected = info->last_real + period;
468  }
469  }
470 
471  // Else if called in a callback, last_real has not been updated yet => (now - last_real) > period
472  // In this case, let next_expected be updated only in updateNext
473 
474  info->period = period;
475  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
476  }
477 
478  new_timer_ = true;
479  timers_cond_.notify_one();
480 }
481 
482 template<class T, class D, class E>
484 {
485  T current;
486  while (!quit_)
487  {
488  T sleep_end;
489 
490  boost::mutex::scoped_lock lock(timers_mutex_);
491 
492  // detect time jumping backwards
493  if (T::now() < current)
494  {
495  ROSCPP_LOG_DEBUG("Time jumped backward, resetting timers");
496 
497  current = T::now();
498 
499  typename V_TimerInfo::iterator it = timers_.begin();
500  typename V_TimerInfo::iterator end = timers_.end();
501  for (; it != end; ++it)
502  {
503  const TimerInfoPtr& info = *it;
504 
505  // Timer may have been added after the time jump, so also check if time has jumped past its last call time
506  if (current < info->last_expected)
507  {
508  info->last_expected = current;
509  info->next_expected = current + info->period;
510  }
511  }
512  }
513 
514  current = T::now();
515 
516  {
517  boost::mutex::scoped_lock waitlock(waiting_mutex_);
518 
519  if (waiting_.empty())
520  {
521  sleep_end = current + D(0.1);
522  }
523  else
524  {
525  TimerInfoPtr info = findTimer(waiting_.front());
526 
527  while (!waiting_.empty() && info && info->next_expected <= current)
528  {
529  current = T::now();
530 
531  //ROS_DEBUG("Scheduling timer callback for timer [%d] of period [%f], [%f] off expected", info->handle, info->period.toSec(), (current - info->next_expected).toSec());
532  CallbackInterfacePtr cb(boost::make_shared<TimerQueueCallback>(this, info, info->last_expected, info->last_real, info->next_expected));
533  info->callback_queue->addCallback(cb, (uint64_t)info.get());
534 
535  waiting_.pop_front();
536 
537  if (waiting_.empty())
538  {
539  break;
540  }
541 
542  info = findTimer(waiting_.front());
543  }
544 
545  if (info)
546  {
547  sleep_end = info->next_expected;
548  }
549  }
550  }
551 
552  while (!new_timer_ && T::now() < sleep_end && !quit_)
553  {
554  // detect backwards jumps in time
555 
556  if (T::now() < current)
557  {
558  ROSCPP_LOG_DEBUG("Time jumped backwards, breaking out of sleep");
559  break;
560  }
561 
562  current = T::now();
563 
564  if (current >= sleep_end)
565  {
566  break;
567  }
568 
569  // If we're on simulation time we need to check now() against sleep_end more often than on system time,
570  // since simulation time may be running faster than real time.
571  if (!T::isSystemTime())
572  {
573  timers_cond_.wait_for(lock, boost::chrono::milliseconds(1));
574  }
575  else
576  {
577  // On system time we can simply sleep for the rest of the wait time, since anything else requiring processing will
578  // signal the condition variable
579  typename TimerManagerTraits<T>::time_point end_tp(boost::chrono::nanoseconds(sleep_end.toNSec()));
580  timers_cond_.wait_until(lock, end_tp);
581  }
582  }
583 
584  new_timer_ = false;
585  }
586 }
587 
588 }
589 
590 #endif
CallResult
Possible results for the call() method.
boost::function< void(const E &)> callback
Definition: timer_manager.h:74
void remove(int32_t handle)
std::list< int32_t > L_int32
boost::thread thread_
Abstract interface for items which can be added to a CallbackQueueInterface.
Abstract interface for a queue used to handle all callbacks within roscpp.
TimerManager< T, D, E > * parent_
V_TimerInfo timers_
bool hasPending(int32_t handle)
virtual void removeByID(uint64_t owner_id)=0
Remove all callbacks associated with an owner id.
int32_t add(const D &period, const boost::function< void(const E &)> &callback, CallbackQueueInterface *callback_queue, const VoidConstPtr &tracked_object, bool oneshot)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
static SteadyTime now()
boost::condition_variable condition_variable_monotonic
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
void threadFunc(boost::barrier *b)
volatile bool new_timer_
void updateNext(const TimerInfoPtr &info, const T &current_time)
CallbackQueueInterface * callback_queue
Definition: timer_manager.h:75
CallResult call()
Call this callback.
TimerQueueCallback(TimerManager< T, D, E > *parent, const TimerInfoPtr &info, T last_expected, T last_real, T current_expected)
bool waitingCompare(int32_t lhs, int32_t rhs)
boost::weak_ptr< TimerInfo > TimerInfoWPtr
Definition: timer_manager.h:99
boost::mutex timers_mutex_
boost::shared_ptr< TimerInfo > TimerInfoPtr
Definition: timer_manager.h:98
std::vector< TimerInfoPtr > V_TimerInfo
static TimerManager & global()
TimerInfoPtr findTimer(int32_t handle)
ros::internal::condition_variable_monotonic timers_cond_
boost::mutex id_mutex_
boost::mutex waiting_mutex_
void schedule(const TimerInfoPtr &info)
#define ROS_DEBUG(...)
void setPeriod(int32_t handle, const D &period, bool reset=true)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:26