concurrency.h
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #pragma once
5 #include <queue>
6 #include <mutex>
7 #include <condition_variable>
8 #include <thread>
9 #include <atomic>
10 #include <functional>
11 
12 const int QUEUE_MAX_SIZE = 10;
13 // Simplest implementation of a blocking concurrent queue for thread messaging
14 template<class T>
16 {
17  std::deque<T> _queue;
18  std::mutex _mutex;
19  std::condition_variable _deq_cv; // not empty signal
20  std::condition_variable _enq_cv; // not empty signal
21 
22  unsigned int _cap;
23  bool _accepting;
24 
25  // flush mechanism is required to abort wait on cv
26  // when need to stop
27  std::atomic<bool> _need_to_flush;
28  std::atomic<bool> _was_flushed;
29  std::function<void(T const &)> _on_drop_callback;
30 public:
31  explicit single_consumer_queue<T>(unsigned int cap = QUEUE_MAX_SIZE, std::function<void(T const &)> on_drop_callback = nullptr)
32  : _queue(), _mutex(), _deq_cv(), _enq_cv(), _cap(cap), _accepting(true), _need_to_flush(false), _was_flushed(false), _on_drop_callback(on_drop_callback)
33  {}
34 
35  void enqueue(T&& item)
36  {
37  std::unique_lock<std::mutex> lock(_mutex);
38  if (_accepting)
39  {
40  _queue.push_back(std::move(item));
41  if (_queue.size() > _cap)
42  {
43  if (_on_drop_callback)
44  {
45  _on_drop_callback(_queue.front());
46  }
47  _queue.pop_front();
48  }
49  }
50  lock.unlock();
51  _deq_cv.notify_one();
52  }
53 
54  void blocking_enqueue(T&& item)
55  {
56  auto pred = [this]()->bool { return _queue.size() < _cap || _need_to_flush; };
57 
58  std::unique_lock<std::mutex> lock(_mutex);
59  if (_accepting)
60  {
61  _enq_cv.wait(lock, pred);
62  _queue.push_back(std::move(item));
63  }
64  lock.unlock();
65  _deq_cv.notify_one();
66  }
67 
68 
69  bool dequeue(T* item ,unsigned int timeout_ms)
70  {
71  std::unique_lock<std::mutex> lock(_mutex);
72  _accepting = true;
73  _was_flushed = false;
74  const auto ready = [this]() { return (_queue.size() > 0) || _need_to_flush; };
75  if (!ready() && !_deq_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
76  {
77  return false;
78  }
79 
80  if (_queue.size() <= 0)
81  {
82  return false;
83  }
84  *item = std::move(_queue.front());
85  _queue.pop_front();
86  _enq_cv.notify_one();
87  return true;
88  }
89 
90  bool try_dequeue(T* item)
91  {
92  std::unique_lock<std::mutex> lock(_mutex);
93  _accepting = true;
94  if (_queue.size() > 0)
95  {
96  auto val = std::move(_queue.front());
97  _queue.pop_front();
98  *item = std::move(val);
99  _enq_cv.notify_one();
100  return true;
101  }
102  return false;
103  }
104 
105  bool peek(T** item)
106  {
107  std::unique_lock<std::mutex> lock(_mutex);
108 
109  if (_queue.size() <= 0)
110  {
111  return false;
112  }
113  *item = &_queue.front();
114  return true;
115  }
116 
117  void clear()
118  {
119  std::unique_lock<std::mutex> lock(_mutex);
120 
121  _accepting = false;
122  _need_to_flush = true;
123 
124  _enq_cv.notify_all();
125  while (_queue.size() > 0)
126  {
127  auto item = std::move(_queue.front());
128  _queue.pop_front();
129  }
130  _deq_cv.notify_all();
131  }
132 
133  void start()
134  {
135  std::unique_lock<std::mutex> lock(_mutex);
136  _need_to_flush = false;
137  _accepting = true;
138  }
139 
140  size_t size()
141  {
142  std::unique_lock<std::mutex> lock(_mutex);
143  return _queue.size();
144  }
145 };
146 
147 template<class T>
149 {
151 
152 public:
154 
155  void enqueue(T&& item)
156  {
157  if (item.is_blocking())
158  _queue.blocking_enqueue(std::move(item));
159  else
160  _queue.enqueue(std::move(item));
161  }
162 
163  bool dequeue(T* item, unsigned int timeout_ms)
164  {
165  return _queue.dequeue(item, timeout_ms);
166  }
167 
168  bool peek(T** item)
169  {
170  return _queue.peek(item);
171  }
172 
173  bool try_dequeue(T* item)
174  {
175  return _queue.try_dequeue(item);
176  }
177 
178  void clear()
179  {
180  _queue.clear();
181  }
182 
183  void start()
184  {
185  _queue.start();
186  }
187 
188  size_t size()
189  {
190  return _queue.size();
191  }
192 };
193 
195 {
196 public:
198  {
199  public:
201  : _owner(owner)
202  {}
203 
204  bool try_sleep(std::chrono::milliseconds::rep ms)
205  {
206  using namespace std::chrono;
207 
208  std::unique_lock<std::mutex> lock(_owner->_was_stopped_mutex);
209  auto good = [&]() { return _owner->_was_stopped.load(); };
210  return !(_owner->_was_stopped_cv.wait_for(lock, milliseconds(ms), good));
211  }
212 
213  private:
215  };
216  typedef std::function<void(cancellable_timer const &)> action;
217  dispatcher(unsigned int cap, std::function <void(action)> on_drop_callback = nullptr)
218  : _queue(cap, on_drop_callback),
219  _was_stopped(true),
220  _was_flushed(false),
221  _is_alive(true)
222  {
223  _thread = std::thread([&]()
224  {
225  int timeout_ms = 5000;
226  while (_is_alive)
227  {
228  std::function<void(cancellable_timer)> item;
229 
230  if (_queue.dequeue(&item, timeout_ms))
231  {
232  cancellable_timer time(this);
233 
234  try
235  {
236  item(time);
237  }
238  catch(...){}
239  }
240 
241 #ifndef ANDROID
242  std::unique_lock<std::mutex> lock(_was_flushed_mutex);
243 #endif
244  _was_flushed = true;
245  _was_flushed_cv.notify_all();
246 #ifndef ANDROID
247  lock.unlock();
248 #endif
249  }
250  });
251  }
252 
253  template<class T>
254  void invoke(T item, bool is_blocking = false)
255  {
256  if (!_was_stopped)
257  {
258  if(is_blocking)
259  _queue.blocking_enqueue(std::move(item));
260  else
261  _queue.enqueue(std::move(item));
262  }
263  }
264 
265  template<class T>
266  void invoke_and_wait(T item, std::function<bool()> exit_condition, bool is_blocking = false)
267  {
268  bool done = false;
269 
270  //action
271  auto func = std::move(item);
273  {
274  std::lock_guard<std::mutex> lk(_blocking_invoke_mutex);
275  func(c);
276 
277  done = true;
278  _blocking_invoke_cv.notify_one();
279  }, is_blocking);
280 
281  //wait
282  std::unique_lock<std::mutex> lk(_blocking_invoke_mutex);
283  _blocking_invoke_cv.wait(lk, [&](){ return done || exit_condition(); });
284  }
285 
286  void start()
287  {
288  std::unique_lock<std::mutex> lock(_was_stopped_mutex);
289  _was_stopped = false;
290 
291  _queue.start();
292  }
293 
294  void stop()
295  {
296  {
297  std::unique_lock<std::mutex> lock(_was_stopped_mutex);
298 
299  if (_was_stopped.load()) return;
300 
301  _was_stopped = true;
302  _was_stopped_cv.notify_all();
303  }
304 
305  _queue.clear();
306 
307  {
308  std::unique_lock<std::mutex> lock(_was_flushed_mutex);
309  _was_flushed = false;
310  }
311 
312  std::unique_lock<std::mutex> lock_was_flushed(_was_flushed_mutex);
313  _was_flushed_cv.wait_for(lock_was_flushed, std::chrono::hours(999999), [&]() { return _was_flushed.load(); });
314 
315  _queue.start();
316  }
317 
319  {
320  stop();
321  _queue.clear();
322  _is_alive = false;
323 
324  if (_thread.joinable())
325  _thread.join();
326  }
327 
328  bool flush()
329  {
330  std::mutex m;
331  std::condition_variable cv;
332  bool invoked = false;
333  auto wait_sucess = std::make_shared<std::atomic_bool>(true);
334  invoke([&, wait_sucess](cancellable_timer t)
335  {
337  if (_was_stopped || !(*wait_sucess))
338  return;
339 
340  {
341  std::lock_guard<std::mutex> locker(m);
342  invoked = true;
343  }
344  cv.notify_one();
345  });
346  std::unique_lock<std::mutex> locker(m);
347  *wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() { return invoked || _was_stopped; });
348  return *wait_sucess;
349  }
350 
351  bool empty()
352  {
353  return _queue.size() == 0;
354  }
355 
356 private:
359  std::thread _thread;
360 
361  std::atomic<bool> _was_stopped;
362  std::condition_variable _was_stopped_cv;
363  std::mutex _was_stopped_mutex;
364 
365  std::atomic<bool> _was_flushed;
366  std::condition_variable _was_flushed_cv;
367  std::mutex _was_flushed_mutex;
368 
369  std::condition_variable _blocking_invoke_cv;
371 
372  std::atomic<bool> _is_alive;
373 };
374 
375 template<class T = std::function<void(dispatcher::cancellable_timer)>>
377 {
378 public:
379  active_object(T operation)
380  : _operation(std::move(operation)), _dispatcher(1), _stopped(true)
381  {
382  }
383 
384  void start()
385  {
386  _stopped = false;
387  _dispatcher.start();
388 
389  do_loop();
390  }
391 
392  void stop()
393  {
394  if (!_stopped.load()) {
395  _stopped = true;
396  _dispatcher.stop();
397  }
398  }
399 
401  {
402  stop();
403  }
404 
405  bool is_active() const
406  {
407  return !_stopped;
408  }
409 private:
410  void do_loop()
411  {
412  _dispatcher.invoke([this](dispatcher::cancellable_timer ct)
413  {
414  _operation(ct);
415  if (!_stopped)
416  {
417  do_loop();
418  }
419  });
420  }
421 
424  std::atomic<bool> _stopped;
425 };
426 
427 class watchdog
428 {
429 public:
430  watchdog(std::function<void()> operation, uint64_t timeout_ms) :
431  _timeout_ms(timeout_ms), _operation(std::move(operation))
432  {
433  _watcher = std::make_shared<active_object<>>([this](dispatcher::cancellable_timer cancellable_timer)
434  {
435  if(cancellable_timer.try_sleep(_timeout_ms))
436  {
437  if(!_kicked)
438  _operation();
439  std::lock_guard<std::mutex> lk(_m);
440  _kicked = false;
441  }
442  });
443  }
444 
446  {
447  if(_running)
448  stop();
449  }
450 
451  void start() { std::lock_guard<std::mutex> lk(_m); _watcher->start(); _running = true; }
452  void stop() { { std::lock_guard<std::mutex> lk(_m); _running = false; } _watcher->stop(); }
453  bool running() { std::lock_guard<std::mutex> lk(_m); return _running; }
454  void set_timeout(uint64_t timeout_ms) { std::lock_guard<std::mutex> lk(_m); _timeout_ms = timeout_ms; }
455  void kick() { std::lock_guard<std::mutex> lk(_m); _kicked = true; }
456 
457 private:
458  std::mutex _m;
460  bool _kicked = false;
461  bool _running = false;
462  std::function<void()> _operation;
463  std::shared_ptr<active_object<>> _watcher;
464 };
void enqueue(T &&item)
Definition: concurrency.h:35
static const textual_icon lock
Definition: model-views.h:218
single_consumer_queue< std::function< void(cancellable_timer)> > _queue
Definition: concurrency.h:358
std::mutex _m
Definition: concurrency.h:458
void do_loop()
Definition: concurrency.h:410
active_object(T operation)
Definition: concurrency.h:379
dispatcher(unsigned int cap, std::function< void(action)> on_drop_callback=nullptr)
Definition: concurrency.h:217
void start()
Definition: concurrency.h:286
const GLfloat * m
Definition: glext.h:6814
std::mutex _was_stopped_mutex
Definition: concurrency.h:363
std::condition_variable _was_flushed_cv
Definition: concurrency.h:366
void set_timeout(uint64_t timeout_ms)
Definition: concurrency.h:454
bool is_active() const
Definition: concurrency.h:405
watchdog(std::function< void()> operation, uint64_t timeout_ms)
Definition: concurrency.h:430
bool running()
Definition: concurrency.h:453
cancellable_timer(dispatcher *owner)
Definition: concurrency.h:200
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:69
void blocking_enqueue(T &&item)
Definition: concurrency.h:54
std::condition_variable _was_stopped_cv
Definition: concurrency.h:362
single_consumer_queue< T > _queue
Definition: concurrency.h:150
static const textual_icon stop
Definition: model-views.h:225
std::mutex _blocking_invoke_mutex
Definition: concurrency.h:370
std::function< void()> _operation
Definition: concurrency.h:462
std::atomic< bool > _stopped
Definition: concurrency.h:424
GLdouble t
GLenum cap
Definition: glext.h:8882
GLuint GLfloat * val
std::condition_variable _deq_cv
Definition: concurrency.h:19
std::function< void(cancellable_timer const &)> action
Definition: concurrency.h:216
std::atomic< bool > _was_flushed
Definition: concurrency.h:28
std::atomic< bool > _is_alive
Definition: concurrency.h:372
friend cancellable_timer
Definition: concurrency.h:357
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:163
void stop()
Definition: concurrency.h:294
std::atomic< bool > _need_to_flush
Definition: concurrency.h:27
bool peek(T **item)
Definition: concurrency.h:105
std::function< void(T const &)> _on_drop_callback
Definition: concurrency.h:29
std::thread _thread
Definition: concurrency.h:359
const GLubyte * c
Definition: glext.h:12690
std::condition_variable _enq_cv
Definition: concurrency.h:20
std::atomic< bool > _was_stopped
Definition: concurrency.h:361
unsigned int _cap
Definition: concurrency.h:22
std::condition_variable _blocking_invoke_cv
Definition: concurrency.h:369
static std::condition_variable cv
bool flush()
Definition: concurrency.h:328
unsigned __int64 uint64_t
Definition: stdint.h:90
dispatcher _dispatcher
Definition: concurrency.h:423
static int done
uint64_t _timeout_ms
Definition: concurrency.h:459
GLenum func
bool try_dequeue(T *item)
Definition: concurrency.h:90
auto dispatcher
void stop()
Definition: concurrency.h:452
bool empty()
Definition: concurrency.h:351
std::deque< T > _queue
Definition: concurrency.h:17
const int QUEUE_MAX_SIZE
Definition: concurrency.h:12
std::mutex _was_flushed_mutex
Definition: concurrency.h:367
void invoke(T item, bool is_blocking=false)
Definition: concurrency.h:254
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
void start()
Definition: concurrency.h:451
std::atomic< bool > _was_flushed
Definition: concurrency.h:365
std::shared_ptr< active_object<> > _watcher
Definition: concurrency.h:463
bool try_sleep(std::chrono::milliseconds::rep ms)
Definition: concurrency.h:204
void kick()
Definition: concurrency.h:455
void invoke_and_wait(T item, std::function< bool()> exit_condition, bool is_blocking=false)
Definition: concurrency.h:266


librealsense2
Author(s): Sergey Dorodnicov , Doron Hirshberg , Mark Horn , Reagan Lopez , Itay Carpis
autogenerated on Mon May 3 2021 02:47:12