concurrency.h
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #pragma once
5 #include <queue>
6 #include <mutex>
7 #include <condition_variable>
8 #include <thread>
9 #include <atomic>
10 #include <functional>
11 #include <cassert>
12 
13 const int QUEUE_MAX_SIZE = 10;
14 // Simplest implementation of a blocking concurrent queue for thread messaging
15 template<class T>
17 {
18  std::deque<T> _queue;
19  mutable std::mutex _mutex;
20  std::condition_variable _deq_cv; // not empty signal
21  std::condition_variable _enq_cv; // not full signal
22 
23  unsigned int const _cap;
24  bool _accepting;
25 
26  std::function<void(T const &)> const _on_drop_callback;
27 
28 public:
29  explicit single_consumer_queue< T >( unsigned int cap = QUEUE_MAX_SIZE,
30  std::function< void( T const & ) > on_drop_callback = nullptr )
31  : _cap( cap )
32  , _accepting( true )
33  , _on_drop_callback( on_drop_callback )
34  {
35  }
36 
37  // Enqueue an item onto the queue.
38  // If the queue grows beyond capacity, the front will be removed, losing whatever was there!
39  bool enqueue(T&& item)
40  {
41  std::unique_lock<std::mutex> lock(_mutex);
42  if( ! _accepting )
43  {
44  if( _on_drop_callback )
45  _on_drop_callback( item );
46  return false;
47  }
48 
49  _queue.push_back(std::move(item));
50 
51  if( _queue.size() > _cap )
52  {
53  if( _on_drop_callback )
54  _on_drop_callback( _queue.front() );
55  _queue.pop_front();
56  }
57 
58  lock.unlock();
59 
60  // We pushed something -- let others know there's something to dequeue
61  _deq_cv.notify_one();
62 
63  return true;
64  }
65 
66 
67  // Enqueue an item, but wait for room if there isn't any
68  // Returns true if the enqueue succeeded
69  bool blocking_enqueue(T&& item)
70  {
71  std::unique_lock<std::mutex> lock(_mutex);
72  _enq_cv.wait( lock, [this]() {
73  return _queue.size() < _cap; } );
74  if( ! _accepting )
75  {
76  // We shouldn't be adding anything to the queue when we're stopping
77  if( _on_drop_callback )
78  _on_drop_callback( item );
79  return false;
80  }
81 
82  _queue.push_back(std::move(item));
83  lock.unlock();
84 
85  // We pushed something -- let another know there's something to dequeue
86  _deq_cv.notify_one();
87 
88  return true;
89  }
90 
91 
92  // Remove one item; if unavailable, wait for it
93  // Return true if an item was removed -- otherwise, false
94  bool dequeue( T * item, unsigned int timeout_ms )
95  {
96  std::unique_lock<std::mutex> lock(_mutex);
97  if( ! _deq_cv.wait_for( lock,
98  std::chrono::milliseconds( timeout_ms ),
99  [this]() { return ! _accepting || ! _queue.empty(); } )
100  || _queue.empty() )
101  {
102  return false;
103  }
104 
105  *item = std::move(_queue.front());
106  _queue.pop_front();
107 
108  // We've made room -- let whoever is waiting for room know about it
109  _enq_cv.notify_one();
110 
111  return true;
112  }
113 
114  // Remove one item if available; do not wait for one
115  // Return true if an item was removed -- otherwise, false
116  bool try_dequeue(T* item)
117  {
118  std::lock_guard< std::mutex > lock( _mutex );
119  if( _queue.empty() )
120  return false;
121 
122  *item = std::move(_queue.front());
123  _queue.pop_front();
124 
125  // We've made room -- let whoever is waiting for room know about it
126  _enq_cv.notify_one();
127 
128  return true;
129  }
130 
131  template< class Fn >
132  bool peek( Fn fn ) const
133  {
134  std::lock_guard< std::mutex > lock( _mutex );
135  if( _queue.empty() )
136  return false;
137  fn( _queue.front() );
138  return true;
139  }
140 
141  template< class Fn >
142  bool peek( Fn fn )
143  {
144  std::lock_guard< std::mutex > lock( _mutex );
145  if( _queue.empty() )
146  return false;
147  fn( _queue.front() );
148  return true;
149  }
150 
151  void stop()
152  {
153  std::lock_guard< std::mutex > lock( _mutex );
154 
155  // We no longer accept any more items!
156  _accepting = false;
157 
158  _clear();
159  }
160 
161  void clear()
162  {
163  std::lock_guard< std::mutex > lock( _mutex );
164  _clear();
165  }
166 
167 protected:
168  void _clear()
169  {
170  _queue.clear();
171 
172  // Wake up anyone who is waiting for room to enqueue, or waiting for something to dequeue -- there's nothing now
173  _enq_cv.notify_all();
174  _deq_cv.notify_all();
175  }
176 
177 public:
178  void start()
179  {
180  std::lock_guard< std::mutex > lock( _mutex );
181  _accepting = true;
182  }
183 
184  bool started() const { return _accepting; }
185  bool stopped() const { return ! started(); }
186 
187  size_t size() const
188  {
189  std::lock_guard< std::mutex > lock( _mutex );
190  return _queue.size();
191  }
192 
193  bool empty() const { return ! size(); }
194 };
195 
196 // A single_consumer_queue meant to hold frame_holder objects
197 template<class T>
199 {
201 
202 public:
204  std::function< void( T const & ) > on_drop_callback = nullptr )
205  : _queue( cap, on_drop_callback )
206  {
207  }
208 
209  bool enqueue( T && item )
210  {
211  if( item->is_blocking() )
212  return _queue.blocking_enqueue( std::move( item ) );
213  else
214  return _queue.enqueue( std::move( item ) );
215  }
216 
217  bool dequeue(T* item, unsigned int timeout_ms)
218  {
219  return _queue.dequeue(item, timeout_ms);
220  }
221 
222  bool try_dequeue(T* item)
223  {
224  return _queue.try_dequeue(item);
225  }
226 
227  template< class Fn >
228  bool peek( Fn fn ) const
229  {
230  return _queue.peek( fn );
231  }
232 
233  template< class Fn >
234  bool peek( Fn fn )
235  {
236  return _queue.peek( fn );
237  }
238 
239  void clear()
240  {
241  _queue.clear();
242  }
243 
244  void stop()
245  {
246  _queue.stop();
247  }
248 
249  void start()
250  {
251  _queue.start();
252  }
253 
254  size_t size() const
255  {
256  return _queue.size();
257  }
258 
259  bool empty() const
260  {
261  return _queue.empty();
262  }
263 
264  bool started() const { return _queue.started(); }
265  bool stopped() const { return _queue.stopped(); }
266 };
267 
268 // The dispatcher is responsible for dispatching generic 'actions': any thread can queue an action
269 // (lambda) for dispatch, while the dispatcher maintains a single thread that runs these actions one
270 // at a time.
271 //
273 {
274 public:
275  // An action, when run, takes a 'cancellable_timer', which it may ignore. This class allows the
276  // action to perform some sleep and know that, if the dispatcher is shutting down, its sleep
277  // will still be interrupted.
278  //
280  {
282 
283  public:
285  : _owner(owner)
286  {}
287 
288  bool was_stopped() const { return _owner->_was_stopped.load(); }
289 
290  // Replacement for sleep() -- try to sleep for a time, but stop if the
291  // dispatcher is stopped
292  //
293  // Return false if the dispatcher was stopped, true otherwise
294  //
295  template< class Duration >
296  bool try_sleep( Duration sleep_time )
297  {
298  using namespace std::chrono;
299 
300  std::unique_lock<std::mutex> lock(_owner->_was_stopped_mutex);
301  if( was_stopped() )
302  return false;
303  // wait_for() returns "false if the predicate pred still evaluates to false after the
304  // rel_time timeout expired, otherwise true"
305  return ! (
306  _owner->_was_stopped_cv.wait_for( lock, sleep_time, [&]() { return was_stopped(); } ) );
307  }
308  };
309 
310  // An action is any functor that accepts a cancellable timer
311  typedef std::function<void(cancellable_timer const &)> action;
312 
313  // Certain conditions (see invoke()) may cause actions to be lost, e.g. when the queue gets full
314  // and we're non-blocking. The on_drop_callback allows caputring of these instances, if we
315  // want...
316  //
317  dispatcher( unsigned int queue_capacity,
318  std::function< void( action ) > on_drop_callback = nullptr );
319 
320  ~dispatcher();
321 
322  bool empty() const { return _queue.empty(); }
323 
324  // Main invocation of an action: this will be called from any thread, and basically just queues
325  // up the actions for our dispatching thread to handle them.
326  //
327  // A blocking invocation means that it will wait until there's room in the queue: if not
328  // blocking and the queue is full, the action will be queued at the expense of the next action
329  // in line (the oldest) for dispatch!
330  //
331  template<class T>
332  void invoke(T item, bool is_blocking = false)
333  {
334  if (!_was_stopped)
335  {
336  if(is_blocking)
337  _queue.blocking_enqueue(std::move(item));
338  else
339  _queue.enqueue(std::move(item));
340  }
341  }
342 
343  // Like above, but synchronous: will return only when the action has actually been dispatched.
344  //
345  template<class T>
346  void invoke_and_wait(T item, std::function<bool()> exit_condition, bool is_blocking = false)
347  {
348  bool done = false;
349 
350  //action
351  auto func = std::move(item);
353  {
354  std::lock_guard<std::mutex> lk(_blocking_invoke_mutex);
355  func(c);
356 
357  done = true;
358  _blocking_invoke_cv.notify_one();
359  }, is_blocking);
360 
361  //wait
362  std::unique_lock<std::mutex> lk(_blocking_invoke_mutex);
363  _blocking_invoke_cv.wait(lk, [&](){ return done || exit_condition(); });
364  }
365 
366  // Stops the dispatcher. This is not a pause: it will clear out the queue, losing any pending
367  // actions!
368  //
369  void stop();
370 
371  // A dispatcher starts out 'started' after construction. It can then be stopped and started
372  // again if needed.
373  //
374  void start();
375 
376  // Return when all items in the queue are finished (within a timeout).
377  // If additional items are added while we're waiting, those will not be waited on!
378  //
379  bool flush(std::chrono::steady_clock::duration timeout = std::chrono::seconds(10) );
380 
381 
382 private:
383  // Return true if dispatcher is started (within a timeout).
384  // false if not or the dispatcher is no longer alive
385  //
386  bool _wait_for_start( int timeout_ms );
387 
389 
391  std::thread _thread;
392 
393  std::atomic<bool> _was_stopped;
394  std::condition_variable _was_stopped_cv;
395  std::mutex _was_stopped_mutex;
396 
397  std::mutex _dispatch_mutex;
398 
399  std::condition_variable _blocking_invoke_cv;
401 
402  std::atomic<bool> _is_alive;
403 };
404 
405 template<class T = std::function<void(dispatcher::cancellable_timer)>>
407 {
408 public:
409  active_object(T operation)
410  : _operation(std::move(operation)), _dispatcher(1), _stopped(true)
411  {
412  }
413 
414  void start()
415  {
416  _stopped = false;
417  _dispatcher.start();
418 
419  do_loop();
420  }
421 
422  void stop()
423  {
424  if (!_stopped.load()) {
425  _stopped = true;
426  _dispatcher.stop();
427  }
428  }
429 
431  {
432  stop();
433  }
434 
435  bool is_active() const
436  {
437  return !_stopped;
438  }
439 private:
440  void do_loop()
441  {
442  _dispatcher.invoke([this](dispatcher::cancellable_timer ct)
443  {
444  _operation(ct);
445  if (!_stopped)
446  {
447  do_loop();
448  }
449  });
450  }
451 
454  std::atomic<bool> _stopped;
455 };
456 
457 class watchdog
458 {
459 public:
460  watchdog(std::function<void()> operation, uint64_t timeout_ms) :
461  _timeout_ms(timeout_ms), _operation(std::move(operation))
462  {
463  _watcher = std::make_shared<active_object<>>([this](dispatcher::cancellable_timer cancellable_timer)
464  {
465  if(cancellable_timer.try_sleep( std::chrono::milliseconds( _timeout_ms )))
466  {
467  if(!_kicked)
468  _operation();
469  std::lock_guard<std::mutex> lk(_m);
470  _kicked = false;
471  }
472  });
473  }
474 
476  {
477  if(_running)
478  stop();
479  }
480 
481  void start() { std::lock_guard<std::mutex> lk(_m); _watcher->start(); _running = true; }
482  void stop() { { std::lock_guard<std::mutex> lk(_m); _running = false; } _watcher->stop(); }
483  bool running() { std::lock_guard<std::mutex> lk(_m); return _running; }
484  void set_timeout(uint64_t timeout_ms) { std::lock_guard<std::mutex> lk(_m); _timeout_ms = timeout_ms; }
485  void kick() { std::lock_guard<std::mutex> lk(_m); _kicked = true; }
486 
487 private:
488  std::mutex _m;
490  bool _kicked = false;
491  bool _running = false;
492  std::function<void()> _operation;
493  std::shared_ptr<active_object<>> _watcher;
494 };
static const textual_icon lock
Definition: model-views.h:219
single_consumer_queue< std::function< void(cancellable_timer)> > _queue
Definition: concurrency.h:390
std::mutex _m
Definition: concurrency.h:488
void do_loop()
Definition: concurrency.h:440
active_object(T operation)
Definition: concurrency.h:409
bool blocking_enqueue(T &&item)
Definition: concurrency.h:69
std::function< void(T const &)> const _on_drop_callback
Definition: concurrency.h:26
size_t size() const
Definition: concurrency.h:187
std::mutex _was_stopped_mutex
Definition: concurrency.h:395
void set_timeout(uint64_t timeout_ms)
Definition: concurrency.h:484
bool peek(Fn fn) const
Definition: concurrency.h:132
watchdog(std::function< void()> operation, uint64_t timeout_ms)
Definition: concurrency.h:460
bool running()
Definition: concurrency.h:483
dispatcher invoke([&](dispatcher::cancellable_timer c) { std::this_thread::sleep_for(std::chrono::seconds(3));dispatched_end_verifier=true;})
cancellable_timer(dispatcher *owner)
Definition: concurrency.h:284
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:94
std::condition_variable _was_stopped_cv
Definition: concurrency.h:394
single_consumer_queue< T > _queue
Definition: concurrency.h:200
std::mutex _blocking_invoke_mutex
Definition: concurrency.h:400
std::function< void()> _operation
Definition: concurrency.h:492
unsigned int const _cap
Definition: concurrency.h:23
std::atomic< bool > _stopped
Definition: concurrency.h:454
::std_msgs::Duration_< std::allocator< void > > Duration
Definition: Duration.h:47
std::condition_variable _deq_cv
Definition: concurrency.h:20
std::function< void(cancellable_timer const &)> action
Definition: concurrency.h:311
bool empty() const
Definition: concurrency.h:322
std::atomic< bool > _is_alive
Definition: concurrency.h:402
friend cancellable_timer
Definition: concurrency.h:388
bool dequeue(T *item, unsigned int timeout_ms)
Definition: concurrency.h:217
bool enqueue(T &&item)
Definition: concurrency.h:39
std::thread _thread
Definition: concurrency.h:391
std::condition_variable _enq_cv
Definition: concurrency.h:21
std::atomic< bool > _was_stopped
Definition: concurrency.h:393
std::condition_variable _blocking_invoke_cv
Definition: concurrency.h:399
unsigned __int64 uint64_t
Definition: stdint.h:90
dispatcher _dispatcher
Definition: concurrency.h:453
bool started() const
Definition: concurrency.h:184
static int done
uint64_t _timeout_ms
Definition: concurrency.h:489
bool empty() const
Definition: concurrency.h:193
GLenum func
bool try_dequeue(T *item)
Definition: concurrency.h:116
auto dispatcher
void stop()
Definition: concurrency.h:482
bool is_active() const
Definition: concurrency.h:435
bool peek(Fn fn) const
Definition: concurrency.h:228
GLbitfield GLuint64 timeout
std::deque< T > _queue
Definition: concurrency.h:18
const int QUEUE_MAX_SIZE
Definition: concurrency.h:13
void invoke(T item, bool is_blocking=false)
Definition: concurrency.h:332
typename ::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
void start()
Definition: concurrency.h:481
bool stopped() const
Definition: concurrency.h:185
std::shared_ptr< active_object<> > _watcher
Definition: concurrency.h:493
void kick()
Definition: concurrency.h:485
bool try_sleep(Duration sleep_time)
Definition: concurrency.h:296
std::mutex _dispatch_mutex
Definition: concurrency.h:397
void invoke_and_wait(T item, std::function< bool()> exit_condition, bool is_blocking=false)
Definition: concurrency.h:346


librealsense2
Author(s): LibRealSense ROS Team
autogenerated on Thu Dec 22 2022 03:43:16