timer_queue.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <mutex>
5 #include <condition_variable>
6 #include <thread>
7 #include <queue>
8 #include <chrono>
9 #include <functional>
10 #include <assert.h>
11 
12 namespace BT
13 {
14 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
15 
16 namespace details
17 {
18 class Semaphore
19 {
20 public:
21  Semaphore(unsigned int count = 0) : m_count(count)
22  {}
23 
24  void notify()
25  {
26  {
27  std::lock_guard<std::mutex> lock(m_mtx);
28  m_count++;
29  }
30  m_cv.notify_one();
31  }
32 
33  template <class Clock, class Duration>
34  bool waitUntil(const std::chrono::time_point<Clock, Duration>& point)
35  {
36  std::unique_lock<std::mutex> lock(m_mtx);
37  if(!m_cv.wait_until(lock, point, [this]() { return m_count > 0 || m_unlock; }))
38  {
39  return false;
40  }
41  m_count--;
42  m_unlock = false;
43  return true;
44  }
45 
46  void manualUnlock()
47  {
48  m_unlock = true;
49  m_cv.notify_one();
50  }
51 
52 private:
54  std::condition_variable m_cv;
55  unsigned m_count = 0;
56  std::atomic_bool m_unlock = false;
57 };
58 } // namespace details
59 
60 // Timer Queue
61 //
62 // Allows execution of handlers at a specified time in the future
63 // Guarantees:
64 // - All handlers are executed ONCE, even if canceled (aborted parameter will
65 //be set to true)
66 // - If TimerQueue is destroyed, it will cancel all handlers.
67 // - Handlers are ALWAYS executed in the Timer Queue worker thread.
68 // - Handlers execution order is NOT guaranteed
69 //
70 template <typename _Clock = std::chrono::steady_clock,
71  typename _Duration = std::chrono::steady_clock::duration>
73 {
74 public:
76  {
77  m_th = std::thread([this] { run(); });
78  }
79 
81  {
82  m_finish = true;
83  cancelAll();
85  m_th.join();
86  }
87 
89  // \return
90  // Returns the ID of the new timer. You can use this ID to cancel the
91  // timer
92  uint64_t add(std::chrono::milliseconds milliseconds, std::function<void(bool)> handler)
93  {
94  WorkItem item;
95  item.end = _Clock::now() + milliseconds;
96  item.handler = std::move(handler);
97 
98  std::unique_lock<std::mutex> lk(m_mtx);
99  uint64_t id = ++m_idcounter;
100  item.id = id;
101  m_items.push(std::move(item));
102  lk.unlock();
103 
104  // Something changed, so wake up timer thread
106  return id;
107  }
108 
110  // \return
111  // 1 if the timer was cancelled.
112  // 0 if you were too late to cancel (or the timer ID was never valid to
113  // start with)
114  size_t cancel(uint64_t id)
115  {
116  // Instead of removing the item from the container (thus breaking the
117  // heap integrity), we set the item as having no handler, and put
118  // that handler on a new item at the top for immediate execution
119  // The timer thread will then ignore the original item, since it has no
120  // handler.
121  std::unique_lock<std::mutex> lk(m_mtx);
122  for(auto&& item : m_items.getContainer())
123  {
124  if(item.id == id && item.handler)
125  {
126  WorkItem newItem;
127  // Zero time, so it stays at the top for immediate execution
128  newItem.end = std::chrono::time_point<_Clock, _Duration>();
129  newItem.id = 0; // Means it is a canceled item
130  // Move the handler from item to newItem.
131  // Also, we need to manually set the handler to nullptr, since
132  // the standard does not guarantee moving an std::function will
133  // empty it. Some STL implementation will empty it, others will
134  // not.
135  newItem.handler = std::move(item.handler);
136  item.handler = nullptr;
137  m_items.push(std::move(newItem));
138 
139  lk.unlock();
140  // Something changed, so wake up timer thread
142  return 1;
143  }
144  }
145  return 0;
146  }
147 
149  // \return
150  // The number of timers cancelled
151  size_t cancelAll()
152  {
153  // Setting all "end" to 0 (for immediate execution) is ok,
154  // since it maintains the heap integrity
155  std::unique_lock<std::mutex> lk(m_mtx);
156  for(auto&& item : m_items.getContainer())
157  {
158  if(item.id)
159  {
160  item.end = std::chrono::time_point<_Clock, _Duration>();
161  item.id = 0;
162  }
163  }
164  auto ret = m_items.size();
165 
166  lk.unlock();
168  return ret;
169  }
170 
171 private:
172  TimerQueue(const TimerQueue&) = delete;
173  TimerQueue& operator=(const TimerQueue&) = delete;
174 
175  void run()
176  {
177  while(!m_finish)
178  {
179  auto end = calcWaitTime();
180  if(end.first)
181  {
182  // Timers found, so wait until it expires (or something else
183  // changes)
184  m_checkWork.waitUntil(end.second);
185  }
186  else
187  {
188  // No timers exist, so wait an arbitrary amount of time
189  m_checkWork.waitUntil(_Clock::now() + std::chrono::milliseconds(10));
190  }
191 
192  // Check and execute as much work as possible, such as, all expired
193  // timers
194  checkWork();
195  }
196 
197  // If we are shutting down, we should not have any items left,
198  // since the shutdown cancels all items
199  assert(m_items.size() == 0);
200  }
201 
202  std::pair<bool, std::chrono::time_point<_Clock, _Duration>> calcWaitTime()
203  {
204  std::lock_guard<std::mutex> lk(m_mtx);
205  while(m_items.size())
206  {
207  if(m_items.top().handler)
208  {
209  // Item present, so return the new wait time
210  return std::make_pair(true, m_items.top().end);
211  }
212  else
213  {
214  // Discard empty handlers (they were cancelled)
215  m_items.pop();
216  }
217  }
218 
219  // No items found, so return no wait time (causes the thread to wait
220  // indefinitely)
221  return std::make_pair(false, std::chrono::time_point<_Clock, _Duration>());
222  }
223 
224  void checkWork()
225  {
226  std::unique_lock<std::mutex> lk(m_mtx);
227  while(m_items.size() && m_items.top().end <= _Clock::now())
228  {
229  WorkItem item(std::move(m_items.top()));
230  m_items.pop();
231 
232  lk.unlock();
233  if(item.handler)
234  {
235  item.handler(item.id == 0);
236  }
237  lk.lock();
238  }
239  }
240 
242  std::thread m_th;
243  bool m_finish = false;
244  uint64_t m_idcounter = 0;
245 
246  struct WorkItem
247  {
248  std::chrono::time_point<_Clock, _Duration> end;
249  uint64_t id; // id==0 means it was cancelled
250  std::function<void(bool)> handler;
251  bool operator>(const WorkItem& other) const
252  {
253  return end > other.end;
254  }
255  };
256 
258  // Inheriting from priority_queue, so we can access the internal container
259  class Queue
260  : public std::priority_queue<WorkItem, std::vector<WorkItem>, std::greater<WorkItem>>
261  {
262  public:
263  std::vector<WorkItem>& getContainer()
264  {
265  return this->c;
266  }
267  } m_items;
268 };
269 } // namespace BT
BT
Definition: ex01_wrap_legacy.cpp:29
BT::TimerQueue::add
uint64_t add(std::chrono::milliseconds milliseconds, std::function< void(bool)> handler)
Adds a new timer.
Definition: timer_queue.h:92
BT::TimerQueue
Definition: timer_queue.h:72
BT::TimerQueue::m_th
std::thread m_th
Definition: timer_queue.h:242
BT::TimerQueue::m_finish
bool m_finish
Definition: timer_queue.h:243
BT::details::Semaphore::m_count
unsigned m_count
Definition: timer_queue.h:55
BT::TimerQueue::WorkItem::id
uint64_t id
Definition: timer_queue.h:249
BT::details::Semaphore::waitUntil
bool waitUntil(const std::chrono::time_point< Clock, Duration > &point)
Definition: timer_queue.h:34
BT::TimerQueue::WorkItem::handler
std::function< void(bool)> handler
Definition: timer_queue.h:250
BT::details::Semaphore
Definition: timer_queue.h:18
BT::TimerQueue::WorkItem::operator>
bool operator>(const WorkItem &other) const
Definition: timer_queue.h:251
BT::TimerQueue::run
void run()
Definition: timer_queue.h:175
cx::end
constexpr auto end(const C &c) -> decltype(c.end())
Definition: wildcards.hpp:686
detail::void
j template void())
Definition: json.hpp:4893
mutex
static pthread_mutex_t mutex
Definition: minitrace.cpp:77
BT::TimerQueue::cancel
size_t cancel(uint64_t id)
Cancels the specified timer.
Definition: timer_queue.h:114
lexy::count
constexpr auto count
Sink that counts all arguments.
Definition: fold.hpp:88
BT::TimerQueue::WorkItem::end
std::chrono::time_point< _Clock, _Duration > end
Definition: timer_queue.h:248
BT::TimerQueue::m_mtx
std::mutex m_mtx
Definition: timer_queue.h:257
BT::TimerQueue::WorkItem
Definition: timer_queue.h:246
BT::TimerQueue::Queue
Definition: timer_queue.h:259
BT::TimerQueue::operator=
TimerQueue & operator=(const TimerQueue &)=delete
BT::details::Semaphore::notify
void notify()
Definition: timer_queue.h:24
BT::TimerQueue::m_checkWork
details::Semaphore m_checkWork
Definition: timer_queue.h:241
BT::TimerQueue::m_items
BT::TimerQueue::Queue m_items
BT::details::Semaphore::Semaphore
Semaphore(unsigned int count=0)
Definition: timer_queue.h:21
BT::details::Semaphore::m_unlock
std::atomic_bool m_unlock
Definition: timer_queue.h:56
BT::TimerQueue::Queue::getContainer
std::vector< WorkItem > & getContainer()
Definition: timer_queue.h:263
BT::TimerQueue::m_idcounter
uint64_t m_idcounter
Definition: timer_queue.h:244
BT::details::Semaphore::m_mtx
std::mutex m_mtx
Definition: timer_queue.h:53
BT::details::Semaphore::m_cv
std::condition_variable m_cv
Definition: timer_queue.h:54
BT::TimerQueue::checkWork
void checkWork()
Definition: timer_queue.h:224
BT::TimerQueue::~TimerQueue
~TimerQueue()
Definition: timer_queue.h:80
BT::TimerQueue::TimerQueue
TimerQueue()
Definition: timer_queue.h:75
BT::TimerQueue::cancelAll
size_t cancelAll()
Cancels all timers.
Definition: timer_queue.h:151
BT::TimerQueue::calcWaitTime
std::pair< bool, std::chrono::time_point< _Clock, _Duration > > calcWaitTime()
Definition: timer_queue.h:202
BT::details::Semaphore::manualUnlock
void manualUnlock()
Definition: timer_queue.h:46


behaviortree_cpp_v4
Author(s): Davide Faconti
autogenerated on Fri Jun 28 2024 02:20:08