timer_queue.h
Go to the documentation of this file.
00001 #ifndef TIMERQUEUE_H
00002 #define TIMERQUEUE_H
00003 
00004 #include <mutex>
00005 #include <condition_variable>
00006 #include <thread>
00007 #include <queue>
00008 #include <chrono>
00009 #include <assert.h>
00010 
00011 namespace BT
00012 {
00013 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
00014 
00015 namespace details
00016 {
00017 class Semaphore
00018 {
00019   public:
00020     Semaphore(unsigned int count = 0) : m_count(count)
00021     {
00022     }
00023 
00024     void notify()
00025     {
00026         std::unique_lock<std::mutex> lock(m_mtx);
00027         m_count++;
00028         m_cv.notify_one();
00029     }
00030 
00031     void wait()
00032     {
00033         std::unique_lock<std::mutex> lock(m_mtx);
00034         m_cv.wait(lock, [this]() { return m_count > 0; });
00035         m_count--;
00036     }
00037 
00038     template <class Clock, class Duration>
00039     bool waitUntil(const std::chrono::time_point<Clock, Duration>& point)
00040     {
00041         std::unique_lock<std::mutex> lock(m_mtx);
00042         if (!m_cv.wait_until(lock, point, [this]() { return m_count > 0; }))
00043             return false;
00044         m_count--;
00045         return true;
00046     }
00047 
00048   private:
00049     std::mutex m_mtx;
00050     std::condition_variable m_cv;
00051     unsigned int m_count;
00052 };
00053 }
00054 
00055 // Timer Queue
00056 //
00057 // Allows execution of handlers at a specified time in the future
00058 // Guarantees:
00059 //  - All handlers are executed ONCE, even if canceled (aborted parameter will
00060 //be set to true)
00061 //      - If TimerQueue is destroyed, it will cancel all handlers.
00062 //  - Handlers are ALWAYS executed in the Timer Queue worker thread.
00063 //  - Handlers execution order is NOT guaranteed
00064 //
00065 class TimerQueue
00066 {
00067   public:
00068     TimerQueue()
00069     {
00070         m_th = std::thread([this] { run(); });
00071     }
00072 
00073     ~TimerQueue()
00074     {
00075         cancelAll();
00076         // Abusing the timer queue to trigger the shutdown.
00077         add(std::chrono::milliseconds(0), [this](bool) { m_finish = true; });
00078         m_th.join();
00079     }
00080 
00082     // \return
00083     //  Returns the ID of the new timer. You can use this ID to cancel the
00084     // timer
00085     uint64_t add(std::chrono::milliseconds milliseconds, std::function<void(bool)> handler)
00086     {
00087         WorkItem item;
00088         item.end = Clock::now() + milliseconds;
00089         item.handler = std::move(handler);
00090 
00091         std::unique_lock<std::mutex> lk(m_mtx);
00092         uint64_t id = ++m_idcounter;
00093         item.id = id;
00094         m_items.push(std::move(item));
00095         lk.unlock();
00096 
00097         // Something changed, so wake up timer thread
00098         m_checkWork.notify();
00099         return id;
00100     }
00101 
00103     // \return
00104     //  1 if the timer was cancelled.
00105     //  0 if you were too late to cancel (or the timer ID was never valid to
00106     // start with)
00107     size_t cancel(uint64_t id)
00108     {
00109         // Instead of removing the item from the container (thus breaking the
00110         // heap integrity), we set the item as having no handler, and put
00111         // that handler on a new item at the top for immediate execution
00112         // The timer thread will then ignore the original item, since it has no
00113         // handler.
00114         std::unique_lock<std::mutex> lk(m_mtx);
00115         for (auto&& item : m_items.getContainer())
00116         {
00117             if (item.id == id && item.handler)
00118             {
00119                 WorkItem newItem;
00120                 // Zero time, so it stays at the top for immediate execution
00121                 newItem.end = Clock::time_point();
00122                 newItem.id = 0;   // Means it is a canceled item
00123                 // Move the handler from item to newitem.
00124                 // Also, we need to manually set the handler to nullptr, since
00125                 // the standard does not guarantee moving an std::function will
00126                 // empty it. Some STL implementation will empty it, others will
00127                 // not.
00128                 newItem.handler = std::move(item.handler);
00129                 item.handler = nullptr;
00130                 m_items.push(std::move(newItem));
00131 
00132                 lk.unlock();
00133                 // Something changed, so wake up timer thread
00134                 m_checkWork.notify();
00135                 return 1;
00136             }
00137         }
00138         return 0;
00139     }
00140 
00142     // \return
00143     //  The number of timers cancelled
00144     size_t cancelAll()
00145     {
00146         // Setting all "end" to 0 (for immediate execution) is ok,
00147         // since it maintains the heap integrity
00148         std::unique_lock<std::mutex> lk(m_mtx);
00149         for (auto&& item : m_items.getContainer())
00150         {
00151             if (item.id)
00152             {
00153                 item.end = Clock::time_point();
00154                 item.id = 0;
00155             }
00156         }
00157         auto ret = m_items.size();
00158 
00159         lk.unlock();
00160         m_checkWork.notify();
00161         return ret;
00162     }
00163 
00164   private:
00165     using Clock = std::chrono::steady_clock;
00166     TimerQueue(const TimerQueue&) = delete;
00167     TimerQueue& operator=(const TimerQueue&) = delete;
00168 
00169     void run()
00170     {
00171         while (!m_finish)
00172         {
00173             auto end = calcWaitTime();
00174             if (end.first)
00175             {
00176                 // Timers found, so wait until it expires (or something else
00177                 // changes)
00178                 m_checkWork.waitUntil(end.second);
00179             }
00180             else
00181             {
00182                 // No timers exist, so wait forever until something changes
00183                 m_checkWork.wait();
00184             }
00185 
00186             // Check and execute as much work as possible, such as, all expired
00187             // timers
00188             checkWork();
00189         }
00190 
00191         // If we are shutting down, we should not have any items left,
00192         // since the shutdown cancels all items
00193         assert(m_items.size() == 0);
00194     }
00195 
00196     std::pair<bool, Clock::time_point> calcWaitTime()
00197     {
00198         std::lock_guard<std::mutex> lk(m_mtx);
00199         while (m_items.size())
00200         {
00201             if (m_items.top().handler)
00202             {
00203                 // Item present, so return the new wait time
00204                 return std::make_pair(true, m_items.top().end);
00205             }
00206             else
00207             {
00208                 // Discard empty handlers (they were cancelled)
00209                 m_items.pop();
00210             }
00211         }
00212 
00213         // No items found, so return no wait time (causes the thread to wait
00214         // indefinitely)
00215         return std::make_pair(false, Clock::time_point());
00216     }
00217 
00218     void checkWork()
00219     {
00220         std::unique_lock<std::mutex> lk(m_mtx);
00221         while (m_items.size() && m_items.top().end <= Clock::now())
00222         {
00223             WorkItem item(std::move(m_items.top()));
00224             m_items.pop();
00225 
00226             lk.unlock();
00227             if (item.handler)
00228                 item.handler(item.id == 0);
00229             lk.lock();
00230         }
00231     }
00232 
00233     details::Semaphore m_checkWork;
00234     std::thread m_th;
00235     bool m_finish = false;
00236     uint64_t m_idcounter = 0;
00237 
00238     struct WorkItem
00239     {
00240         Clock::time_point end;
00241         uint64_t id;   // id==0 means it was cancelled
00242         std::function<void(bool)> handler;
00243         bool operator>(const WorkItem& other) const
00244         {
00245             return end > other.end;
00246         }
00247     };
00248 
00249     std::mutex m_mtx;
00250     // Inheriting from priority_queue, so we can access the internal container
00251     class Queue
00252       : public std::priority_queue<WorkItem, std::vector<WorkItem>, std::greater<WorkItem>>
00253     {
00254       public:
00255         std::vector<WorkItem>& getContainer()
00256         {
00257             return this->c;
00258         }
00259     } m_items;
00260 };
00261 }
00262 
00263 #endif   // TIMERQUEUE_H


behaviortree_cpp
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Sat Jun 8 2019 20:17:15