$search
00001 /* 00002 * Copyright (C) 2009, Willow Garage, Inc. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * * Redistributions of source code must retain the above copyright notice, 00007 * this list of conditions and the following disclaimer. 00008 * * Redistributions in binary form must reproduce the above copyright 00009 * notice, this list of conditions and the following disclaimer in the 00010 * documentation and/or other materials provided with the distribution. 00011 * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its 00012 * contributors may be used to endorse or promote products derived from 00013 * this software without specific prior written permission. 00014 * 00015 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00016 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00017 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00018 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00019 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00020 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00021 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00022 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00023 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00024 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00025 * POSSIBILITY OF SUCH DAMAGE. 00026 */ 00027 00028 #ifndef ROSCPP_TIMER_MANAGER_H 00029 #define ROSCPP_TIMER_MANAGER_H 00030 00031 #include "ros/forwards.h" 00032 #include "ros/time.h" 00033 #include "ros/file_log.h" 00034 00035 #include <boost/thread/thread.hpp> 00036 #include <boost/thread/mutex.hpp> 00037 #include <boost/thread/recursive_mutex.hpp> 00038 #include <boost/thread/condition_variable.hpp> 00039 00040 #include "ros/assert.h" 00041 #include "ros/callback_queue_interface.h" 00042 00043 #include <vector> 00044 #include <list> 00045 00046 namespace ros 00047 { 00048 00049 template<class T, class D, class E> 00050 class TimerManager 00051 { 00052 private: 00053 struct TimerInfo 00054 { 00055 int32_t handle; 00056 D period; 00057 00058 boost::function<void(const E&)> callback; 00059 CallbackQueueInterface* callback_queue; 00060 00061 WallDuration last_cb_duration; 00062 00063 T last_expected; 00064 T next_expected; 00065 00066 T last_real; 00067 00068 bool removed; 00069 00070 VoidConstWPtr tracked_object; 00071 bool has_tracked_object; 00072 00073 // TODO: atomicize 00074 boost::mutex waiting_mutex; 00075 uint32_t waiting_callbacks; 00076 00077 bool oneshot; 00078 00079 // debugging info 00080 uint32_t total_calls; 00081 }; 00082 typedef boost::shared_ptr<TimerInfo> TimerInfoPtr; 00083 typedef boost::weak_ptr<TimerInfo> TimerInfoWPtr; 00084 typedef std::vector<TimerInfoPtr> V_TimerInfo; 00085 00086 typedef std::list<int32_t> L_int32; 00087 00088 public: 00089 TimerManager(); 00090 ~TimerManager(); 00091 00092 int32_t add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, const VoidConstPtr& tracked_object, bool oneshot); 00093 void remove(int32_t handle); 00094 00095 bool hasPending(int32_t handle); 00096 void setPeriod(int32_t handle, const D& period); 00097 00098 static TimerManager& global() 00099 { 00100 static TimerManager<T, D, E> global; 00101 return global; 00102 } 00103 00104 private: 00105 void threadFunc(); 00106 00107 bool waitingCompare(int32_t lhs, int32_t rhs); 00108 TimerInfoPtr findTimer(int32_t handle); 00109 void schedule(const TimerInfoPtr& info); 00110 void updateNext(const TimerInfoPtr& info, const T& current_time); 00111 00112 V_TimerInfo timers_; 00113 boost::mutex timers_mutex_; 00114 boost::condition_variable timers_cond_; 00115 volatile bool new_timer_; 00116 00117 boost::mutex waiting_mutex_; 00118 L_int32 waiting_; 00119 00120 uint32_t id_counter_; 00121 boost::mutex id_mutex_; 00122 00123 bool thread_started_; 00124 00125 boost::thread thread_; 00126 00127 bool quit_; 00128 00129 class TimerQueueCallback : public CallbackInterface 00130 { 00131 public: 00132 TimerQueueCallback(TimerManager<T, D, E>* parent, const TimerInfoPtr& info, T last_expected, T last_real, T current_expected) 00133 : parent_(parent) 00134 , info_(info) 00135 , last_expected_(last_expected) 00136 , last_real_(last_real) 00137 , current_expected_(current_expected) 00138 , called_(false) 00139 { 00140 boost::mutex::scoped_lock lock(info->waiting_mutex); 00141 ++info->waiting_callbacks; 00142 } 00143 00144 ~TimerQueueCallback() 00145 { 00146 TimerInfoPtr info = info_.lock(); 00147 if (info) 00148 { 00149 boost::mutex::scoped_lock lock(info->waiting_mutex); 00150 --info->waiting_callbacks; 00151 } 00152 } 00153 00154 CallResult call() 00155 { 00156 TimerInfoPtr info = info_.lock(); 00157 if (!info) 00158 { 00159 return Invalid; 00160 } 00161 00162 { 00163 ++info->total_calls; 00164 called_ = true; 00165 00166 VoidConstPtr tracked; 00167 if (info->has_tracked_object) 00168 { 00169 tracked = info->tracked_object.lock(); 00170 if (!tracked) 00171 { 00172 return Invalid; 00173 } 00174 } 00175 00176 E event; 00177 event.last_expected = last_expected_; 00178 event.last_real = last_real_; 00179 event.current_expected = current_expected_; 00180 event.current_real = T::now(); 00181 event.profile.last_duration = info->last_cb_duration; 00182 00183 WallTime cb_start = WallTime::now(); 00184 info->callback(event); 00185 WallTime cb_end = WallTime::now(); 00186 info->last_cb_duration = cb_end - cb_start; 00187 00188 info->last_real = event.current_real; 00189 00190 parent_->schedule(info); 00191 } 00192 00193 return Success; 00194 } 00195 00196 private: 00197 TimerManager<T, D, E>* parent_; 00198 TimerInfoWPtr info_; 00199 T last_expected_; 00200 T last_real_; 00201 T current_expected_; 00202 00203 bool called_; 00204 }; 00205 }; 00206 00207 template<class T, class D, class E> 00208 TimerManager<T, D, E>::TimerManager() : 00209 new_timer_(false), id_counter_(0), thread_started_(false), quit_(false) 00210 { 00211 00212 } 00213 00214 template<class T, class D, class E> 00215 TimerManager<T, D, E>::~TimerManager() 00216 { 00217 quit_ = true; 00218 { 00219 boost::mutex::scoped_lock lock(timers_mutex_); 00220 timers_cond_.notify_all(); 00221 } 00222 if (thread_started_) 00223 { 00224 thread_.join(); 00225 } 00226 } 00227 00228 template<class T, class D, class E> 00229 bool TimerManager<T, D, E>::waitingCompare(int32_t lhs, int32_t rhs) 00230 { 00231 TimerInfoPtr infol = findTimer(lhs); 00232 TimerInfoPtr infor = findTimer(rhs); 00233 if (!infol || !infor) 00234 { 00235 return infol < infor; 00236 } 00237 00238 return infol->next_expected < infor->next_expected; 00239 } 00240 00241 template<class T, class D, class E> 00242 typename TimerManager<T, D, E>::TimerInfoPtr TimerManager<T, D, E>::findTimer(int32_t handle) 00243 { 00244 typename V_TimerInfo::iterator it = timers_.begin(); 00245 typename V_TimerInfo::iterator end = timers_.end(); 00246 for (; it != end; ++it) 00247 { 00248 if ((*it)->handle == handle) 00249 { 00250 return *it; 00251 } 00252 } 00253 00254 return TimerInfoPtr(); 00255 } 00256 00257 template<class T, class D, class E> 00258 bool TimerManager<T, D, E>::hasPending(int32_t handle) 00259 { 00260 boost::mutex::scoped_lock lock(timers_mutex_); 00261 TimerInfoPtr info = findTimer(handle); 00262 00263 if (!info) 00264 { 00265 return false; 00266 } 00267 00268 if (info->has_tracked_object) 00269 { 00270 VoidConstPtr tracked = info->tracked_object.lock(); 00271 if (!tracked) 00272 { 00273 return false; 00274 } 00275 } 00276 00277 boost::mutex::scoped_lock lock2(info->waiting_mutex); 00278 return info->next_expected <= T::now() || info->waiting_callbacks != 0; 00279 } 00280 00281 template<class T, class D, class E> 00282 int32_t TimerManager<T, D, E>::add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, 00283 const VoidConstPtr& tracked_object, bool oneshot) 00284 { 00285 TimerInfoPtr info(new TimerInfo); 00286 info->period = period; 00287 info->callback = callback; 00288 info->callback_queue = callback_queue; 00289 info->last_expected = T::now(); 00290 info->next_expected = info->last_expected + period; 00291 info->removed = false; 00292 info->has_tracked_object = false; 00293 info->waiting_callbacks = 0; 00294 info->total_calls = 0; 00295 info->oneshot = oneshot; 00296 if (tracked_object) 00297 { 00298 info->tracked_object = tracked_object; 00299 info->has_tracked_object = true; 00300 } 00301 00302 { 00303 boost::mutex::scoped_lock lock(id_mutex_); 00304 info->handle = id_counter_++; 00305 } 00306 00307 { 00308 boost::mutex::scoped_lock lock(timers_mutex_); 00309 timers_.push_back(info); 00310 00311 if (!thread_started_) 00312 { 00313 thread_ = boost::thread(boost::bind(&TimerManager::threadFunc, this)); 00314 thread_started_ = true; 00315 } 00316 00317 { 00318 boost::mutex::scoped_lock lock(waiting_mutex_); 00319 waiting_.push_back(info->handle); 00320 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2)); 00321 } 00322 00323 new_timer_ = true; 00324 timers_cond_.notify_all(); 00325 } 00326 00327 return info->handle; 00328 } 00329 00330 template<class T, class D, class E> 00331 void TimerManager<T, D, E>::remove(int32_t handle) 00332 { 00333 CallbackQueueInterface* callback_queue = 0; 00334 uint64_t remove_id = 0; 00335 00336 { 00337 boost::mutex::scoped_lock lock(timers_mutex_); 00338 00339 typename V_TimerInfo::iterator it = timers_.begin(); 00340 typename V_TimerInfo::iterator end = timers_.end(); 00341 for (; it != end; ++it) 00342 { 00343 const TimerInfoPtr& info = *it; 00344 if (info->handle == handle) 00345 { 00346 info->removed = true; 00347 callback_queue = info->callback_queue; 00348 remove_id = (uint64_t)info.get(); 00349 timers_.erase(it); 00350 break; 00351 } 00352 } 00353 00354 { 00355 boost::mutex::scoped_lock lock2(waiting_mutex_); 00356 // Remove from the waiting list if it's in it 00357 L_int32::iterator it = std::find(waiting_.begin(), waiting_.end(), handle); 00358 if (it != waiting_.end()) 00359 { 00360 waiting_.erase(it); 00361 } 00362 } 00363 } 00364 00365 if (callback_queue) 00366 { 00367 callback_queue->removeByID(remove_id); 00368 } 00369 } 00370 00371 template<class T, class D, class E> 00372 void TimerManager<T, D, E>::schedule(const TimerInfoPtr& info) 00373 { 00374 { 00375 boost::mutex::scoped_lock lock(waiting_mutex_); 00376 00377 if (info->removed) 00378 { 00379 return; 00380 } 00381 00382 updateNext(info, T::now()); 00383 waiting_.push_back(info->handle); 00384 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2)); 00385 } 00386 00387 { 00388 boost::mutex::scoped_lock lock(timers_mutex_); 00389 new_timer_ = true; 00390 timers_cond_.notify_one(); 00391 } 00392 } 00393 00394 template<class T, class D, class E> 00395 void TimerManager<T, D, E>::updateNext(const TimerInfoPtr& info, const T& current_time) 00396 { 00397 if (info->oneshot) 00398 { 00399 info->next_expected = T(INT_MAX, 999999999); 00400 } 00401 else 00402 { 00403 // Protect against someone having called setPeriod() 00404 // If the next expected time is already past the current time 00405 // don't update it 00406 if (info->next_expected <= current_time) 00407 { 00408 info->last_expected = info->next_expected; 00409 info->next_expected += info->period; 00410 } 00411 00412 // detect time jumping forward, as well as callbacks that are too slow 00413 if (info->next_expected + info->period < current_time) 00414 { 00415 ROS_DEBUG("Time jumped forward by [%f] for timer of period [%f], resetting timer (current=%f, next_expected=%f)", (current_time - info->next_expected).toSec(), info->period.toSec(), current_time.toSec(), info->next_expected.toSec()); 00416 info->next_expected = current_time; 00417 } 00418 } 00419 } 00420 00421 template<class T, class D, class E> 00422 void TimerManager<T, D, E>::setPeriod(int32_t handle, const D& period) 00423 { 00424 boost::mutex::scoped_lock lock(timers_mutex_); 00425 TimerInfoPtr info = findTimer(handle); 00426 00427 if (!info) 00428 { 00429 return; 00430 } 00431 00432 { 00433 boost::mutex::scoped_lock lock(waiting_mutex_); 00434 info->period = period; 00435 info->next_expected = T::now() + period; 00436 00437 waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2)); 00438 } 00439 00440 new_timer_ = true; 00441 timers_cond_.notify_one(); 00442 } 00443 00444 template<class T, class D, class E> 00445 void TimerManager<T, D, E>::threadFunc() 00446 { 00447 T current; 00448 while (!quit_) 00449 { 00450 T sleep_end; 00451 00452 boost::mutex::scoped_lock lock(timers_mutex_); 00453 00454 // detect time jumping backwards 00455 if (T::now() < current) 00456 { 00457 ROSCPP_LOG_DEBUG("Time jumped backward, resetting timers"); 00458 00459 current = T::now(); 00460 00461 typename V_TimerInfo::iterator it = timers_.begin(); 00462 typename V_TimerInfo::iterator end = timers_.end(); 00463 for (; it != end; ++it) 00464 { 00465 const TimerInfoPtr& info = *it; 00466 00467 // Timer may have been added after the time jump, so also check if time has jumped past its last call time 00468 if (current < info->last_expected) 00469 { 00470 info->last_expected = current; 00471 info->next_expected = current + info->period; 00472 } 00473 } 00474 } 00475 00476 current = T::now(); 00477 00478 { 00479 boost::mutex::scoped_lock waitlock(waiting_mutex_); 00480 00481 if (waiting_.empty()) 00482 { 00483 sleep_end = current + D(0.1); 00484 } 00485 else 00486 { 00487 TimerInfoPtr info = findTimer(waiting_.front()); 00488 00489 while (!waiting_.empty() && info && info->next_expected <= current) 00490 { 00491 current = T::now(); 00492 00493 //ROS_DEBUG("Scheduling timer callback for timer [%d] of period [%f], [%f] off expected", info->handle, info->period.toSec(), (current - info->next_expected).toSec()); 00494 CallbackInterfacePtr cb(new TimerQueueCallback(this, info, info->last_expected, info->last_real, info->next_expected)); 00495 info->callback_queue->addCallback(cb, (uint64_t)info.get()); 00496 00497 waiting_.pop_front(); 00498 00499 if (waiting_.empty()) 00500 { 00501 break; 00502 } 00503 00504 info = findTimer(waiting_.front()); 00505 } 00506 00507 if (info) 00508 { 00509 sleep_end = info->next_expected; 00510 } 00511 } 00512 } 00513 00514 while (!new_timer_ && T::now() < sleep_end && !quit_) 00515 { 00516 // detect backwards jumps in time 00517 00518 if (T::now() < current) 00519 { 00520 ROSCPP_LOG_DEBUG("Time jumped backwards, breaking out of sleep"); 00521 break; 00522 } 00523 00524 current = T::now(); 00525 00526 if (current >= sleep_end) 00527 { 00528 break; 00529 } 00530 00531 // If we're on simulation time we need to check now() against sleep_end more often than on system time, 00532 // since simulation time may be running faster than real time. 00533 if (!T::isSystemTime()) 00534 { 00535 timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(1)); 00536 } 00537 else 00538 { 00539 // On system time we can simply sleep for the rest of the wait time, since anything else requiring processing will 00540 // signal the condition variable 00541 int32_t remaining_time = std::max((int32_t)((sleep_end - current).toSec() * 1000.0f), 1); 00542 timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(remaining_time)); 00543 } 00544 } 00545 00546 new_timer_ = false; 00547 } 00548 } 00549 00550 } 00551 00552 #endif