7 #include <condition_variable> 30 std::function< void( T const & ) > on_drop_callback = nullptr )
41 std::unique_lock<std::mutex>
lock(_mutex);
44 if( _on_drop_callback )
51 if( _queue.size() >
_cap )
53 if( _on_drop_callback )
71 std::unique_lock<std::mutex>
lock(_mutex);
72 _enq_cv.wait( lock, [
this]() {
73 return _queue.size() <
_cap; } );
77 if( _on_drop_callback )
94 bool dequeue(
T * item,
unsigned int timeout_ms )
96 std::unique_lock<std::mutex>
lock(_mutex);
97 if( ! _deq_cv.wait_for( lock,
98 std::chrono::milliseconds( timeout_ms ),
99 [
this]() { return ! _accepting || ! _queue.empty(); } )
109 _enq_cv.notify_one();
118 std::lock_guard< std::mutex >
lock( _mutex );
126 _enq_cv.notify_one();
134 std::lock_guard< std::mutex >
lock( _mutex );
137 fn( _queue.front() );
144 std::lock_guard< std::mutex >
lock( _mutex );
147 fn( _queue.front() );
153 std::lock_guard< std::mutex >
lock( _mutex );
163 std::lock_guard< std::mutex >
lock( _mutex );
173 _enq_cv.notify_all();
174 _deq_cv.notify_all();
180 std::lock_guard< std::mutex >
lock( _mutex );
189 std::lock_guard< std::mutex >
lock( _mutex );
190 return _queue.size();
204 std::function< void( T const & ) > on_drop_callback = nullptr )
205 :
_queue( cap, on_drop_callback )
211 if( item->is_blocking() )
219 return _queue.
dequeue(item, timeout_ms);
230 return _queue.
peek( fn );
236 return _queue.
peek( fn );
256 return _queue.
size();
261 return _queue.
empty();
295 template<
class Duration >
306 _owner->
_was_stopped_cv.wait_for( lock, sleep_time, [&]() {
return was_stopped(); } ) );
311 typedef std::function<void(cancellable_timer const &)>
action;
318 std::function<
void( action ) > on_drop_callback =
nullptr );
346 void invoke_and_wait(
T item, std::function<
bool()> exit_condition,
bool is_blocking =
false)
354 std::lock_guard<std::mutex> lk(_blocking_invoke_mutex);
358 _blocking_invoke_cv.notify_one();
362 std::unique_lock<std::mutex> lk(_blocking_invoke_mutex);
363 _blocking_invoke_cv.wait(lk, [&](){
return done || exit_condition(); });
379 bool flush(std::chrono::steady_clock::duration
timeout = std::chrono::seconds(10) );
386 bool _wait_for_start(
int timeout_ms );
405 template<
class T = std::function<
void(dispatcher::cancellable_timer)>>
410 : _operation(
std::
move(operation)), _dispatcher(1), _stopped(true)
424 if (!_stopped.load()) {
461 _timeout_ms(timeout_ms), _operation(
std::
move(operation))
465 if(cancellable_timer.try_sleep( std::chrono::milliseconds( _timeout_ms )))
469 std::lock_guard<std::mutex> lk(_m);
481 void start() { std::lock_guard<std::mutex> lk(_m); _watcher->start(); _running =
true; }
482 void stop() { { std::lock_guard<std::mutex> lk(_m); _running =
false; } _watcher->stop(); }
483 bool running() { std::lock_guard<std::mutex> lk(_m);
return _running; }
485 void kick() { std::lock_guard<std::mutex> lk(_m); _kicked =
true; }
490 bool _kicked =
false;
491 bool _running =
false;
static const textual_icon lock
single_consumer_queue< std::function< void(cancellable_timer)> > _queue
active_object(T operation)
bool blocking_enqueue(T &&item)
std::function< void(T const &)> const _on_drop_callback
std::mutex _was_stopped_mutex
void set_timeout(uint64_t timeout_ms)
watchdog(std::function< void()> operation, uint64_t timeout_ms)
dispatcher invoke([&](dispatcher::cancellable_timer c) { std::this_thread::sleep_for(std::chrono::seconds(3));dispatched_end_verifier=true;})
cancellable_timer(dispatcher *owner)
bool dequeue(T *item, unsigned int timeout_ms)
std::condition_variable _was_stopped_cv
single_consumer_queue< T > _queue
std::mutex _blocking_invoke_mutex
std::function< void()> _operation
std::atomic< bool > _stopped
::std_msgs::Duration_< std::allocator< void > > Duration
std::condition_variable _deq_cv
std::function< void(cancellable_timer const &)> action
std::atomic< bool > _is_alive
bool dequeue(T *item, unsigned int timeout_ms)
bool try_dequeue(T *item)
std::condition_variable _enq_cv
std::atomic< bool > _was_stopped
std::condition_variable _blocking_invoke_cv
unsigned __int64 uint64_t
bool try_dequeue(T *item)
GLbitfield GLuint64 timeout
void invoke(T item, bool is_blocking=false)
typename ::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
std::shared_ptr< active_object<> > _watcher
bool try_sleep(Duration sleep_time)
std::mutex _dispatch_mutex
void invoke_and_wait(T item, std::function< bool()> exit_condition, bool is_blocking=false)