queue_monitor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <vector>
19 
24 
25 namespace Aws {
26 namespace DataFlow {
27 
28 template <typename T>
29 class QueueDemux {
30 public:
31  virtual ~QueueDemux() = default;
32  virtual void addSource(std::shared_ptr<IObservedQueue < T>>, PriorityOptions) = 0;
33 };
34 
41 template <typename T>
42 class QueueMonitor :
43  public QueueDemux<T>,
45  public Source<T>
46 {
47 public:
48  QueueMonitor() = default;
49  ~QueueMonitor() override = default;
50 
51  inline void addSource(
52  std::shared_ptr<IObservedQueue < T>>observed_queue,
53  PriorityOptions priority_options) override
54  {
55  auto status_monitor = std::make_shared<StatusMonitor>();
56  addStatusMonitor(status_monitor);
57  observed_queue->setStatusMonitor(status_monitor);
58  priority_vector_.push_back(QueuePriorityPair(observed_queue, priority_options));
59  std::sort(priority_vector_.begin(), priority_vector_.end(), std::greater<QueuePriorityPair>());
60  }
61 
67  inline bool dequeue(
68  T& data,
69  const std::chrono::microseconds &duration) override
70 {
72  bool is_dequeued = false;
73  for (auto &queue : priority_vector_)
74  {
75  is_dequeued = queue.observed_queue->dequeue(data, std::chrono::microseconds(0));
76  if (is_dequeued)
77  {
78  break;
79  }
80  }
81  return is_dequeued;
82  }
83 
84 protected:
88  inline bool hasWork() override {
89  return static_cast<bool>(mask_);
90  }
91 
92 private:
93 
98  std::shared_ptr<IObservedQueue<T>> observed_queue;
100 
102  std::shared_ptr<IObservedQueue<T>> queue,
103  PriorityOptions options)
104  {
105  observed_queue = queue;
106  priority_options = options;
107  }
108 
109  inline bool operator > (const QueuePriorityPair &pair) const {
110  return priority_options > pair.priority_options;
111  }
112 
113  inline bool operator < (const QueuePriorityPair &pair) const {
114  return priority_options < pair.priority_options;
115  }
116  };
117 
121  std::vector<QueuePriorityPair> priority_vector_;
122 };
123 
124 } // namespace DataFlow
125 } // namespace Aws
virtual ~QueueDemux()=default
std::vector< QueuePriorityPair > priority_vector_
virtual void addSource(std::shared_ptr< IObservedQueue< T >>, PriorityOptions)=0
QueuePriorityPair(std::shared_ptr< IObservedQueue< T >> queue, PriorityOptions options)
std::shared_ptr< IObservedQueue< T > > observed_queue
Definition: queue_monitor.h:98
void addSource(std::shared_ptr< IObservedQueue< T >>observed_queue, PriorityOptions priority_options) override
Definition: queue_monitor.h:51
bool dequeue(T &data, const std::chrono::microseconds &duration) override
Definition: queue_monitor.h:67


dataflow_lite
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:22