timer_manager.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #ifndef ROSCPP_TIMER_MANAGER_H
29 #define ROSCPP_TIMER_MANAGER_H
30 
31 // check if we might need to include our own backported version boost::condition_variable
32 // in order to use CLOCK_MONOTONIC for the SteadyTimer
33 // the include order here is important!
34 #ifdef BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
35 #include <boost/version.hpp>
36 #if BOOST_VERSION < 106100
37 // use backported version of boost condition variable, see https://svn.boost.org/trac/boost/ticket/6377
39 #else // Boost version is 1.61 or greater and has the steady clock fixes
40 #include <boost/thread/condition_variable.hpp>
41 #endif
42 #else // !BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
43 #include <boost/thread/condition_variable.hpp>
44 #endif // BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
45 
46 #include "ros/forwards.h"
47 #include "ros/time.h"
48 #include "ros/file_log.h"
49 
50 #include <boost/thread/thread.hpp>
51 #include <boost/thread/mutex.hpp>
52 #include <boost/thread/recursive_mutex.hpp>
53 
54 #include "ros/assert.h"
56 
57 #include <vector>
58 #include <list>
59 
60 namespace ros
61 {
62 
63 template<class T, class D, class E>
64 class TimerManager
65 {
66 private:
67  struct TimerInfo
68  {
69  int32_t handle;
70  D period;
71 
72  boost::function<void(const E&)> callback;
74 
76 
79 
81 
82  bool removed;
83 
86 
87  // TODO: atomicize
88  boost::mutex waiting_mutex;
90 
91  bool oneshot;
92 
93  // debugging info
94  uint32_t total_calls;
95  };
97  typedef boost::weak_ptr<TimerInfo> TimerInfoWPtr;
98  typedef std::vector<TimerInfoPtr> V_TimerInfo;
99 
100  typedef std::list<int32_t> L_int32;
101 
102 public:
103  TimerManager();
104  ~TimerManager();
105 
106  int32_t add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, const VoidConstPtr& tracked_object, bool oneshot);
107  void remove(int32_t handle);
108 
109  bool hasPending(int32_t handle);
110  void setPeriod(int32_t handle, const D& period, bool reset=true);
111 
113  {
115  return global;
116  }
117 
118 private:
119  void threadFunc();
120 
121  bool waitingCompare(int32_t lhs, int32_t rhs);
122  TimerInfoPtr findTimer(int32_t handle);
123  void schedule(const TimerInfoPtr& info);
124  void updateNext(const TimerInfoPtr& info, const T& current_time);
125 
126  V_TimerInfo timers_;
127  boost::mutex timers_mutex_;
129  volatile bool new_timer_;
130 
131  boost::mutex waiting_mutex_;
132  L_int32 waiting_;
133 
134  uint32_t id_counter_;
135  boost::mutex id_mutex_;
136 
138 
139  boost::thread thread_;
140 
141  bool quit_;
142 
144  {
145  public:
146  TimerQueueCallback(TimerManager<T, D, E>* parent, const TimerInfoPtr& info, T last_expected, T last_real, T current_expected)
147  : parent_(parent)
148  , info_(info)
149  , last_expected_(last_expected)
150  , last_real_(last_real)
151  , current_expected_(current_expected)
152  , called_(false)
153  {
154  boost::mutex::scoped_lock lock(info->waiting_mutex);
155  ++info->waiting_callbacks;
156  }
157 
159  {
160  TimerInfoPtr info = info_.lock();
161  if (info)
162  {
163  boost::mutex::scoped_lock lock(info->waiting_mutex);
164  --info->waiting_callbacks;
165  }
166  }
167 
169  {
170  TimerInfoPtr info = info_.lock();
171  if (!info)
172  {
173  return Invalid;
174  }
175 
176  {
177  ++info->total_calls;
178  called_ = true;
179 
180  VoidConstPtr tracked;
181  if (info->has_tracked_object)
182  {
183  tracked = info->tracked_object.lock();
184  if (!tracked)
185  {
186  return Invalid;
187  }
188  }
189 
190  E event;
191  event.last_expected = last_expected_;
192  event.last_real = last_real_;
193  event.current_expected = current_expected_;
194  event.current_real = T::now();
195  event.profile.last_duration = info->last_cb_duration;
196 
197  SteadyTime cb_start = SteadyTime::now();
198  info->callback(event);
199  SteadyTime cb_end = SteadyTime::now();
200  info->last_cb_duration = cb_end - cb_start;
201 
202  info->last_real = event.current_real;
203 
204  parent_->schedule(info);
205  }
206 
207  return Success;
208  }
209 
210  private:
212  TimerInfoWPtr info_;
216 
217  bool called_;
218  };
219 };
220 
221 template<class T, class D, class E>
223  new_timer_(false), id_counter_(0), thread_started_(false), quit_(false)
224 {
225 
226 }
227 
228 template<class T, class D, class E>
230 {
231  quit_ = true;
232  {
233  boost::mutex::scoped_lock lock(timers_mutex_);
235  }
236  if (thread_started_)
237  {
238  thread_.join();
239  }
240 }
241 
242 template<class T, class D, class E>
243 bool TimerManager<T, D, E>::waitingCompare(int32_t lhs, int32_t rhs)
244 {
245  TimerInfoPtr infol = findTimer(lhs);
246  TimerInfoPtr infor = findTimer(rhs);
247  if (!infol || !infor)
248  {
249  return infol < infor;
250  }
251 
252  return infol->next_expected < infor->next_expected;
253 }
254 
255 template<class T, class D, class E>
257 {
258  typename V_TimerInfo::iterator it = timers_.begin();
259  typename V_TimerInfo::iterator end = timers_.end();
260  for (; it != end; ++it)
261  {
262  if ((*it)->handle == handle)
263  {
264  return *it;
265  }
266  }
267 
268  return TimerInfoPtr();
269 }
270 
271 template<class T, class D, class E>
273 {
274  boost::mutex::scoped_lock lock(timers_mutex_);
275  TimerInfoPtr info = findTimer(handle);
276 
277  if (!info)
278  {
279  return false;
280  }
281 
282  if (info->has_tracked_object)
283  {
284  VoidConstPtr tracked = info->tracked_object.lock();
285  if (!tracked)
286  {
287  return false;
288  }
289  }
290 
291  boost::mutex::scoped_lock lock2(info->waiting_mutex);
292  return info->next_expected <= T::now() || info->waiting_callbacks != 0;
293 }
294 
295 template<class T, class D, class E>
296 int32_t TimerManager<T, D, E>::add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue,
297  const VoidConstPtr& tracked_object, bool oneshot)
298 {
299  TimerInfoPtr info(boost::make_shared<TimerInfo>());
300  info->period = period;
301  info->callback = callback;
302  info->callback_queue = callback_queue;
303  info->last_expected = T::now();
304  info->next_expected = info->last_expected + period;
305  info->removed = false;
306  info->has_tracked_object = false;
307  info->waiting_callbacks = 0;
308  info->total_calls = 0;
309  info->oneshot = oneshot;
310  if (tracked_object)
311  {
312  info->tracked_object = tracked_object;
313  info->has_tracked_object = true;
314  }
315 
316  {
317  boost::mutex::scoped_lock lock(id_mutex_);
318  info->handle = id_counter_++;
319  }
320 
321  {
322  boost::mutex::scoped_lock lock(timers_mutex_);
323  timers_.push_back(info);
324 
325  if (!thread_started_)
326  {
327  thread_ = boost::thread(boost::bind(&TimerManager::threadFunc, this));
328  thread_started_ = true;
329  }
330 
331  {
332  boost::mutex::scoped_lock lock(waiting_mutex_);
333  waiting_.push_back(info->handle);
334  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
335  }
336 
337  new_timer_ = true;
339  }
340 
341  return info->handle;
342 }
343 
344 template<class T, class D, class E>
345 void TimerManager<T, D, E>::remove(int32_t handle)
346 {
347  CallbackQueueInterface* callback_queue = 0;
348  uint64_t remove_id = 0;
349 
350  {
351  boost::mutex::scoped_lock lock(timers_mutex_);
352 
353  typename V_TimerInfo::iterator it = timers_.begin();
354  typename V_TimerInfo::iterator end = timers_.end();
355  for (; it != end; ++it)
356  {
357  const TimerInfoPtr& info = *it;
358  if (info->handle == handle)
359  {
360  info->removed = true;
361  callback_queue = info->callback_queue;
362  remove_id = (uint64_t)info.get();
363  timers_.erase(it);
364  break;
365  }
366  }
367 
368  {
369  boost::mutex::scoped_lock lock2(waiting_mutex_);
370  // Remove from the waiting list if it's in it
371  L_int32::iterator it = std::find(waiting_.begin(), waiting_.end(), handle);
372  if (it != waiting_.end())
373  {
374  waiting_.erase(it);
375  }
376  }
377  }
378 
379  if (callback_queue)
380  {
381  callback_queue->removeByID(remove_id);
382  }
383 }
384 
385 template<class T, class D, class E>
386 void TimerManager<T, D, E>::schedule(const TimerInfoPtr& info)
387 {
388  boost::mutex::scoped_lock lock(timers_mutex_);
389 
390  if (info->removed)
391  {
392  return;
393  }
394 
395  updateNext(info, T::now());
396  {
397  boost::mutex::scoped_lock lock(waiting_mutex_);
398 
399  waiting_.push_back(info->handle);
400  // waitingCompare requires a lock on the timers_mutex_
401  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
402  }
403 
404  new_timer_ = true;
406 }
407 
408 template<class T, class D, class E>
409 void TimerManager<T, D, E>::updateNext(const TimerInfoPtr& info, const T& current_time)
410 {
411  if (info->oneshot)
412  {
413  info->next_expected = T(INT_MAX, 999999999);
414  }
415  else
416  {
417  // Protect against someone having called setPeriod()
418  // If the next expected time is already past the current time
419  // don't update it
420  if (info->next_expected <= current_time)
421  {
422  info->last_expected = info->next_expected;
423  info->next_expected += info->period;
424  }
425 
426  // detect time jumping forward, as well as callbacks that are too slow
427  if (info->next_expected + info->period < current_time)
428  {
429  ROS_DEBUG("Time jumped forward by [%f] for timer of period [%f], resetting timer (current=%f, next_expected=%f)", (current_time - info->next_expected).toSec(), info->period.toSec(), current_time.toSec(), info->next_expected.toSec());
430  info->next_expected = current_time;
431  }
432  }
433 }
434 
435 template<class T, class D, class E>
436 void TimerManager<T, D, E>::setPeriod(int32_t handle, const D& period, bool reset)
437 {
438  boost::mutex::scoped_lock lock(timers_mutex_);
439  TimerInfoPtr info = findTimer(handle);
440 
441  if (!info)
442  {
443  return;
444  }
445 
446  {
447  boost::mutex::scoped_lock lock(waiting_mutex_);
448 
449  if(reset)
450  {
451  info->next_expected = T::now() + period;
452  }
453 
454  // else if some time has elapsed since last cb (called outside of cb)
455  else if( (T::now() - info->last_real) < info->period)
456  {
457  // if elapsed time is greater than the new period
458  // do the callback now
459  if( (T::now() - info->last_real) > period)
460  {
461  info->next_expected = T::now();
462  }
463 
464  // else, account for elapsed time by using last_real+period
465  else
466  {
467  info->next_expected = info->last_real + period;
468  }
469  }
470 
471  // Else if called in a callback, last_real has not been updated yet => (now - last_real) > period
472  // In this case, let next_expected be updated only in updateNext
473 
474  info->period = period;
475  waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
476  }
477 
478  new_timer_ = true;
480 }
481 
482 template<class T, class D, class E>
484 {
485  T current;
486  while (!quit_)
487  {
488  T sleep_end;
489 
490  boost::mutex::scoped_lock lock(timers_mutex_);
491 
492  // detect time jumping backwards
493  if (T::now() < current)
494  {
495  ROSCPP_LOG_DEBUG("Time jumped backward, resetting timers");
496 
497  current = T::now();
498 
499  typename V_TimerInfo::iterator it = timers_.begin();
500  typename V_TimerInfo::iterator end = timers_.end();
501  for (; it != end; ++it)
502  {
503  const TimerInfoPtr& info = *it;
504 
505  // Timer may have been added after the time jump, so also check if time has jumped past its last call time
506  if (current < info->last_expected)
507  {
508  info->last_expected = current;
509  info->next_expected = current + info->period;
510  }
511  }
512  }
513 
514  current = T::now();
515 
516  {
517  boost::mutex::scoped_lock waitlock(waiting_mutex_);
518 
519  if (waiting_.empty())
520  {
521  sleep_end = current + D(0.1);
522  }
523  else
524  {
525  TimerInfoPtr info = findTimer(waiting_.front());
526 
527  while (!waiting_.empty() && info && info->next_expected <= current)
528  {
529  current = T::now();
530 
531  //ROS_DEBUG("Scheduling timer callback for timer [%d] of period [%f], [%f] off expected", info->handle, info->period.toSec(), (current - info->next_expected).toSec());
532  CallbackInterfacePtr cb(boost::make_shared<TimerQueueCallback>(this, info, info->last_expected, info->last_real, info->next_expected));
533  info->callback_queue->addCallback(cb, (uint64_t)info.get());
534 
535  waiting_.pop_front();
536 
537  if (waiting_.empty())
538  {
539  break;
540  }
541 
542  info = findTimer(waiting_.front());
543  }
544 
545  if (info)
546  {
547  sleep_end = info->next_expected;
548  }
549  }
550  }
551 
552  while (!new_timer_ && T::now() < sleep_end && !quit_)
553  {
554  // detect backwards jumps in time
555 
556  if (T::now() < current)
557  {
558  ROSCPP_LOG_DEBUG("Time jumped backwards, breaking out of sleep");
559  break;
560  }
561 
562  current = T::now();
563 
564  if (current >= sleep_end)
565  {
566  break;
567  }
568 
569  // If we're on simulation time we need to check now() against sleep_end more often than on system time,
570  // since simulation time may be running faster than real time.
571  if (!T::isSystemTime())
572  {
573  timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(1));
574  }
575  else
576  {
577  // On system time we can simply sleep for the rest of the wait time, since anything else requiring processing will
578  // signal the condition variable
579  int32_t remaining_time = std::max((int32_t)((sleep_end - current).toSec() * 1000.0f), 1);
580  timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(remaining_time));
581  }
582  }
583 
584  new_timer_ = false;
585  }
586 }
587 
588 }
589 
590 #endif
CallResult
Possible results for the call() method.
boost::function< void(const E &)> callback
Definition: timer_manager.h:72
void remove(int32_t handle)
std::list< int32_t > L_int32
boost::thread thread_
f
Abstract interface for items which can be added to a CallbackQueueInterface.
Abstract interface for a queue used to handle all callbacks within roscpp.
TimerManager< T, D, E > * parent_
V_TimerInfo timers_
bool hasPending(int32_t handle)
virtual void removeByID(uint64_t owner_id)=0
Remove all callbacks associated with an owner id.
int32_t add(const D &period, const boost::function< void(const E &)> &callback, CallbackQueueInterface *callback_queue, const VoidConstPtr &tracked_object, bool oneshot)
#define ROSCPP_LOG_DEBUG(...)
Definition: file_log.h:35
static SteadyTime now()
boost::condition_variable timers_cond_
boost::weak_ptr< void const > VoidConstWPtr
Definition: forwards.h:53
volatile bool new_timer_
void updateNext(const TimerInfoPtr &info, const T &current_time)
CallbackQueueInterface * callback_queue
Definition: timer_manager.h:73
CallResult call()
Call this callback.
TimerQueueCallback(TimerManager< T, D, E > *parent, const TimerInfoPtr &info, T last_expected, T last_real, T current_expected)
bool waitingCompare(int32_t lhs, int32_t rhs)
boost::weak_ptr< TimerInfo > TimerInfoWPtr
Definition: timer_manager.h:97
boost::mutex timers_mutex_
boost::shared_ptr< TimerInfo > TimerInfoPtr
Definition: timer_manager.h:96
std::vector< TimerInfoPtr > V_TimerInfo
Definition: timer_manager.h:98
static TimerManager & global()
TimerInfoPtr findTimer(int32_t handle)
boost::mutex id_mutex_
boost::mutex waiting_mutex_
void schedule(const TimerInfoPtr &info)
#define ROS_DEBUG(...)
void setPeriod(int32_t handle, const D &period, bool reset=true)


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Dec 20 2017 03:58:41