7 #include <condition_variable> 37 std::unique_lock<std::mutex>
lock(_mutex);
41 if (_queue.size() >
_cap)
43 if (_on_drop_callback)
56 auto pred = [
this]()->
bool {
return _queue.size() < _cap ||
_need_to_flush; };
58 std::unique_lock<std::mutex>
lock(_mutex);
61 _enq_cv.wait(lock, pred);
71 std::unique_lock<std::mutex>
lock(_mutex);
74 const auto ready = [
this]() {
return (_queue.size() > 0) || _need_to_flush; };
75 if (!ready() && !_deq_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
80 if (_queue.size() <= 0)
92 std::unique_lock<std::mutex>
lock(_mutex);
94 if (_queue.size() > 0)
107 std::unique_lock<std::mutex>
lock(_mutex);
109 if (_queue.size() <= 0)
113 *item = &_queue.front();
119 std::unique_lock<std::mutex>
lock(_mutex);
122 _need_to_flush =
true;
124 _enq_cv.notify_all();
125 while (_queue.size() > 0)
130 _deq_cv.notify_all();
135 std::unique_lock<std::mutex>
lock(_mutex);
136 _need_to_flush =
false;
142 std::unique_lock<std::mutex>
lock(_mutex);
143 return _queue.size();
157 if (item.is_blocking())
165 return _queue.
dequeue(item, timeout_ms);
170 return _queue.
peek(item);
190 return _queue.
size();
208 std::unique_lock<std::mutex>
lock(_owner->_was_stopped_mutex);
209 auto good = [&]() {
return _owner->_was_stopped.load(); };
210 return !(_owner->_was_stopped_cv.wait_for(lock, milliseconds(ms), good));
216 typedef std::function<void(cancellable_timer const &)>
action;
217 dispatcher(
unsigned int cap, std::function <
void(action)> on_drop_callback =
nullptr)
218 :
_queue(cap, on_drop_callback),
223 _thread = std::thread([&]()
225 int timeout_ms = 5000;
228 std::function<void(cancellable_timer)> item;
230 if (
_queue.dequeue(&item, timeout_ms))
242 std::unique_lock<std::mutex>
lock(_was_flushed_mutex);
245 _was_flushed_cv.notify_all();
266 void invoke_and_wait(
T item, std::function<
bool()> exit_condition,
bool is_blocking =
false)
274 std::lock_guard<std::mutex> lk(_blocking_invoke_mutex);
278 _blocking_invoke_cv.notify_one();
282 std::unique_lock<std::mutex> lk(_blocking_invoke_mutex);
283 _blocking_invoke_cv.wait(lk, [&](){
return done || exit_condition(); });
288 std::unique_lock<std::mutex>
lock(_was_stopped_mutex);
289 _was_stopped =
false;
297 std::unique_lock<std::mutex>
lock(_was_stopped_mutex);
299 if (_was_stopped.load())
return;
302 _was_stopped_cv.notify_all();
308 std::unique_lock<std::mutex>
lock(_was_flushed_mutex);
312 std::unique_lock<std::mutex> lock_was_flushed(_was_flushed_mutex);
313 _was_flushed_cv.wait_for(lock_was_flushed, std::chrono::hours(999999), [&]() {
return _was_flushed.load(); });
324 if (_thread.joinable())
331 std::condition_variable
cv;
332 bool invoked =
false;
333 auto wait_sucess = std::make_shared<std::atomic_bool>(
true);
337 if (_was_stopped || !(*wait_sucess))
341 std::lock_guard<std::mutex> locker(m);
346 std::unique_lock<std::mutex> locker(m);
347 *wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() {
return invoked || _was_stopped; });
353 return _queue.size() == 0;
375 template<
class T = std::function<
void(dispatcher::cancellable_timer)>>
380 : _operation(
std::
move(operation)), _dispatcher(1), _stopped(true)
394 if (!_stopped.load()) {
431 _timeout_ms(timeout_ms), _operation(
std::
move(operation))
435 if(cancellable_timer.try_sleep(_timeout_ms))
439 std::lock_guard<std::mutex> lk(_m);
451 void start() { std::lock_guard<std::mutex> lk(_m); _watcher->start(); _running =
true; }
452 void stop() { { std::lock_guard<std::mutex> lk(_m); _running =
false; } _watcher->stop(); }
453 bool running() { std::lock_guard<std::mutex> lk(_m);
return _running; }
455 void kick() { std::lock_guard<std::mutex> lk(_m); _kicked =
true; }
460 bool _kicked =
false;
461 bool _running =
false;
static const textual_icon lock
single_consumer_queue< std::function< void(cancellable_timer)> > _queue
active_object(T operation)
dispatcher(unsigned int cap, std::function< void(action)> on_drop_callback=nullptr)
std::mutex _was_stopped_mutex
std::condition_variable _was_flushed_cv
void set_timeout(uint64_t timeout_ms)
watchdog(std::function< void()> operation, uint64_t timeout_ms)
cancellable_timer(dispatcher *owner)
bool dequeue(T *item, unsigned int timeout_ms)
void blocking_enqueue(T &&item)
std::condition_variable _was_stopped_cv
single_consumer_queue< T > _queue
static const textual_icon stop
std::mutex _blocking_invoke_mutex
std::function< void()> _operation
std::atomic< bool > _stopped
std::condition_variable _deq_cv
std::function< void(cancellable_timer const &)> action
std::atomic< bool > _was_flushed
std::atomic< bool > _is_alive
bool dequeue(T *item, unsigned int timeout_ms)
std::atomic< bool > _need_to_flush
std::function< void(T const &)> _on_drop_callback
bool try_dequeue(T *item)
std::condition_variable _enq_cv
std::atomic< bool > _was_stopped
std::condition_variable _blocking_invoke_cv
static std::condition_variable cv
unsigned __int64 uint64_t
bool try_dequeue(T *item)
std::mutex _was_flushed_mutex
void invoke(T item, bool is_blocking=false)
typename::boost::move_detail::remove_reference< T >::type && move(T &&t) BOOST_NOEXCEPT
std::atomic< bool > _was_flushed
std::shared_ptr< active_object<> > _watcher
bool try_sleep(std::chrono::milliseconds::rep ms)
void invoke_and_wait(T item, std::function< bool()> exit_condition, bool is_blocking=false)