status_monitor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <atomic>
19 #include <condition_variable> // std::condition_variable
20 #include <mutex> // std::mutex, std::unique_lock
21 #include <stdexcept>
22 #include <unordered_map>
23 #include <vector>
24 
25 namespace Aws {
26 namespace DataFlow {
27 
31 enum Status : uint {
34 };
35 
37 
42 public:
43  virtual ~StatusMonitor() = default;
44  void setStatus(const Status &status);
45 
46  inline Status getStatus() const {
47  return status_;
48  }
49 private:
51  inline void setStatusObserver(MultiStatusConditionMonitor *multi_status_cond) {
52  multi_status_cond_ = multi_status_cond;
53  }
56 };
57 
/**
 * Hands out single-bit masks from a pool of 64 bits and records which bits
 * are currently in use.
 *
 * The class performs no locking of its own.
 */
class MaskFactory {
public:

  /**
   * Reserve the lowest bit that is not currently in use.
   *
   * @return a mask with exactly one bit set; the bit is also recorded in the
   *         collective mask
   * @throws std::overflow_error if all 64 bits are already in use
   */
  uint64_t getNewMask() {
    // The loop is bounded by the bit width so the shift count never reaches
    // 64; shifting a 64-bit value by its full width is undefined behaviour.
    for (size_t shift = 0; shift < max_size; ++shift) {
      const uint64_t candidate = static_cast<uint64_t>(1) << shift;
      if ((collective_mask_ & candidate) == 0) {
        collective_mask_ |= candidate;
        return candidate;
      }
    }
    throw std::overflow_error("No more masks available");
  }

  /**
   * Clear every bit of the given mask from the collective mask so those bits
   * can be handed out again. No check is made that the bits were ever issued.
   *
   * @param mask bits to release
   */
  void removeMask(uint64_t mask) {
    collective_mask_ &= ~mask;
  }

  /**
   * @return the bitwise OR of all masks currently in use
   */
  uint64_t getCollectiveMask() const {
    return collective_mask_;
  }
private:
  /// Number of distinct single-bit masks available (bits in a uint64_t).
  static constexpr size_t max_size = sizeof(uint64_t) * 8;
  /// Bits currently handed out; 0 means every mask is free.
  uint64_t collective_mask_ = 0;
};
99 
101 public:
102  virtual ~ThreadMonitor() = default;
103  void waitForWork();
104  std::cv_status waitForWork(const std::chrono::microseconds &duration);
105  void notify();
106 private:
107  virtual bool hasWork() = 0;
108  std::mutex idle_mutex_;
109  std::condition_variable work_condition_;
110 };
111 
117 public:
119  mask_ = 0;
120  }
121  ~MultiStatusConditionMonitor() override = default;
122  void addStatusMonitor(std::shared_ptr<StatusMonitor> &status_monitor);
123 protected:
125  virtual void setStatus(const Status &status, StatusMonitor *status_monitor);
126  bool hasWork() override;
128  std::atomic<uint64_t> mask_{};
129  std::unordered_map<StatusMonitor*, uint64_t> status_monitors_;
130 };
131 
132 } // namespace DataFlow
133 } // namespace Aws
void setStatusObserver(MultiStatusConditionMonitor *multi_status_cond)
void removeMask(uint64_t mask)
uint64_t getCollectiveMask() const
void setStatus(const Status &status)
MultiStatusConditionMonitor * multi_status_cond_
std::condition_variable work_condition_
std::unordered_map< StatusMonitor *, uint64_t > status_monitors_
virtual ~StatusMonitor()=default


dataflow_lite
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:22