timer_manager.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2009, Willow Garage, Inc.
00003  *
00004  * Redistribution and use in source and binary forms, with or without
00005  * modification, are permitted provided that the following conditions are met:
00006  *   * Redistributions of source code must retain the above copyright notice,
00007  *     this list of conditions and the following disclaimer.
00008  *   * Redistributions in binary form must reproduce the above copyright
00009  *     notice, this list of conditions and the following disclaimer in the
00010  *     documentation and/or other materials provided with the distribution.
00011  *   * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
00012  *     contributors may be used to endorse or promote products derived from
00013  *     this software without specific prior written permission.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00016  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00017  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00019  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00020  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00021  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00023  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00025  * POSSIBILITY OF SUCH DAMAGE.
00026  */
00027 
00028 #ifndef ROSCPP_TIMER_MANAGER_H
00029 #define ROSCPP_TIMER_MANAGER_H
00030 
00031 #include "ros/forwards.h"
00032 #include "ros/time.h"
00033 #include "ros/file_log.h"
00034 
00035 #include <boost/thread/thread.hpp>
00036 #include <boost/thread/mutex.hpp>
00037 #include <boost/thread/recursive_mutex.hpp>
00038 #include <boost/thread/condition_variable.hpp>
00039 
00040 #include "ros/assert.h"
00041 #include "ros/callback_queue_interface.h"
00042 
00043 #include <vector>
00044 #include <list>
00045 
00046 namespace ros
00047 {
00048 
00049 template<class T, class D, class E>
00050 class TimerManager
00051 {
00052 private:
00053   struct TimerInfo
00054   {
00055     int32_t handle;
00056     D period;
00057 
00058     boost::function<void(const E&)> callback;
00059     CallbackQueueInterface* callback_queue;
00060 
00061     WallDuration last_cb_duration;
00062 
00063     T last_expected;
00064     T next_expected;
00065 
00066     T last_real;
00067 
00068     bool removed;
00069 
00070     VoidConstWPtr tracked_object;
00071     bool has_tracked_object;
00072 
00073     // TODO: atomicize
00074     boost::mutex waiting_mutex;
00075     uint32_t waiting_callbacks;
00076 
00077     bool oneshot;
00078 
00079     // debugging info
00080     uint32_t total_calls;
00081   };
00082   typedef boost::shared_ptr<TimerInfo> TimerInfoPtr;
00083   typedef boost::weak_ptr<TimerInfo> TimerInfoWPtr;
00084   typedef std::vector<TimerInfoPtr> V_TimerInfo;
00085 
00086   typedef std::list<int32_t> L_int32;
00087 
00088 public:
00089   TimerManager();
00090   ~TimerManager();
00091 
00092   int32_t add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue, const VoidConstPtr& tracked_object, bool oneshot);
00093   void remove(int32_t handle);
00094 
00095   bool hasPending(int32_t handle);
00096   void setPeriod(int32_t handle, const D& period);
00097 
00098   static TimerManager& global()
00099   {
00100     static TimerManager<T, D, E> global;
00101     return global;
00102   }
00103 
00104 private:
00105   void threadFunc();
00106 
00107   bool waitingCompare(int32_t lhs, int32_t rhs);
00108   TimerInfoPtr findTimer(int32_t handle);
00109   void schedule(const TimerInfoPtr& info);
00110   void updateNext(const TimerInfoPtr& info, const T& current_time);
00111 
00112   V_TimerInfo timers_;
00113   boost::mutex timers_mutex_;
00114   boost::condition_variable timers_cond_;
00115   volatile bool new_timer_;
00116 
00117   boost::mutex waiting_mutex_;
00118   L_int32 waiting_;
00119 
00120   uint32_t id_counter_;
00121   boost::mutex id_mutex_;
00122 
00123   bool thread_started_;
00124 
00125   boost::thread thread_;
00126 
00127   bool quit_;
00128 
00129   class TimerQueueCallback : public CallbackInterface
00130   {
00131   public:
00132     TimerQueueCallback(TimerManager<T, D, E>* parent, const TimerInfoPtr& info, T last_expected, T last_real, T current_expected)
00133     : parent_(parent)
00134     , info_(info)
00135     , last_expected_(last_expected)
00136     , last_real_(last_real)
00137     , current_expected_(current_expected)
00138     , called_(false)
00139     {
00140       boost::mutex::scoped_lock lock(info->waiting_mutex);
00141       ++info->waiting_callbacks;
00142     }
00143 
00144     ~TimerQueueCallback()
00145     {
00146       TimerInfoPtr info = info_.lock();
00147       if (info)
00148       {
00149         boost::mutex::scoped_lock lock(info->waiting_mutex);
00150         --info->waiting_callbacks;
00151       }
00152     }
00153 
00154     CallResult call()
00155     {
00156       TimerInfoPtr info = info_.lock();
00157       if (!info)
00158       {
00159         return Invalid;
00160       }
00161 
00162       {
00163         ++info->total_calls;
00164         called_ = true;
00165 
00166         VoidConstPtr tracked;
00167         if (info->has_tracked_object)
00168         {
00169           tracked = info->tracked_object.lock();
00170           if (!tracked)
00171           {
00172             return Invalid;
00173           }
00174         }
00175 
00176         E event;
00177         event.last_expected = last_expected_;
00178         event.last_real = last_real_;
00179         event.current_expected = current_expected_;
00180         event.current_real = T::now();
00181         event.profile.last_duration = info->last_cb_duration;
00182 
00183         WallTime cb_start = WallTime::now();
00184         info->callback(event);
00185         WallTime cb_end = WallTime::now();
00186         info->last_cb_duration = cb_end - cb_start;
00187 
00188         info->last_real = event.current_real;
00189 
00190         parent_->schedule(info);
00191       }
00192 
00193       return Success;
00194     }
00195 
00196   private:
00197     TimerManager<T, D, E>* parent_;
00198     TimerInfoWPtr info_;
00199     T last_expected_;
00200     T last_real_;
00201     T current_expected_;
00202 
00203     bool called_;
00204   };
00205 };
00206 
00207 template<class T, class D, class E>
00208 TimerManager<T, D, E>::TimerManager() :
00209   new_timer_(false), id_counter_(0), thread_started_(false), quit_(false)
00210 {
00211 
00212 }
00213 
00214 template<class T, class D, class E>
00215 TimerManager<T, D, E>::~TimerManager()
00216 {
00217   quit_ = true;
00218   {
00219     boost::mutex::scoped_lock lock(timers_mutex_);
00220     timers_cond_.notify_all();
00221   }
00222   if (thread_started_)
00223   {
00224     thread_.join();
00225   }
00226 }
00227 
00228 template<class T, class D, class E>
00229 bool TimerManager<T, D, E>::waitingCompare(int32_t lhs, int32_t rhs)
00230 {
00231   TimerInfoPtr infol = findTimer(lhs);
00232   TimerInfoPtr infor = findTimer(rhs);
00233   if (!infol || !infor)
00234   {
00235     return infol < infor;
00236   }
00237 
00238   return infol->next_expected < infor->next_expected;
00239 }
00240 
00241 template<class T, class D, class E>
00242 typename TimerManager<T, D, E>::TimerInfoPtr TimerManager<T, D, E>::findTimer(int32_t handle)
00243 {
00244   typename V_TimerInfo::iterator it = timers_.begin();
00245   typename V_TimerInfo::iterator end = timers_.end();
00246   for (; it != end; ++it)
00247   {
00248     if ((*it)->handle == handle)
00249     {
00250       return *it;
00251     }
00252   }
00253 
00254   return TimerInfoPtr();
00255 }
00256 
00257 template<class T, class D, class E>
00258 bool TimerManager<T, D, E>::hasPending(int32_t handle)
00259 {
00260   boost::mutex::scoped_lock lock(timers_mutex_);
00261   TimerInfoPtr info = findTimer(handle);
00262 
00263   if (!info)
00264   {
00265     return false;
00266   }
00267 
00268   if (info->has_tracked_object)
00269   {
00270     VoidConstPtr tracked = info->tracked_object.lock();
00271     if (!tracked)
00272     {
00273       return false;
00274     }
00275   }
00276 
00277   boost::mutex::scoped_lock lock2(info->waiting_mutex);
00278   return info->next_expected <= T::now() || info->waiting_callbacks != 0;
00279 }
00280 
00281 template<class T, class D, class E>
00282 int32_t TimerManager<T, D, E>::add(const D& period, const boost::function<void(const E&)>& callback, CallbackQueueInterface* callback_queue,
00283                                    const VoidConstPtr& tracked_object, bool oneshot)
00284 {
00285   TimerInfoPtr info(new TimerInfo);
00286   info->period = period;
00287   info->callback = callback;
00288   info->callback_queue = callback_queue;
00289   info->last_expected = T::now();
00290   info->next_expected = info->last_expected + period;
00291   info->removed = false;
00292   info->has_tracked_object = false;
00293   info->waiting_callbacks = 0;
00294   info->total_calls = 0;
00295   info->oneshot = oneshot;
00296   if (tracked_object)
00297   {
00298     info->tracked_object = tracked_object;
00299     info->has_tracked_object = true;
00300   }
00301 
00302   {
00303     boost::mutex::scoped_lock lock(id_mutex_);
00304     info->handle = id_counter_++;
00305   }
00306 
00307   {
00308     boost::mutex::scoped_lock lock(timers_mutex_);
00309     timers_.push_back(info);
00310 
00311     if (!thread_started_)
00312     {
00313       thread_ = boost::thread(boost::bind(&TimerManager::threadFunc, this));
00314       thread_started_ = true;
00315     }
00316 
00317     {
00318       boost::mutex::scoped_lock lock(waiting_mutex_);
00319       waiting_.push_back(info->handle);
00320       waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00321     }
00322 
00323     new_timer_ = true;
00324     timers_cond_.notify_all();
00325   }
00326 
00327   return info->handle;
00328 }
00329 
00330 template<class T, class D, class E>
00331 void TimerManager<T, D, E>::remove(int32_t handle)
00332 {
00333   CallbackQueueInterface* callback_queue = 0;
00334   uint64_t remove_id = 0;
00335 
00336   {
00337     boost::mutex::scoped_lock lock(timers_mutex_);
00338 
00339     typename V_TimerInfo::iterator it = timers_.begin();
00340     typename V_TimerInfo::iterator end = timers_.end();
00341     for (; it != end; ++it)
00342     {
00343       const TimerInfoPtr& info = *it;
00344       if (info->handle == handle)
00345       {
00346         info->removed = true;
00347         callback_queue = info->callback_queue;
00348         remove_id = (uint64_t)info.get();
00349         timers_.erase(it);
00350         break;
00351       }
00352     }
00353 
00354     {
00355       boost::mutex::scoped_lock lock2(waiting_mutex_);
00356       // Remove from the waiting list if it's in it
00357       L_int32::iterator it = std::find(waiting_.begin(), waiting_.end(), handle);
00358       if (it != waiting_.end())
00359       {
00360         waiting_.erase(it);
00361       }
00362     }
00363   }
00364 
00365   if (callback_queue)
00366   {
00367     callback_queue->removeByID(remove_id);
00368   }
00369 }
00370 
00371 template<class T, class D, class E>
00372 void TimerManager<T, D, E>::schedule(const TimerInfoPtr& info)
00373 {
00374   boost::mutex::scoped_lock lock(timers_mutex_);
00375 
00376   if (info->removed)
00377   {
00378     return;
00379   }
00380 
00381   updateNext(info, T::now());
00382   {
00383     boost::mutex::scoped_lock lock(waiting_mutex_);
00384 
00385     waiting_.push_back(info->handle);
00386     // waitingCompare requires a lock on the timers_mutex_
00387     waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00388   }
00389 
00390   new_timer_ = true;
00391   timers_cond_.notify_one();
00392 }
00393 
00394 template<class T, class D, class E>
00395 void TimerManager<T, D, E>::updateNext(const TimerInfoPtr& info, const T& current_time)
00396 {
00397   if (info->oneshot)
00398   {
00399     info->next_expected = T(INT_MAX, 999999999);
00400   }
00401   else
00402   {
00403     // Protect against someone having called setPeriod()
00404     // If the next expected time is already past the current time
00405     // don't update it
00406     if (info->next_expected <= current_time)
00407     {
00408       info->last_expected = info->next_expected;
00409       info->next_expected += info->period;
00410     }
00411 
00412     // detect time jumping forward, as well as callbacks that are too slow
00413     if (info->next_expected + info->period < current_time)
00414     {
00415       ROS_DEBUG("Time jumped forward by [%f] for timer of period [%f], resetting timer (current=%f, next_expected=%f)", (current_time - info->next_expected).toSec(), info->period.toSec(), current_time.toSec(), info->next_expected.toSec());
00416       info->next_expected = current_time;
00417     }
00418   }
00419 }
00420 
00421 template<class T, class D, class E>
00422 void TimerManager<T, D, E>::setPeriod(int32_t handle, const D& period)
00423 {
00424   boost::mutex::scoped_lock lock(timers_mutex_);
00425   TimerInfoPtr info = findTimer(handle);
00426 
00427   if (!info)
00428   {
00429     return;
00430   }
00431 
00432   {
00433     boost::mutex::scoped_lock lock(waiting_mutex_);
00434     info->period = period;
00435     info->next_expected = T::now() + period;
00436 
00437     waiting_.sort(boost::bind(&TimerManager::waitingCompare, this, _1, _2));
00438   }
00439 
00440   new_timer_ = true;
00441   timers_cond_.notify_one();
00442 }
00443 
00444 template<class T, class D, class E>
00445 void TimerManager<T, D, E>::threadFunc()
00446 {
00447   T current;
00448   while (!quit_)
00449   {
00450     T sleep_end;
00451 
00452     boost::mutex::scoped_lock lock(timers_mutex_);
00453 
00454     // detect time jumping backwards
00455     if (T::now() < current)
00456     {
00457       ROSCPP_LOG_DEBUG("Time jumped backward, resetting timers");
00458 
00459       current = T::now();
00460 
00461       typename V_TimerInfo::iterator it = timers_.begin();
00462       typename V_TimerInfo::iterator end = timers_.end();
00463       for (; it != end; ++it)
00464       {
00465         const TimerInfoPtr& info = *it;
00466 
00467         // Timer may have been added after the time jump, so also check if time has jumped past its last call time
00468         if (current < info->last_expected)
00469         {
00470           info->last_expected = current;
00471           info->next_expected = current + info->period;
00472         }
00473       }
00474     }
00475 
00476     current = T::now();
00477 
00478     {
00479       boost::mutex::scoped_lock waitlock(waiting_mutex_);
00480 
00481       if (waiting_.empty())
00482       {
00483         sleep_end = current + D(0.1);
00484       }
00485       else
00486       {
00487         TimerInfoPtr info = findTimer(waiting_.front());
00488 
00489         while (!waiting_.empty() && info && info->next_expected <= current)
00490         {
00491           current = T::now();
00492 
00493           //ROS_DEBUG("Scheduling timer callback for timer [%d] of period [%f], [%f] off expected", info->handle, info->period.toSec(), (current - info->next_expected).toSec());
00494           CallbackInterfacePtr cb(new TimerQueueCallback(this, info, info->last_expected, info->last_real, info->next_expected));
00495           info->callback_queue->addCallback(cb, (uint64_t)info.get());
00496 
00497           waiting_.pop_front();
00498 
00499           if (waiting_.empty())
00500           {
00501             break;
00502           }
00503 
00504           info = findTimer(waiting_.front());
00505         }
00506 
00507         if (info)
00508         {
00509           sleep_end = info->next_expected;
00510         }
00511       }
00512     }
00513 
00514     while (!new_timer_ && T::now() < sleep_end && !quit_)
00515     {
00516       // detect backwards jumps in time
00517 
00518       if (T::now() < current)
00519       {
00520         ROSCPP_LOG_DEBUG("Time jumped backwards, breaking out of sleep");
00521         break;
00522       }
00523 
00524       current = T::now();
00525 
00526       if (current >= sleep_end)
00527       {
00528         break;
00529       }
00530 
00531       // If we're on simulation time we need to check now() against sleep_end more often than on system time,
00532       // since simulation time may be running faster than real time.
00533       if (!T::isSystemTime())
00534       {
00535         timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(1));
00536       }
00537       else
00538       {
00539         // On system time we can simply sleep for the rest of the wait time, since anything else requiring processing will
00540         // signal the condition variable
00541         int32_t remaining_time = std::max((int32_t)((sleep_end - current).toSec() * 1000.0f), 1);
00542         timers_cond_.timed_wait(lock, boost::posix_time::milliseconds(remaining_time));
00543       }
00544     }
00545 
00546     new_timer_ = false;
00547   }
00548 }
00549 
00550 }
00551 
00552 #endif


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Fri Aug 28 2015 12:33:11