21 #include <semaphore.h> 32 class Allocator = std::allocator<T>>
38 virtual bool empty()
const = 0;
39 virtual size_t size()
const = 0;
40 virtual void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) = 0;
51 class Allocator = std::allocator<T>>
64 status_monitor_ = status_monitor;
73 dequeue_.push_back(value);
84 dequeue_.push_back(value);
91 const std::chrono::microseconds&)
override 98 const std::chrono::microseconds&)
override 110 const std::chrono::microseconds&)
override 112 bool is_data =
false;
113 if (!dequeue_.empty()) {
114 data = dequeue_.front();
115 dequeue_.pop_front();
117 if (dequeue_.empty()) {
127 inline bool empty()
const override {
128 return dequeue_.empty();
134 inline size_t size()
const override {
135 return dequeue_.size();
153 if (status_monitor_) {
154 status_monitor_->setStatus(status);
178 class Allocator = std::allocator<T>>
189 std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
190 return OQ::enqueue(std::move(value));
199 std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
200 return OQ::enqueue(value);
205 const std::chrono::microseconds &duration)
override 207 std::unique_lock<DequeueMutex> lock(dequeue_mutex_, std::defer_lock);
208 bool result = lock.try_lock_for(duration);
217 const std::chrono::microseconds &duration)
override 219 std::unique_lock<DequeueMutex> lock(dequeue_mutex_, std::defer_lock);
220 bool result = lock.try_lock_for(duration);
222 OQ::enqueue(std::move(value));
234 const std::chrono::microseconds &duration)
override 236 std::unique_lock<DequeueMutex> lock(dequeue_mutex_, std::defer_lock);
237 bool result = lock.try_lock_for(duration);
239 result = OQ::dequeue(data, duration);
247 inline bool empty()
const override {
248 std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
255 inline size_t size()
const override {
256 std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
264 std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
282 class Allocator = std::allocator<T>>
293 if (max_queue_size == 0) {
294 throw std::invalid_argument(
"Max queue size invalid: 0");
296 max_queue_size_ = max_queue_size;
307 bool is_queued =
false;
308 std::unique_lock<std::mutex> lk(dequeue_mutex_);
309 if (OQ::size() <= max_queue_size_) {
317 bool is_queued =
false;
318 std::unique_lock<std::mutex> lk(dequeue_mutex_);
319 if (OQ::size() <= max_queue_size_) {
335 const std::chrono::microseconds &duration)
override 337 std::cv_status (std::condition_variable::*wf)(std::unique_lock<std::mutex>&,
const std::chrono::microseconds&);
338 wf = &std::condition_variable::wait_for;
339 return enqueueOnCondition(
341 std::bind(wf, &condition_variable_, std::placeholders::_1, duration));
346 const std::chrono::microseconds &duration)
override 348 std::cv_status (std::condition_variable::*wf)(std::unique_lock<std::mutex>&,
const std::chrono::microseconds&);
349 wf = &std::condition_variable::wait_for;
350 return enqueueOnCondition(
352 std::bind(wf, &condition_variable_, std::placeholders::_1, duration));
360 inline bool dequeue(T& data,
const std::chrono::microseconds &duration)
override {
361 auto is_retrieved = OQ::dequeue(data, duration);
363 std::unique_lock<std::mutex> lck(dequeue_mutex_);
364 condition_variable_.notify_one();
372 inline bool empty()
const override {
373 std::lock_guard<std::mutex> lock(dequeue_mutex_);
380 inline size_t size()
const override {
381 std::lock_guard<std::mutex> lock(dequeue_mutex_);
389 std::lock_guard<std::mutex> lock(dequeue_mutex_);
395 using WaitFunc = std::function <std::cv_status (std::unique_lock<std::mutex>&)>;
405 std::condition_variable &condition_variable,
406 std::unique_lock<std::mutex> &lock)
408 condition_variable.wait(lock);
409 return std::cv_status::no_timeout;
422 std::unique_lock<std::mutex> lk(dequeue_mutex_);
423 bool can_enqueue =
true;
424 if (OQ::size() >= max_queue_size_) {
425 can_enqueue = wait_func(lk) == std::cv_status::no_timeout;
bool dequeue(T &data, const std::chrono::microseconds &) override
virtual void setStatusMonitor(std::shared_ptr< StatusMonitor > status_monitor)=0
bool dequeue(T &data, const std::chrono::microseconds &duration) override
void setStatusMonitor(std::shared_ptr< StatusMonitor > status_monitor) override
bool enqueue(T &value) override
bool tryEnqueue(T &&value, const std::chrono::microseconds &duration) override
bool empty() const override
size_t size() const override
bool tryEnqueue(T &value, const std::chrono::microseconds &duration) override
bool enqueue(T &value) override
bool tryEnqueue(T &&value, const std::chrono::microseconds &duration) override
virtual size_t size() const =0
bool tryEnqueue(T &value, const std::chrono::microseconds &duration) override
std::mutex dequeue_mutex_
bool enqueue(T &value) override
bool empty() const override
bool tryEnqueue(T &value, const std::chrono::microseconds &) override
bool enqueue(T &&value) override
std::condition_variable condition_variable_
std::function< std::cv_status(std::unique_lock< std::mutex > &)> WaitFunc
bool enqueueOnCondition(T &value, const WaitFunc &wait_func)
DequeueMutex dequeue_mutex_
void notifyMonitor(const Status &status)
std::deque< T, Allocator > dequeue_
std::timed_mutex DequeueMutex
std::shared_ptr< StatusMonitor > status_monitor_
size_t size() const override
bool dequeue(T &data, const std::chrono::microseconds &duration) override
virtual bool enqueue(T &&value)=0
size_t size() const override
static std::cv_status wait(std::condition_variable &condition_variable, std::unique_lock< std::mutex > &lock)
bool tryEnqueue(T &&value, const std::chrono::microseconds &) override
bool enqueue(T &&value) override
virtual bool empty() const =0
bool enqueue(T &&value) override
bool empty() const override
ObservedBlockingQueue(const size_t &max_queue_size)