observed_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
#include <semaphore.h>

#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>
22 
26 
27 namespace Aws {
28 namespace DataFlow {
29 
30 template<
31  class T,
32  class Allocator = std::allocator<T>>
34  public Sink<T>,
35  public Source<T>
36 {
37 public:
38  virtual bool empty() const = 0;
39  virtual size_t size() const = 0;
40  virtual void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) = 0;
41 };
42 
49 template<
50  class T,
51  class Allocator = std::allocator<T>>
53  public IObservedQueue<T, Allocator> {
54 public:
55 
56  ~ObservedQueue() override = default;
57 
63  inline void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) override {
64  status_monitor_ = status_monitor;
65  }
66 
72  inline bool enqueue(T&& value) override {
73  dequeue_.push_back(value);
74  notifyMonitor(AVAILABLE);
75  return true;
76  }
77 
83  inline bool enqueue(T& value) override {
84  dequeue_.push_back(value);
85  notifyMonitor(AVAILABLE);
86  return true;
87  }
88 
89  inline bool tryEnqueue(
90  T& value,
91  const std::chrono::microseconds&) override
92  {
93  return enqueue(value);
94  }
95 
96  inline bool tryEnqueue(
97  T&& value,
98  const std::chrono::microseconds&) override
99  {
100  return enqueue(value);
101  }
102 
108  inline bool dequeue(
109  T& data,
110  const std::chrono::microseconds&) override
111  {
112  bool is_data = false;
113  if (!dequeue_.empty()) {
114  data = dequeue_.front();
115  dequeue_.pop_front();
116  is_data = true;
117  if (dequeue_.empty()) {
118  notifyMonitor(UNAVAILABLE);
119  }
120  }
121  return is_data;
122  }
123 
127  inline bool empty() const override {
128  return dequeue_.empty();
129  }
130 
134  inline size_t size() const override {
135  return dequeue_.size();
136  }
137 
141  void clear() override {
142  dequeue_.clear();
143  }
144 
145 protected:
146 
152  void notifyMonitor(const Status &status) {
153  if (status_monitor_) {
154  status_monitor_->setStatus(status);
155  }
156  }
157 
161  std::shared_ptr<StatusMonitor> status_monitor_;
162 
166  std::deque<T, Allocator> dequeue_;
167 
168 };
169 
176 template<
177  class T,
178  class Allocator = std::allocator<T>>
179 class ObservedSynchronizedQueue : public ObservedQueue<T, Allocator> {
180 public:
181  ~ObservedSynchronizedQueue() override = default;
182 
188  inline bool enqueue(T&& value) override {
189  std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
190  return OQ::enqueue(std::move(value));
191  }
192 
198  inline bool enqueue(T& value) override {
199  std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
200  return OQ::enqueue(value);
201  }
202 
203  inline bool tryEnqueue(
204  T& value,
205  const std::chrono::microseconds &duration) override
206  {
207  std::unique_lock<DequeueMutex> lock(dequeue_mutex_, std::defer_lock);
208  bool result = lock.try_lock_for(duration);
209  if (result) {
210  OQ::enqueue(value);
211  }
212  return result;
213  }
214 
215  inline bool tryEnqueue(
216  T&& value,
217  const std::chrono::microseconds &duration) override
218  {
219  std::unique_lock<DequeueMutex> lock(dequeue_mutex_, std::defer_lock);
220  bool result = lock.try_lock_for(duration);
221  if (result) {
222  OQ::enqueue(std::move(value));
223  }
224  return result;
225  }
226 
232  inline bool dequeue(
233  T& data,
234  const std::chrono::microseconds &duration) override
235  {
236  std::unique_lock<DequeueMutex> lock(dequeue_mutex_, std::defer_lock);
237  bool result = lock.try_lock_for(duration);
238  if (result) {
239  result = OQ::dequeue(data, duration);
240  }
241  return result;
242  }
243 
247  inline bool empty() const override {
248  std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
249  return OQ::empty();
250  }
251 
255  inline size_t size() const override {
256  std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
257  return OQ::size();
258  }
259 
263  void clear() override {
264  std::unique_lock<DequeueMutex> lock(dequeue_mutex_);
265  OQ::clear();
266  }
267 
268 private:
270  // @todo (rddesmon): Dual semaphore for read optimization
271  using DequeueMutex = std::timed_mutex;
273 };
280 template<
281  class T,
282  class Allocator = std::allocator<T>>
283 class ObservedBlockingQueue : public ObservedQueue<T, Allocator> {
284 public:
285 
292  explicit ObservedBlockingQueue(const size_t &max_queue_size) {
293  if (max_queue_size == 0) {
294  throw std::invalid_argument("Max queue size invalid: 0");
295  }
296  max_queue_size_ = max_queue_size;
297  }
298 
299  ~ObservedBlockingQueue() override = default;
305  inline bool enqueue(T&& value) override
306  {
307  bool is_queued = false;
308  std::unique_lock<std::mutex> lk(dequeue_mutex_);
309  if (OQ::size() <= max_queue_size_) {
310  OQ::enqueue(value);
311  is_queued = true;
312  }
313  return is_queued;
314  }
315 
316  inline bool enqueue(T& value) override {
317  bool is_queued = false;
318  std::unique_lock<std::mutex> lk(dequeue_mutex_);
319  if (OQ::size() <= max_queue_size_) {
320  OQ::enqueue(value);
321  is_queued = true;
322  }
323  return is_queued;
324  }
325 
333  inline bool tryEnqueue(
334  T& value,
335  const std::chrono::microseconds &duration) override
336  {
337  std::cv_status (std::condition_variable::*wf)(std::unique_lock<std::mutex>&, const std::chrono::microseconds&);
338  wf = &std::condition_variable::wait_for;
339  return enqueueOnCondition(
340  value,
341  std::bind(wf, &condition_variable_, std::placeholders::_1, duration));
342  }
343 
344  inline bool tryEnqueue(
345  T&& value,
346  const std::chrono::microseconds &duration) override
347  {
348  std::cv_status (std::condition_variable::*wf)(std::unique_lock<std::mutex>&, const std::chrono::microseconds&);
349  wf = &std::condition_variable::wait_for;
350  return enqueueOnCondition(
351  value,
352  std::bind(wf, &condition_variable_, std::placeholders::_1, duration));
353  }
354 
360  inline bool dequeue(T& data, const std::chrono::microseconds &duration) override {
361  auto is_retrieved = OQ::dequeue(data, duration);
362  if (is_retrieved) {
363  std::unique_lock<std::mutex> lck(dequeue_mutex_);
364  condition_variable_.notify_one();
365  }
366  return is_retrieved;
367  }
368 
372  inline bool empty() const override {
373  std::lock_guard<std::mutex> lock(dequeue_mutex_);
374  return OQ::empty();
375  }
376 
380  inline size_t size() const override {
381  std::lock_guard<std::mutex> lock(dequeue_mutex_);
382  return OQ::size();
383  }
384 
388  void clear() override {
389  std::lock_guard<std::mutex> lock(dequeue_mutex_);
390  OQ::clear();
391  }
392 
393 private:
395  using WaitFunc = std::function <std::cv_status (std::unique_lock<std::mutex>&)>;
396 
404  static std::cv_status wait(
405  std::condition_variable &condition_variable,
406  std::unique_lock<std::mutex> &lock)
407  {
408  condition_variable.wait(lock);
409  return std::cv_status::no_timeout;
410  }
411 
419  inline bool enqueueOnCondition(T& value,
420  const WaitFunc &wait_func)
421  {
422  std::unique_lock<std::mutex> lk(dequeue_mutex_);
423  bool can_enqueue = true;
424  if (OQ::size() >= max_queue_size_) {
425  can_enqueue = wait_func(lk) == std::cv_status::no_timeout;
426  }
427  if (can_enqueue) {
428  OQ::enqueue(value);
429  }
430  return can_enqueue;
431  }
432 
434  std::condition_variable condition_variable_;
435  mutable std::mutex dequeue_mutex_;
436 };
437 
438 } // namespace DataFlow
439 } // namespace Aws
bool dequeue(T &data, const std::chrono::microseconds &) override
virtual void setStatusMonitor(std::shared_ptr< StatusMonitor > status_monitor)=0
bool dequeue(T &data, const std::chrono::microseconds &duration) override
void setStatusMonitor(std::shared_ptr< StatusMonitor > status_monitor) override
bool tryEnqueue(T &&value, const std::chrono::microseconds &duration) override
bool empty() const override
size_t size() const override
bool tryEnqueue(T &value, const std::chrono::microseconds &duration) override
bool tryEnqueue(T &&value, const std::chrono::microseconds &duration) override
virtual size_t size() const =0
bool tryEnqueue(T &value, const std::chrono::microseconds &duration) override
bool enqueue(T &value) override
bool tryEnqueue(T &value, const std::chrono::microseconds &) override
std::condition_variable condition_variable_
std::function< std::cv_status(std::unique_lock< std::mutex > &)> WaitFunc
bool enqueueOnCondition(T &value, const WaitFunc &wait_func)
void notifyMonitor(const Status &status)
std::deque< T, Allocator > dequeue_
std::shared_ptr< StatusMonitor > status_monitor_
bool dequeue(T &data, const std::chrono::microseconds &duration) override
virtual bool enqueue(T &&value)=0
static std::cv_status wait(std::condition_variable &condition_variable, std::unique_lock< std::mutex > &lock)
bool tryEnqueue(T &&value, const std::chrono::microseconds &) override
bool enqueue(T &&value) override
virtual bool empty() const =0
bool enqueue(T &&value) override
ObservedBlockingQueue(const size_t &max_queue_size)


dataflow_lite
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:22