00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef ROSCPP_TIMER_MANAGER_H
00029 #define ROSCPP_TIMER_MANAGER_H
00030
00031 #include "ros/forwards.h"
00032 #include "ros/time.h"
00033 #include "ros/file_log.h"
00034
00035 #include <boost/thread/thread.hpp>
00036 #include <boost/thread/mutex.hpp>
00037 #include <boost/thread/recursive_mutex.hpp>
00038 #include <boost/thread/condition_variable.hpp>
00039
00040 #include "ros/assert.h"
00041 #include "ros/callback_queue_interface.h"
00042
00043 #include <vector>
00044 #include <list>
00045
00046 namespace ros
00047 {
00048
00049 template<class T, class D, class E>
00050 class TimerManager
00051 {
00052 private:
00053 struct TimerInfo
00054 {
00055 int32_t handle;
00056 D period;
00057
00058 boost::function<void(const E&)> callback;
00059 CallbackQueueInterface* callback_queue;
00060
00061 WallDuration last_cb_duration;
00062
00063 T last_expected;
00064 T next_expected;
00065
00066 T last_real;
00067
00068 bool removed;
00069
00070 VoidConstWPtr tracked_object;
00071 bool has_tracked_object;
00072
00073
00074 boost::mutex waiting_mutex;
00075 uint32_t waiting_callbacks;
00076
00077 bool oneshot;
00078
00079
00080 uint32_t total_calls;
00081 };
00082 typedef boost::shared_ptr<TimerInfo> TimerInfoPtr;
00083 typedef boost::weak_ptr<TimerInfo> TimerInfoWPtr;
00084 typedef std::vector<TimerInfoPtr> V_TimerInfo;
00085
00086 typedef std::list<int32_t> L_int32;
00087
00088 public:
00089 TimerManager();
00090 ~TimerManager();
00091
00092 int32_t add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, const VoidConstPtr& tracked_object, bool oneshot);
00093 void remove(int32_t handle);
00094
00095 bool hasPending(int32_t handle);
00096 void setPeriod(int32_t handle, const D& period);
00097
00098 static TimerManager& global()
00099 {
00100 static TimerManager<T, D, E> global;
00101 return global;
00102 }
00103
00104 private:
00105 void threadFunc();
00106
00107 bool waitingCompare(int32_t lhs, int32_t rhs);
00108 TimerInfoPtr findTimer(int32_t handle);
00109 void schedule(const TimerInfoPtr& info);
00110 void updateNext(const TimerInfoPtr& info, const T& current_time);
00111
00112 V_TimerInfo timers_;
00113 boost::mutex timers_mutex_;
00114 boost::condition_variable timers_cond_;
00115 volatile bool new_timer_;
00116
00117 boost::mutex waiting_mutex_;
00118 L_int32 waiting_;
00119
00120 uint32_t id_counter_;
00121 boost::mutex id_mutex_;
00122
00123 bool thread_started_;
00124
00125 boost::thread thread_;
00126
00127 bool quit_;
00128
00129 class TimerQueueCallback : public CallbackInterface
00130 {
00131 public:
00132 TimerQueueCallback(TimerManager<T, D, E>* parent, const TimerInfoPtr& info, T last_expected, T last_real, T current_expected)
00133 : parent_(parent)
00134 , info_(info)
00135 , last_expected_(last_expected)
00136 , last_real_(last_real)
00137 , current_expected_(current_expected)
00138 , called_(false)
00139 {
00140 boost::mutex::scoped_lock lock(info->waiting_mutex);
00141 ++info->waiting_callbacks;
00142 }
00143
00144 ~TimerQueueCallback()
00145 {
00146 TimerInfoPtr info = info_.lock();
00147 if (info)
00148 {
00149 boost::mutex::scoped_lock lock(info->waiting_mutex);
00150 --info->waiting_callbacks;
00151 }
00152 }
00153
00154 CallResult call()
00155 {
00156 TimerInfoPtr info = info_.lock();
00157 if (!info)
00158 {
00159 return Invalid;
00160 }
00161
00162 {
00163 ++info->total_calls;
00164 called_ = true;
00165
00166 VoidConstPtr tracked;
00167 if (info->has_tracked_object)
00168 {
00169 tracked = info->tracked_object.lock();
00170 if (!tracked)
00171 {
00172 return Invalid;
00173 }
00174 }
00175
00176 E event;
00177 event.last_expected = last_expected_;
00178 event.last_real = last_real_;
00179 event.current_expected = current_expected_;
00180 event.current_real = T::now();
00181 event.profile.last_duration = info->last_cb_duration;
00182
00183 WallTime cb_start = WallTime::now();
00184 info->callback(event);
00185 WallTime cb_end = WallTime::now();
00186 info->last_cb_duration = cb_end - cb_start;
00187
00188 info->last_real = event.current_real;
00189
00190 parent_->schedule(info);
00191 }
00192
00193 return Success;
00194 }
00195
00196 private:
00197 TimerManager<T, D, E>* parent_;
00198 TimerInfoWPtr info_;
00199 T last_expected_;
00200 T last_real_;
00201 T current_expected_;
00202
00203 bool called_;
00204 };
00205 };
00206
00207 template<class T, class D, class E>
00208 TimerManager<T, D, E>::TimerManager() :
00209 new_timer_(false), id_counter_(0), thread_started_(false), quit_(false)
00210 {
00211
00212 }
00213
00214 template<class T, class D, class E>
00215 TimerManager<T, D, E>::~TimerManager()
00216 {
00217 quit_ = true;
00218 {
00219 boost::mutex::scoped_lock lock(timers_mutex_);
00220 timers_cond_.notify_all();
00221 }
00222 if (thread_started_)
00223 {
00224 thread_.join();
00225 }
00226 }
00227
00228 template<class T, class D, class E>
00229 bool TimerManager<T, D, E>::waitingCompare(int32_t lhs, int32_t rhs)
00230 {
00231 TimerInfoPtr infol = findTimer(lhs);
00232 TimerInfoPtr infor = findTimer(rhs);
00233 if (!infol || !infor)
00234 {
00235 return infol < infor;
00236 }
00237
00238 return infol->next_expected < infor->next_expected;
00239 }
00240
00241 template<class T, class D, class E>
00242 typename TimerManager<T, D, E>::TimerInfoPtr TimerManager<T, D, E>::findTimer(int32_t handle)
00243 {
00244 typename V_TimerInfo::iterator it = timers_.begin();
00245 typename V_TimerInfo::iterator end = timers_.end();
00246 for (; it != end; ++it)
00247 {
00248 if ((*it)->handle == handle)
00249 {
00250 return *it;
00251 }
00252 }
00253
00254 return TimerInfoPtr();
00255 }
00256
00257 template<class T, class D, class E>
00258 bool TimerManager<T, D, E>::hasPending(int32_t handle)
00259 {
00260 boost::mutex::scoped_lock lock(timers_mutex_);
00261 TimerInfoPtr info = findTimer(handle);
00262
00263 if (!info)
00264 {
00265 return false;
00266 }
00267
00268 if (info->has_tracked_object)
00269 {
00270 VoidConstPtr tracked = info->tracked_object.lock();
00271 if (!tracked)
00272 {
00273 return false;
00274 }
00275 }
00276
00277 boost::mutex::scoped_lock lock2(info->waiting_mutex);
00278 return info->next_expected <= T::now() || info->waiting_callbacks != 0;
00279 }
00280
00281 template<class T, class D, class E>
00282 int32_t TimerManager<T, D, E>::add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue,
00283 const VoidConstPtr& tracked_object, bool oneshot)
00284 {
00285 TimerInfoPtr info(new TimerInfo);
00286 info->period = period;
00287 info->callback = callback;
00288 info->callback_queue = callback_queue;
00289 info->last_expected = T::now();
00290 info->next_expected = info->last_expected + period;
00291 info->removed = false;
00292 info->has_tracked_object = false;
00293 info->waiting_callbacks = 0;
00294 info->total_calls = 0;
00295 info->oneshot = oneshot;
00296 if (tracked_object)
00297 {
00298 info->tracked_object = tracked_object;
00299 info->has_tracked_object = true;
00300 }
00301
00302 {
00303 boost::mutex::scoped_lock lock(id_mutex_);
00304 info->handle = id_counter_++;
00305 }
00306
00307 {
00308 boost::mutex::scoped_lock lock(timers_mutex_);
00309 timers_.push_back(info);
00310
00311 if (!thread_started_)
00312 {
00313 thread_ = boost::thread(boost::bind(&TimerManager::threadFunc, this));
00314 thread_started_ = true;
00315 }
00316
00317 {
00318 boost::mutex::scoped_lock lock(waiting_mutex_);
00319 waiting_.push_back(info->handle);
00320 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00321 }
00322
00323 new_timer_ = true;
00324 timers_cond_.notify_all();
00325 }
00326
00327 return info->handle;
00328 }
00329
00330 template<class T, class D, class E>
00331 void TimerManager<T, D, E>::remove(int32_t handle)
00332 {
00333 boost::mutex::scoped_lock lock(timers_mutex_);
00334
00335 typename V_TimerInfo::iterator it = timers_.begin();
00336 typename V_TimerInfo::iterator end = timers_.end();
00337 for (; it != end; ++it)
00338 {
00339 const TimerInfoPtr& info = *it;
00340 if (info->handle == handle)
00341 {
00342 info->removed = true;
00343 info->callback_queue->removeByID((uint64_t)info.get());
00344 timers_.erase(it);
00345 break;
00346 }
00347 }
00348
00349 {
00350 boost::mutex::scoped_lock lock2(waiting_mutex_);
00351
00352 L_int32::iterator it = std::find(waiting_.begin(), waiting_.end(), handle);
00353 if (it != waiting_.end())
00354 {
00355 waiting_.erase(it);
00356 }
00357 }
00358 }
00359
00360 template<class T, class D, class E>
00361 void TimerManager<T, D, E>::schedule(const TimerManager<T, D, E>::TimerInfoPtr& info)
00362 {
00363 {
00364 boost::mutex::scoped_lock lock(waiting_mutex_);
00365
00366 if (info->removed)
00367 {
00368 return;
00369 }
00370
00371 updateNext(info, T::now());
00372 waiting_.push_back(info->handle);
00373 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00374 }
00375
00376 {
00377 boost::mutex::scoped_lock lock(timers_mutex_);
00378 new_timer_ = true;
00379 timers_cond_.notify_one();
00380 }
00381 }
00382
00383 template<class T, class D, class E>
00384 void TimerManager<T, D, E>::updateNext(const TimerManager<T, D, E>::TimerInfoPtr& info, const T& current_time)
00385 {
00386 if (info->oneshot)
00387 {
00388 info->next_expected = T(INT_MAX, 999999999);
00389 }
00390 else
00391 {
00392
00393
00394
00395 if (info->next_expected <= current_time)
00396 {
00397 info->last_expected = info->next_expected;
00398 info->next_expected += info->period;
00399 }
00400
00401
00402 if (info->next_expected + info->period < current_time)
00403 {
00404 ROS_DEBUG("Time jumped forward by [%f] for timer of period [%f], resetting timer (current=%f, next_expected=%f)", (current_time - info->next_expected).toSec(), info->period.toSec(), current_time.toSec(), info->next_expected.toSec());
00405 info->next_expected = current_time;
00406 }
00407 }
00408 }
00409
00410 template<class T, class D, class E>
00411 void TimerManager<T, D, E>::setPeriod(int32_t handle, const D& period)
00412 {
00413 boost::mutex::scoped_lock lock(timers_mutex_);
00414 TimerInfoPtr info = findTimer(handle);
00415
00416 if (!info)
00417 {
00418 return;
00419 }
00420
00421 {
00422 boost::mutex::scoped_lock lock(waiting_mutex_);
00423 info->period = period;
00424 info->next_expected = T::now() + period;
00425
00426 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00427 }
00428
00429 new_timer_ = true;
00430 timers_cond_.notify_one();
00431 }
00432
00433 template<class T, class D, class E>
00434 void TimerManager<T, D, E>::threadFunc()
00435 {
00436 T current;
00437 while (!quit_)
00438 {
00439 T sleep_end;
00440
00441 boost::mutex::scoped_lock lock(timers_mutex_);
00442
00443
00444 if (T::now() < current)
00445 {
00446 ROSCPP_LOG_DEBUG("Time jumped backward, resetting timers");
00447
00448 current = T::now();
00449
00450 typename V_TimerInfo::iterator it = timers_.begin();
00451 typename V_TimerInfo::iterator end = timers_.end();
00452 for (; it != end; ++it)
00453 {
00454 const TimerInfoPtr& info = *it;
00455
00456
00457 if (current < info->last_expected)
00458 {
00459 info->last_expected = current;
00460 info->next_expected = current + info->period;
00461 }
00462 }
00463 }
00464
00465 current = T::now();
00466
00467 {
00468 boost::mutex::scoped_lock waitlock(waiting_mutex_);
00469
00470 if (waiting_.empty())
00471 {
00472 sleep_end = current + D(0.1);
00473 }
00474 else
00475 {
00476 TimerInfoPtr info = findTimer(waiting_.front());
00477
00478 while (!waiting_.empty() && info && info->next_expected <= current)
00479 {
00480 current = T::now();
00481
00482
00483 CallbackInterfacePtr cb(new TimerQueueCallback(this, info, info->last_expected, info->last_real, info->next_expected));
00484 info->callback_queue->addCallback(cb, (uint64_t)info.get());
00485
00486 waiting_.pop_front();
00487
00488 if (waiting_.empty())
00489 {
00490 break;
00491 }
00492
00493 info = findTimer(waiting_.front());
00494 }
00495
00496 if (info)
00497 {
00498 sleep_end = info->next_expected;
00499 }
00500 }
00501 }
00502
00503 while (!new_timer_ && T::now() < sleep_end && !quit_)
00504 {
00505
00506
00507 if (T::now() < current)
00508 {
00509 ROSCPP_LOG_DEBUG("Time jumped backwards, breaking out of sleep");
00510 break;
00511 }
00512
00513 current = T::now();
00514
00515 if (current >= sleep_end)
00516 {
00517 break;
00518 }
00519
00520
00521
00522 if (!T::isSystemTime())
00523 {
00524 timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(1));
00525 }
00526 else
00527 {
00528
00529
00530 int32_t remaining_time = std::max((int32_t)((sleep_end - current).toSec() * 1000.0f), 1);
00531 timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(remaining_time));
00532 }
00533 }
00534
00535 new_timer_ = false;
00536 }
00537 }
00538
00539 }
00540
00541 #endif