timer_queue.h
Go to the documentation of this file.
1 #ifndef TIMERQUEUE_H
2 #define TIMERQUEUE_H
3 
4 #include <mutex>
5 #include <condition_variable>
6 #include <thread>
7 #include <queue>
8 #include <chrono>
9 #include <assert.h>
10 
namespace BT
{
// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/

namespace details
{
/// Counting semaphore built from a mutex and a condition variable.
///
/// notify() increments the counter and wakes one waiter; wait() and
/// waitUntil() block until the counter is positive and then decrement it.
class Semaphore
{
  public:
    /// \param count  Initial value of the counter.
    Semaphore(unsigned int count = 0) : m_count(count)
    {
    }

    /// Increments the counter and wakes up one waiting thread (if any).
    void notify()
    {
        std::lock_guard<std::mutex> lock(m_mtx);
        ++m_count;
        m_cv.notify_one();
    }

    /// Blocks until the counter is positive, then decrements it.
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        // The predicate protects against spurious wake-ups.
        m_cv.wait(lock, [this]() { return m_count > 0; });
        --m_count;
    }

    /// Blocks until the counter is positive or \p point is reached.
    ///
    /// \return
    ///   true if the counter was positive (it has been decremented),
    ///   false if the deadline passed while the counter was still zero.
    template <class Clock, class Duration>
    bool waitUntil(const std::chrono::time_point<Clock, Duration>& point)
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        const bool signalled = m_cv.wait_until(lock, point, [this]() { return m_count > 0; });
        if (!signalled)
        {
            return false;
        }
        --m_count;
        return true;
    }

  private:
    std::mutex m_mtx;   // guards m_count
    std::condition_variable m_cv;
    unsigned int m_count;
};
}   // namespace details

// Timer Queue
//
// Allows execution of handlers at a specified time in the future
// Guarantees:
//  - All handlers are executed ONCE, even if canceled (the "aborted"
//    parameter will be set to true in that case)
//  - If TimerQueue is destroyed, it will cancel all handlers.
//  - Handlers are ALWAYS executed in the Timer Queue worker thread.
//  - Handlers execution order is NOT guaranteed
//
template <typename ClockT = std::chrono::steady_clock,
          typename DurationT = std::chrono::steady_clock::duration>
class TimerQueue
{
  public:
    /// Starts the worker thread that executes the handlers.
    TimerQueue()
    {
        // Started in the body (not in the initializer list) so that every
        // data member is fully constructed before run() can touch it.
        m_th = std::thread([this] { run(); });
    }

    /// Cancels every pending timer (handlers run with aborted == true) and
    /// joins the worker thread.
    ~TimerQueue()
    {
        cancelAll();
        // Abusing the timer queue to trigger the shutdown: m_finish is only
        // ever written and read from the worker thread.
        add(std::chrono::milliseconds(0), [this](bool) { m_finish = true; });
        m_th.join();
    }

    /// Adds a new timer.
    ///
    /// \param milliseconds  Delay, measured from now, after which the handler fires.
    /// \param handler       Callback; its argument is true when the timer was
    ///                      cancelled and false when it expired normally.
    /// \return
    ///   Returns the ID of the new timer. You can use this ID to cancel the
    ///   timer
    uint64_t add(std::chrono::milliseconds milliseconds, std::function<void(bool)> handler)
    {
        WorkItem item;
        item.end = ClockT::now() + milliseconds;
        item.handler = std::move(handler);

        uint64_t id = 0;
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            id = ++m_idcounter;
            item.id = id;
            m_items.push(std::move(item));
        }

        // Something changed, so wake up timer thread
        m_checkWork.notify();
        return id;
    }

    /// Cancels the specified timer.
    ///
    /// \return
    ///   1 if the timer was cancelled.
    ///   0 if you were too late to cancel (or the timer ID was never valid to
    ///   start with)
    size_t cancel(uint64_t id)
    {
        // Instead of removing the item from the container (thus breaking the
        // heap integrity), we strip the handler from the item and put that
        // handler on a new item at the top for immediate execution.
        // The timer thread will then ignore the original item, since it has no
        // handler.
        std::unique_lock<std::mutex> lk(m_mtx);
        for (auto& item : m_items.getContainer())
        {
            if (item.id != id || !item.handler)
            {
                continue;
            }

            WorkItem cancelled;
            // Zero time, so it stays at the top for immediate execution
            cancelled.end = TimePoint();
            cancelled.id = 0;   // Means it is a canceled item
            // The standard does not guarantee that a moved-from std::function
            // is empty, so the source handler is reset explicitly.
            cancelled.handler = std::move(item.handler);
            item.handler = nullptr;
            // push() may reallocate the container; `item` must not be used
            // past this point (we return right away).
            m_items.push(std::move(cancelled));

            lk.unlock();
            // Something changed, so wake up timer thread
            m_checkWork.notify();
            return 1;
        }
        return 0;
    }

    /// Cancels all timers.
    ///
    /// \return
    ///   The number of timers cancelled. Entries that were already cancelled
    ///   (or whose handler was already taken by cancel()) are not counted.
    size_t cancelAll()
    {
        size_t cancelled = 0;
        {
            // Setting all "end" to 0 (for immediate execution) is ok,
            // since it maintains the heap integrity
            std::lock_guard<std::mutex> lk(m_mtx);
            for (auto& item : m_items.getContainer())
            {
                if (item.id == 0)
                {
                    continue;   // already cancelled
                }
                if (item.handler)
                {
                    ++cancelled;   // a live timer is being cancelled
                }
                // Hollow entries left behind by cancel() are rescheduled as
                // well, so that they get flushed from the queue right away.
                item.end = TimePoint();
                item.id = 0;
            }
        }

        m_checkWork.notify();
        return cancelled;
    }

  private:
    using TimePoint = std::chrono::time_point<ClockT, DurationT>;

    TimerQueue(const TimerQueue&) = delete;
    TimerQueue& operator=(const TimerQueue&) = delete;

    /// Worker thread main loop: sleep until the next deadline (or until
    /// something changes), then execute everything that is due.
    void run()
    {
        while (!m_finish)
        {
            const auto next = calcWaitTime();
            if (next.first)
            {
                // Timers found, so wait until it expires (or something else
                // changes)
                m_checkWork.waitUntil(next.second);
            }
            else
            {
                // No timers exist, so wait forever until something changes
                m_checkWork.wait();
            }

            // Check and execute as much work as possible, such as, all expired
            // timers
            checkWork();
        }

        // If we are shutting down, we should not have any items left,
        // since the shutdown cancels all items
        assert(m_items.size() == 0);
    }

    /// Drops hollow (cancelled) entries from the top of the queue.
    ///
    /// \return
    ///   {true, deadline of the earliest live timer} or
    ///   {false, default time point} when the queue holds no live timer.
    std::pair<bool, TimePoint> calcWaitTime()
    {
        std::lock_guard<std::mutex> lk(m_mtx);
        while (!m_items.empty())
        {
            if (m_items.top().handler)
            {
                // Item present, so return the new wait time
                return std::make_pair(true, m_items.top().end);
            }
            // Discard empty handlers (they were cancelled)
            m_items.pop();
        }

        // No items found, so return no wait time (causes the thread to wait
        // indefinitely)
        return std::make_pair(false, TimePoint());
    }

    /// Pops and executes every entry whose deadline has passed. The mutex is
    /// released while a handler runs, so handlers may call add()/cancel().
    void checkWork()
    {
        std::unique_lock<std::mutex> lk(m_mtx);
        while (!m_items.empty() && m_items.top().end <= ClockT::now())
        {
            // top() returns a const reference, so std::move(top()) would copy
            // the handler. Moving from the underlying element is safe: the
            // heap ordering only looks at `end`, which a move leaves intact.
            WorkItem item(std::move(m_items.getContainer().front()));
            m_items.pop();

            lk.unlock();
            if (item.handler)
            {
                item.handler(item.id == 0);
            }
            lk.lock();
        }
    }

    details::Semaphore m_checkWork;   // signalled whenever the queue changes
    std::thread m_th;
    bool m_finish = false;            // only accessed from the worker thread
    uint64_t m_idcounter = 0;

    struct WorkItem
    {
        TimePoint end;
        uint64_t id = 0;   // id==0 means it was cancelled
        std::function<void(bool)> handler;
        bool operator>(const WorkItem& other) const
        {
            return end > other.end;
        }
    };

    std::mutex m_mtx;   // guards m_items and m_idcounter
    // Inheriting from priority_queue, so we can access the internal container
    class Queue
      : public std::priority_queue<WorkItem, std::vector<WorkItem>, std::greater<WorkItem>>
    {
      public:
        std::vector<WorkItem>& getContainer()
        {
            return this->c;
        }
    } m_items;
};
}   // namespace BT
262 
263 #endif // TIMERQUEUE_H
std::condition_variable m_cv
Definition: timer_queue.h:50
bool operator>(const WorkItem &other) const
Definition: timer_queue.h:243
std::function< void(bool)> handler
Definition: timer_queue.h:242
bool waitUntil(const std::chrono::time_point< Clock, Duration > &point)
Definition: timer_queue.h:39
details::Semaphore m_checkWork
Definition: timer_queue.h:233
size_t cancel(uint64_t id)
Cancels the specified timer.
Definition: timer_queue.h:108
static pthread_mutex_t mutex
Definition: minitrace.cpp:61
std::vector< WorkItem > & getContainer()
Definition: timer_queue.h:255
Semaphore(unsigned int count=0)
Definition: timer_queue.h:20
std::chrono::time_point< _Clock, _Duration > end
Definition: timer_queue.h:240
void checkWork()
Definition: timer_queue.h:218
unsigned int m_count
Definition: timer_queue.h:51
size_t cancelAll()
Cancels all timers.
Definition: timer_queue.h:145
std::mutex m_mtx
Definition: timer_queue.h:249
static volatile int count
Definition: minitrace.cpp:55
uint64_t add(std::chrono::milliseconds milliseconds, std::function< void(bool)> handler)
Adds a new timer.
Definition: timer_queue.h:86
std::pair< bool, std::chrono::time_point< _Clock, _Duration > > calcWaitTime()
Definition: timer_queue.h:196
std::thread m_th
Definition: timer_queue.h:234


behaviotree_cpp_v3
Author(s): Michele Colledanchise, Davide Faconti
autogenerated on Tue May 4 2021 02:56:25