timer_queue.h
Go to the documentation of this file.
1 #ifndef TIMERQUEUE_H
2 #define TIMERQUEUE_H
3 
4 #include <mutex>
5 #include <condition_variable>
6 #include <thread>
7 #include <queue>
8 #include <chrono>
9 #include <assert.h>
10 
11 namespace BT
12 {
13 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
14 
15 namespace details
16 {
17 class Semaphore
18 {
19  public:
20  Semaphore(unsigned int count = 0) : m_count(count)
21  {
22  }
23 
24  void notify()
25  {
26  std::unique_lock<std::mutex> lock(m_mtx);
27  m_count++;
28  m_cv.notify_one();
29  }
30 
31  void wait()
32  {
33  std::unique_lock<std::mutex> lock(m_mtx);
34  m_cv.wait(lock, [this]() { return m_count > 0; });
35  m_count--;
36  }
37 
38  template <class Clock, class Duration>
39  bool waitUntil(const std::chrono::time_point<Clock, Duration>& point)
40  {
41  std::unique_lock<std::mutex> lock(m_mtx);
42  if (!m_cv.wait_until(lock, point, [this]() { return m_count > 0; }))
43  return false;
44  m_count--;
45  return true;
46  }
47 
48  private:
50  std::condition_variable m_cv;
51  unsigned int m_count;
52 };
53 }
54 
55 // Timer Queue
56 //
57 // Allows execution of handlers at a specified time in the future
58 // Guarantees:
59 // - All handlers are executed ONCE, even if canceled (aborted parameter will
60 //be set to true)
61 // - If TimerQueue is destroyed, it will cancel all handlers.
62 // - Handlers are ALWAYS executed in the Timer Queue worker thread.
63 // - Handlers execution order is NOT guaranteed
64 //
66 {
67  public:
69  {
70  m_th = std::thread([this] { run(); });
71  }
72 
74  {
75  cancelAll();
76  // Abusing the timer queue to trigger the shutdown.
77  add(std::chrono::milliseconds(0), [this](bool) { m_finish = true; });
78  m_th.join();
79  }
80 
82  // \return
83  // Returns the ID of the new timer. You can use this ID to cancel the
84  // timer
85  uint64_t add(std::chrono::milliseconds milliseconds, std::function<void(bool)> handler)
86  {
87  WorkItem item;
88  item.end = Clock::now() + milliseconds;
89  item.handler = std::move(handler);
90 
91  std::unique_lock<std::mutex> lk(m_mtx);
92  uint64_t id = ++m_idcounter;
93  item.id = id;
94  m_items.push(std::move(item));
95  lk.unlock();
96 
97  // Something changed, so wake up timer thread
98  m_checkWork.notify();
99  return id;
100  }
101 
103  // \return
104  // 1 if the timer was cancelled.
105  // 0 if you were too late to cancel (or the timer ID was never valid to
106  // start with)
107  size_t cancel(uint64_t id)
108  {
109  // Instead of removing the item from the container (thus breaking the
110  // heap integrity), we set the item as having no handler, and put
111  // that handler on a new item at the top for immediate execution
112  // The timer thread will then ignore the original item, since it has no
113  // handler.
114  std::unique_lock<std::mutex> lk(m_mtx);
115  for (auto&& item : m_items.getContainer())
116  {
117  if (item.id == id && item.handler)
118  {
119  WorkItem newItem;
120  // Zero time, so it stays at the top for immediate execution
121  newItem.end = Clock::time_point();
122  newItem.id = 0; // Means it is a canceled item
123  // Move the handler from item to newitem.
124  // Also, we need to manually set the handler to nullptr, since
125  // the standard does not guarantee moving an std::function will
126  // empty it. Some STL implementation will empty it, others will
127  // not.
128  newItem.handler = std::move(item.handler);
129  item.handler = nullptr;
130  m_items.push(std::move(newItem));
131 
132  lk.unlock();
133  // Something changed, so wake up timer thread
134  m_checkWork.notify();
135  return 1;
136  }
137  }
138  return 0;
139  }
140 
142  // \return
143  // The number of timers cancelled
144  size_t cancelAll()
145  {
146  // Setting all "end" to 0 (for immediate execution) is ok,
147  // since it maintains the heap integrity
148  std::unique_lock<std::mutex> lk(m_mtx);
149  for (auto&& item : m_items.getContainer())
150  {
151  if (item.id)
152  {
153  item.end = Clock::time_point();
154  item.id = 0;
155  }
156  }
157  auto ret = m_items.size();
158 
159  lk.unlock();
160  m_checkWork.notify();
161  return ret;
162  }
163 
164  private:
165  using Clock = std::chrono::steady_clock;
166  TimerQueue(const TimerQueue&) = delete;
167  TimerQueue& operator=(const TimerQueue&) = delete;
168 
169  void run()
170  {
171  while (!m_finish)
172  {
173  auto end = calcWaitTime();
174  if (end.first)
175  {
176  // Timers found, so wait until it expires (or something else
177  // changes)
178  m_checkWork.waitUntil(end.second);
179  }
180  else
181  {
182  // No timers exist, so wait forever until something changes
183  m_checkWork.wait();
184  }
185 
186  // Check and execute as much work as possible, such as, all expired
187  // timers
188  checkWork();
189  }
190 
191  // If we are shutting down, we should not have any items left,
192  // since the shutdown cancels all items
193  assert(m_items.size() == 0);
194  }
195 
196  std::pair<bool, Clock::time_point> calcWaitTime()
197  {
198  std::lock_guard<std::mutex> lk(m_mtx);
199  while (m_items.size())
200  {
201  if (m_items.top().handler)
202  {
203  // Item present, so return the new wait time
204  return std::make_pair(true, m_items.top().end);
205  }
206  else
207  {
208  // Discard empty handlers (they were cancelled)
209  m_items.pop();
210  }
211  }
212 
213  // No items found, so return no wait time (causes the thread to wait
214  // indefinitely)
215  return std::make_pair(false, Clock::time_point());
216  }
217 
218  void checkWork()
219  {
220  std::unique_lock<std::mutex> lk(m_mtx);
221  while (m_items.size() && m_items.top().end <= Clock::now())
222  {
223  WorkItem item(std::move(m_items.top()));
224  m_items.pop();
225 
226  lk.unlock();
227  if (item.handler)
228  item.handler(item.id == 0);
229  lk.lock();
230  }
231  }
232 
234  std::thread m_th;
235  bool m_finish = false;
236  uint64_t m_idcounter = 0;
237 
238  struct WorkItem
239  {
240  Clock::time_point end;
241  uint64_t id; // id==0 means it was cancelled
242  std::function<void(bool)> handler;
243  bool operator>(const WorkItem& other) const
244  {
245  return end > other.end;
246  }
247  };
248 
250  // Inheriting from priority_queue, so we can access the internal container
251  class Queue
252  : public std::priority_queue<WorkItem, std::vector<WorkItem>, std::greater<WorkItem>>
253  {
254  public:
255  std::vector<WorkItem>& getContainer()
256  {
257  return this->c;
258  }
259  } m_items;
260 };
261 }
262 
263 #endif // TIMERQUEUE_H
Clock::time_point end
Definition: timer_queue.h:240
size_t cancelAll()
Cancels all timers.
Definition: timer_queue.h:144
std::condition_variable m_cv
Definition: timer_queue.h:50
bool waitUntil(const std::chrono::time_point< Clock, Duration > &point)
Definition: timer_queue.h:39
static pthread_mutex_t mutex
Definition: minitrace.cpp:61
Semaphore(unsigned int count=0)
Definition: timer_queue.h:20
std::function< void(bool)> handler
Definition: timer_queue.h:242
std::thread m_th
Definition: timer_queue.h:234
uint64_t add(std::chrono::milliseconds milliseconds, std::function< void(bool)> handler)
Adds a new timer.
Definition: timer_queue.h:85
std::chrono::steady_clock Clock
Definition: timer_queue.h:165
unsigned int m_count
Definition: timer_queue.h:51
details::Semaphore m_checkWork
Definition: timer_queue.h:233
std::mutex m_mtx
Definition: timer_queue.h:249
static volatile int count
Definition: minitrace.cpp:55
std::pair< bool, Clock::time_point > calcWaitTime()
Definition: timer_queue.h:196
std::vector< WorkItem > & getContainer()
Definition: timer_queue.h:255
bool operator>(const WorkItem &other) const
Definition: timer_queue.h:243
size_t cancel(uint64_t id)
Cancels the specified timer.
Definition: timer_queue.h:107
void checkWork()
Definition: timer_queue.h:218


behaviortree_cpp
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Sun Feb 3 2019 03:14:32