00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef ROSCPP_TIMER_MANAGER_H
00029 #define ROSCPP_TIMER_MANAGER_H
00030
00031 #include "ros/forwards.h"
00032 #include "ros/time.h"
00033 #include "ros/file_log.h"
00034
00035 #include <boost/thread/thread.hpp>
00036 #include <boost/thread/mutex.hpp>
00037 #include <boost/thread/recursive_mutex.hpp>
00038 #include <boost/thread/condition_variable.hpp>
00039
00040 #include "ros/assert.h"
00041 #include "ros/callback_queue_interface.h"
00042
00043 #include <vector>
00044 #include <list>
00045
00046 namespace ros
00047 {
00048
00049 template<class T, class D, class E>
00050 class TimerManager
00051 {
00052 private:
00053 struct TimerInfo
00054 {
00055 int32_t handle;
00056 D period;
00057
00058 boost::function<void(const E&)> callback;
00059 CallbackQueueInterface* callback_queue;
00060
00061 WallDuration last_cb_duration;
00062
00063 T last_expected;
00064 T next_expected;
00065
00066 T last_real;
00067
00068 bool removed;
00069
00070 VoidConstWPtr tracked_object;
00071 bool has_tracked_object;
00072
00073
00074 boost::mutex waiting_mutex;
00075 uint32_t waiting_callbacks;
00076
00077 bool oneshot;
00078
00079
00080 uint32_t total_calls;
00081 };
00082 typedef boost::shared_ptr<TimerInfo> TimerInfoPtr;
00083 typedef boost::weak_ptr<TimerInfo> TimerInfoWPtr;
00084 typedef std::vector<TimerInfoPtr> V_TimerInfo;
00085
00086 typedef std::list<int32_t> L_int32;
00087
00088 public:
00089 TimerManager();
00090 ~TimerManager();
00091
00092 int32_t add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, const VoidConstPtr& tracked_object, bool oneshot);
00093 void remove(int32_t handle);
00094
00095 bool hasPending(int32_t handle);
00096 void setPeriod(int32_t handle, const D& period);
00097
00098 static TimerManager& global()
00099 {
00100 static TimerManager<T, D, E> global;
00101 return global;
00102 }
00103
00104 private:
00105 void threadFunc();
00106
00107 bool waitingCompare(int32_t lhs, int32_t rhs);
00108 TimerInfoPtr findTimer(int32_t handle);
00109 void schedule(const TimerInfoPtr& info);
00110 void updateNext(const TimerInfoPtr& info, const T& current_time);
00111
00112 V_TimerInfo timers_;
00113 boost::mutex timers_mutex_;
00114 boost::condition_variable timers_cond_;
00115 volatile bool new_timer_;
00116
00117 boost::mutex waiting_mutex_;
00118 L_int32 waiting_;
00119
00120 uint32_t id_counter_;
00121 boost::mutex id_mutex_;
00122
00123 bool thread_started_;
00124
00125 boost::thread thread_;
00126
00127 bool quit_;
00128
00129 class TimerQueueCallback : public CallbackInterface
00130 {
00131 public:
00132 TimerQueueCallback(TimerManager<T, D, E>* parent, const TimerInfoPtr& info, T last_expected, T last_real, T current_expected)
00133 : parent_(parent)
00134 , info_(info)
00135 , last_expected_(last_expected)
00136 , last_real_(last_real)
00137 , current_expected_(current_expected)
00138 , called_(false)
00139 {
00140 boost::mutex::scoped_lock lock(info->waiting_mutex);
00141 ++info->waiting_callbacks;
00142 }
00143
00144 ~TimerQueueCallback()
00145 {
00146 TimerInfoPtr info = info_.lock();
00147 if (info)
00148 {
00149 boost::mutex::scoped_lock lock(info->waiting_mutex);
00150 --info->waiting_callbacks;
00151 }
00152 }
00153
00154 CallResult call()
00155 {
00156 TimerInfoPtr info = info_.lock();
00157 if (!info)
00158 {
00159 return Invalid;
00160 }
00161
00162 {
00163 ++info->total_calls;
00164 called_ = true;
00165
00166 VoidConstPtr tracked;
00167 if (info->has_tracked_object)
00168 {
00169 tracked = info->tracked_object.lock();
00170 if (!tracked)
00171 {
00172 return Invalid;
00173 }
00174 }
00175
00176 E event;
00177 event.last_expected = last_expected_;
00178 event.last_real = last_real_;
00179 event.current_expected = current_expected_;
00180 event.current_real = T::now();
00181 event.profile.last_duration = info->last_cb_duration;
00182
00183 WallTime cb_start = WallTime::now();
00184 info->callback(event);
00185 WallTime cb_end = WallTime::now();
00186 info->last_cb_duration = cb_end - cb_start;
00187
00188 info->last_real = event.current_real;
00189
00190 parent_->schedule(info);
00191 }
00192
00193 return Success;
00194 }
00195
00196 private:
00197 TimerManager<T, D, E>* parent_;
00198 TimerInfoWPtr info_;
00199 T last_expected_;
00200 T last_real_;
00201 T current_expected_;
00202
00203 bool called_;
00204 };
00205 };
00206
00207 template<class T, class D, class E>
00208 TimerManager<T, D, E>::TimerManager() :
00209 new_timer_(false), id_counter_(0), thread_started_(false), quit_(false)
00210 {
00211
00212 }
00213
00214 template<class T, class D, class E>
00215 TimerManager<T, D, E>::~TimerManager()
00216 {
00217 quit_ = true;
00218 {
00219 boost::mutex::scoped_lock lock(timers_mutex_);
00220 timers_cond_.notify_all();
00221 }
00222 if (thread_started_)
00223 {
00224 thread_.join();
00225 }
00226 }
00227
00228 template<class T, class D, class E>
00229 bool TimerManager<T, D, E>::waitingCompare(int32_t lhs, int32_t rhs)
00230 {
00231 TimerInfoPtr infol = findTimer(lhs);
00232 TimerInfoPtr infor = findTimer(rhs);
00233 if (!infol || !infor)
00234 {
00235 return infol < infor;
00236 }
00237
00238 return infol->next_expected < infor->next_expected;
00239 }
00240
00241 template<class T, class D, class E>
00242 typename TimerManager<T, D, E>::TimerInfoPtr TimerManager<T, D, E>::findTimer(int32_t handle)
00243 {
00244 typename V_TimerInfo::iterator it = timers_.begin();
00245 typename V_TimerInfo::iterator end = timers_.end();
00246 for (; it != end; ++it)
00247 {
00248 if ((*it)->handle == handle)
00249 {
00250 return *it;
00251 }
00252 }
00253
00254 return TimerInfoPtr();
00255 }
00256
00257 template<class T, class D, class E>
00258 bool TimerManager<T, D, E>::hasPending(int32_t handle)
00259 {
00260 boost::mutex::scoped_lock lock(timers_mutex_);
00261 TimerInfoPtr info = findTimer(handle);
00262
00263 if (!info)
00264 {
00265 return false;
00266 }
00267
00268 if (info->has_tracked_object)
00269 {
00270 VoidConstPtr tracked = info->tracked_object.lock();
00271 if (!tracked)
00272 {
00273 return false;
00274 }
00275 }
00276
00277 boost::mutex::scoped_lock lock2(info->waiting_mutex);
00278 return info->next_expected <= T::now() || info->waiting_callbacks != 0;
00279 }
00280
00281 template<class T, class D, class E>
00282 int32_t TimerManager<T, D, E>::add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue,
00283 const VoidConstPtr& tracked_object, bool oneshot)
00284 {
00285 TimerInfoPtr info(new TimerInfo);
00286 info->period = period;
00287 info->callback = callback;
00288 info->callback_queue = callback_queue;
00289 info->last_expected = T::now();
00290 info->next_expected = info->last_expected + period;
00291 info->removed = false;
00292 info->has_tracked_object = false;
00293 info->waiting_callbacks = 0;
00294 info->total_calls = 0;
00295 info->oneshot = oneshot;
00296 if (tracked_object)
00297 {
00298 info->tracked_object = tracked_object;
00299 info->has_tracked_object = true;
00300 }
00301
00302 {
00303 boost::mutex::scoped_lock lock(id_mutex_);
00304 info->handle = id_counter_++;
00305 }
00306
00307 {
00308 boost::mutex::scoped_lock lock(timers_mutex_);
00309 timers_.push_back(info);
00310
00311 if (!thread_started_)
00312 {
00313 thread_ = boost::thread(boost::bind(&TimerManager::threadFunc, this));
00314 thread_started_ = true;
00315 }
00316
00317 {
00318 boost::mutex::scoped_lock lock(waiting_mutex_);
00319 waiting_.push_back(info->handle);
00320 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00321 }
00322
00323 new_timer_ = true;
00324 timers_cond_.notify_all();
00325 }
00326
00327 return info->handle;
00328 }
00329
00330 template<class T, class D, class E>
00331 void TimerManager<T, D, E>::remove(int32_t handle)
00332 {
00333 CallbackQueueInterface* callback_queue = 0;
00334 uint64_t remove_id = 0;
00335
00336 {
00337 boost::mutex::scoped_lock lock(timers_mutex_);
00338
00339 typename V_TimerInfo::iterator it = timers_.begin();
00340 typename V_TimerInfo::iterator end = timers_.end();
00341 for (; it != end; ++it)
00342 {
00343 const TimerInfoPtr& info = *it;
00344 if (info->handle == handle)
00345 {
00346 info->removed = true;
00347 callback_queue = info->callback_queue;
00348 remove_id = (uint64_t)info.get();
00349 timers_.erase(it);
00350 break;
00351 }
00352 }
00353
00354 {
00355 boost::mutex::scoped_lock lock2(waiting_mutex_);
00356
00357 L_int32::iterator it = std::find(waiting_.begin(), waiting_.end(), handle);
00358 if (it != waiting_.end())
00359 {
00360 waiting_.erase(it);
00361 }
00362 }
00363 }
00364
00365 if (callback_queue)
00366 {
00367 callback_queue->removeByID(remove_id);
00368 }
00369 }
00370
00371 template<class T, class D, class E>
00372 void TimerManager<T, D, E>::schedule(const TimerInfoPtr& info)
00373 {
00374 boost::mutex::scoped_lock lock(timers_mutex_);
00375
00376 if (info->removed)
00377 {
00378 return;
00379 }
00380
00381 updateNext(info, T::now());
00382 {
00383 boost::mutex::scoped_lock lock(waiting_mutex_);
00384
00385 waiting_.push_back(info->handle);
00386
00387 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00388 }
00389
00390 new_timer_ = true;
00391 timers_cond_.notify_one();
00392 }
00393
00394 template<class T, class D, class E>
00395 void TimerManager<T, D, E>::updateNext(const TimerInfoPtr& info, const T& current_time)
00396 {
00397 if (info->oneshot)
00398 {
00399 info->next_expected = T(INT_MAX, 999999999);
00400 }
00401 else
00402 {
00403
00404
00405
00406 if (info->next_expected <= current_time)
00407 {
00408 info->last_expected = info->next_expected;
00409 info->next_expected += info->period;
00410 }
00411
00412
00413 if (info->next_expected + info->period < current_time)
00414 {
00415 ROS_DEBUG("Time jumped forward by [%f] for timer of period [%f], resetting timer (current=%f, next_expected=%f)", (current_time - info->next_expected).toSec(), info->period.toSec(), current_time.toSec(), info->next_expected.toSec());
00416 info->next_expected = current_time;
00417 }
00418 }
00419 }
00420
00421 template<class T, class D, class E>
00422 void TimerManager<T, D, E>::setPeriod(int32_t handle, const D& period)
00423 {
00424 boost::mutex::scoped_lock lock(timers_mutex_);
00425 TimerInfoPtr info = findTimer(handle);
00426
00427 if (!info)
00428 {
00429 return;
00430 }
00431
00432 {
00433 boost::mutex::scoped_lock lock(waiting_mutex_);
00434 info->period = period;
00435 info->next_expected = T::now() + period;
00436
00437 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00438 }
00439
00440 new_timer_ = true;
00441 timers_cond_.notify_one();
00442 }
00443
00444 template<class T, class D, class E>
00445 void TimerManager<T, D, E>::threadFunc()
00446 {
00447 T current;
00448 while (!quit_)
00449 {
00450 T sleep_end;
00451
00452 boost::mutex::scoped_lock lock(timers_mutex_);
00453
00454
00455 if (T::now() < current)
00456 {
00457 ROSCPP_LOG_DEBUG("Time jumped backward, resetting timers");
00458
00459 current = T::now();
00460
00461 typename V_TimerInfo::iterator it = timers_.begin();
00462 typename V_TimerInfo::iterator end = timers_.end();
00463 for (; it != end; ++it)
00464 {
00465 const TimerInfoPtr& info = *it;
00466
00467
00468 if (current < info->last_expected)
00469 {
00470 info->last_expected = current;
00471 info->next_expected = current + info->period;
00472 }
00473 }
00474 }
00475
00476 current = T::now();
00477
00478 {
00479 boost::mutex::scoped_lock waitlock(waiting_mutex_);
00480
00481 if (waiting_.empty())
00482 {
00483 sleep_end = current + D(0.1);
00484 }
00485 else
00486 {
00487 TimerInfoPtr info = findTimer(waiting_.front());
00488
00489 while (!waiting_.empty() && info && info->next_expected <= current)
00490 {
00491 current = T::now();
00492
00493
00494 CallbackInterfacePtr cb(new TimerQueueCallback(this, info, info->last_expected, info->last_real, info->next_expected));
00495 info->callback_queue->addCallback(cb, (uint64_t)info.get());
00496
00497 waiting_.pop_front();
00498
00499 if (waiting_.empty())
00500 {
00501 break;
00502 }
00503
00504 info = findTimer(waiting_.front());
00505 }
00506
00507 if (info)
00508 {
00509 sleep_end = info->next_expected;
00510 }
00511 }
00512 }
00513
00514 while (!new_timer_ && T::now() < sleep_end && !quit_)
00515 {
00516
00517
00518 if (T::now() < current)
00519 {
00520 ROSCPP_LOG_DEBUG("Time jumped backwards, breaking out of sleep");
00521 break;
00522 }
00523
00524 current = T::now();
00525
00526 if (current >= sleep_end)
00527 {
00528 break;
00529 }
00530
00531
00532
00533 if (!T::isSystemTime())
00534 {
00535 timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(1));
00536 }
00537 else
00538 {
00539
00540
00541 int32_t remaining_time = std::max((int32_t)((sleep_end - current).toSec() * 1000.0f), 1);
00542 timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(remaining_time));
00543 }
00544 }
00545
00546 new_timer_ = false;
00547 }
00548 }
00549
00550 }
00551
00552 #endif