LockingQueue.hpp
Go to the documentation of this file.
1 #pragma once
2 #include <atomic>
3 #include <condition_variable>
4 #include <functional>
5 #include <limits>
6 #include <mutex>
7 #include <queue>
8 
9 namespace dai {
10 
/**
 * Mutex-protected FIFO queue with an optional size limit.
 *
 * Every public member function takes the internal mutex, so one instance may be used from several
 * threads. Two condition variables are used:
 *  - signalPush is notified after an element was added (consumers wait on it),
 *  - signalPop is notified after elements were removed or the limit changed (producers and
 *    waitEmpty() wait on it).
 *
 * Size limit handling when pushing:
 *  - maxSize == 0: the queue is emptied and the pushed element is discarded,
 *  - blocking == true: the producer waits until there is room,
 *  - blocking == false: the oldest elements are dropped until the new one fits.
 *
 * After destruct() every waiting call returns and the waiting/popping calls report failure.
 */
template <typename T>
class LockingQueue {
   public:
    /// Creates a blocking queue limited only by the range of `unsigned`.
    LockingQueue() = default;

    /**
     * @param maxSize  Maximum number of stored elements (0 means "discard everything pushed").
     * @param blocking true: producers wait for room; false: oldest elements are dropped instead.
     */
    explicit LockingQueue(unsigned maxSize, bool blocking = true) : maxSize(maxSize), blocking(blocking) {}

    /**
     * Changes the size limit.
     *
     * Producers blocked in push()/tryWaitAndPush() are woken so they re-evaluate the new limit
     * instead of staying blocked until the next pop.
     */
    void setMaxSize(unsigned sz) {
        {
            std::lock_guard<std::mutex> lock(guard);
            maxSize = sz;
        }
        signalPop.notify_all();
    }

    /**
     * Switches between blocking and non-blocking push behaviour.
     * Only affects pushes that start afterwards; a producer that is already waiting keeps waiting.
     */
    void setBlocking(bool bl) {
        std::lock_guard<std::mutex> lock(guard);
        blocking = bl;
    }

    /// @return The current size limit.
    unsigned getMaxSize() const {
        std::lock_guard<std::mutex> lock(guard);
        return maxSize;
    }

    /// @return true if pushes wait for room, false if they drop the oldest elements.
    bool getBlocking() const {
        std::lock_guard<std::mutex> lock(guard);
        return blocking;
    }

    /**
     * Marks the queue as destructed and wakes every thread waiting on it.
     * Calling it more than once has no further effect.
     */
    void destruct() {
        {
            std::lock_guard<std::mutex> lock(guard);
            if(destructed) return;
            destructed = true;
        }
        // Notify after releasing the mutex so woken threads can acquire it right away
        signalPop.notify_all();
        signalPush.notify_all();
    }

    ~LockingQueue() = default;

    /**
     * Waits up to `timeout` for at least one element, then passes every queued element to `callback`
     * (oldest first) and removes it. The callback runs while the queue mutex is held.
     *
     * @return true if elements were consumed; false on timeout or if the queue was destructed.
     */
    template <typename Rep, typename Period>
    bool waitAndConsumeAll(std::function<void(T&)> callback, std::chrono::duration<Rep, Period> timeout) {
        {
            std::unique_lock<std::mutex> lock(guard);

            // wait_for evaluates the predicate first and only then waits
            const bool ready = signalPush.wait_for(lock, timeout, [this]() { return !queue.empty() || destructed; });
            if(!ready || destructed) return false;

            // Reached only when the queue holds at least one element
            drainLocked(callback);
        }

        signalPop.notify_all();
        return true;
    }

    /**
     * Waits (without timeout) for at least one element, then passes every queued element to
     * `callback` (oldest first) and removes it. The callback runs while the queue mutex is held.
     *
     * @return true if elements were consumed; false if the queue was destructed.
     */
    bool waitAndConsumeAll(std::function<void(T&)> callback) {
        {
            std::unique_lock<std::mutex> lock(guard);

            signalPush.wait(lock, [this]() { return !queue.empty() || destructed; });
            if(queue.empty() || destructed) return false;

            drainLocked(callback);
        }

        signalPop.notify_all();
        return true;
    }

    /**
     * Passes every currently queued element to `callback` (oldest first) and removes it, without
     * waiting. The callback runs while the queue mutex is held.
     *
     * @return true if at least one element was consumed; false if the queue was empty.
     */
    bool consumeAll(std::function<void(T&)> callback) {
        {
            std::lock_guard<std::mutex> lock(guard);

            if(queue.empty()) return false;
            drainLocked(callback);
        }

        signalPop.notify_all();
        return true;
    }

    /**
     * Appends a copy of `data`, honouring the size limit as described on the class.
     * In blocking mode this waits without timeout until there is room.
     *
     * @return true if the element was stored (or discarded because maxSize is 0);
     *         false if the queue was destructed while/before waiting for room.
     */
    bool push(T const& data) {
        return pushImpl(data, [this](std::unique_lock<std::mutex>& lock) {
            signalPop.wait(lock, [this]() { return canStopWaitingForRoom(); });
            return true;  // an untimed wait cannot time out
        });
    }

    /**
     * Same as push(), but in blocking mode waits at most `timeout` for room.
     *
     * @return true if the element was stored (or discarded because maxSize is 0);
     *         false on timeout or if the queue was destructed while/before waiting for room.
     */
    template <typename Rep, typename Period>
    bool tryWaitAndPush(T const& data, std::chrono::duration<Rep, Period> timeout) {
        return pushImpl(data, [this, timeout](std::unique_lock<std::mutex>& lock) {
            // wait_for evaluates the predicate first and only then waits
            return signalPop.wait_for(lock, timeout, [this]() { return canStopWaitingForRoom(); });
        });
    }

    /// @return true if the queue currently holds no elements.
    bool empty() const {
        std::lock_guard<std::mutex> lock(guard);
        return queue.empty();
    }

    /**
     * Copies the oldest element into `value` without removing it.
     * @return false (leaving `value` untouched) if the queue is empty.
     */
    bool front(T& value) {
        std::lock_guard<std::mutex> lock(guard);
        if(queue.empty()) return false;

        value = queue.front();
        return true;
    }

    /**
     * Moves the oldest element into `value` and removes it, without waiting.
     * @return false (leaving `value` untouched) if the queue is empty.
     */
    bool tryPop(T& value) {
        {
            std::lock_guard<std::mutex> lock(guard);
            if(queue.empty()) return false;

            popFrontLocked(value);
        }
        signalPop.notify_all();
        return true;
    }

    /**
     * Waits (without timeout) for an element, then moves it into `value` and removes it.
     * @return false if the queue was destructed.
     */
    bool waitAndPop(T& value) {
        {
            std::unique_lock<std::mutex> lock(guard);

            signalPush.wait(lock, [this]() { return !queue.empty() || destructed; });
            if(queue.empty() || destructed) return false;

            popFrontLocked(value);
        }
        signalPop.notify_all();
        return true;
    }

    /**
     * Waits up to `timeout` for an element, then moves it into `value` and removes it.
     * @return false on timeout or if the queue was destructed.
     */
    template <typename Rep, typename Period>
    bool tryWaitAndPop(T& value, std::chrono::duration<Rep, Period> timeout) {
        {
            std::unique_lock<std::mutex> lock(guard);

            // wait_for evaluates the predicate first and only then waits
            const bool ready = signalPush.wait_for(lock, timeout, [this]() { return !queue.empty() || destructed; });
            if(!ready || destructed) return false;

            popFrontLocked(value);
        }
        signalPop.notify_all();
        return true;
    }

    /// Blocks until the queue is empty or has been destructed.
    void waitEmpty() {
        std::unique_lock<std::mutex> lock(guard);
        signalPop.wait(lock, [this]() { return queue.empty() || destructed; });
    }

   private:
    unsigned maxSize = std::numeric_limits<unsigned>::max();
    bool blocking = true;
    std::queue<T> queue;
    mutable std::mutex guard;
    bool destructed{false};
    std::condition_variable signalPop;
    std::condition_variable signalPush;

    /// Wait predicate for blocked producers. Must be called with `guard` held.
    /// A limit of 0 ends the wait as well, because then the element is discarded instead of stored.
    bool canStopWaitingForRoom() const {
        return maxSize == 0 || queue.size() < maxSize || destructed;
    }

    /// Hands every element to `callback` (oldest first) and removes it. Must be called with `guard` held.
    void drainLocked(std::function<void(T&)>& callback) {
        while(!queue.empty()) {
            callback(queue.front());
            queue.pop();
        }
    }

    /// Moves the oldest element into `value` and removes it. Must be called with `guard` held
    /// and a non-empty queue.
    void popFrontLocked(T& value) {
        value = std::move(queue.front());
        queue.pop();
    }

    /**
     * Shared implementation of push() and tryWaitAndPush().
     *
     * @param waitForRoom Callable taking the held lock; waits on signalPop using
     *                    canStopWaitingForRoom() and returns false only when it timed out.
     */
    template <typename WaitForRoom>
    bool pushImpl(T const& data, WaitForRoom&& waitForRoom) {
        bool discardedElements = false;
        bool stored = false;
        {
            std::unique_lock<std::mutex> lock(guard);

            // With a limit of 0 nothing is ever stored, so there is nothing to wait for
            if(blocking && maxSize != 0) {
                if(!waitForRoom(lock)) return false;
                if(destructed) return false;
            }

            if(maxSize == 0) {
                // Limit is 0 (possibly changed while waiting): drop the content and the new element
                discardedElements = !queue.empty();
                while(!queue.empty()) {
                    queue.pop();
                }
            } else {
                // Non-blocking mode: drop the oldest elements until the new one fits (needed if
                // maxSize was lowered). After a successful blocking wait this loop does nothing.
                while(queue.size() >= maxSize) {
                    queue.pop();
                }
                queue.push(data);
                stored = true;
            }
        }

        if(stored) {
            signalPush.notify_all();
        } else if(discardedElements) {
            // The queue just became empty: release waitEmpty()
            signalPop.notify_all();
        }
        return true;
    }
};
238 
239 } // namespace dai
dai::LockingQueue::push
bool push(T const &data)
Definition: LockingQueue.hpp:106
dai::LockingQueue::setMaxSize
void setMaxSize(unsigned sz)
Definition: LockingQueue.hpp:17
dai::LockingQueue::getMaxSize
unsigned getMaxSize() const
Definition: LockingQueue.hpp:29
dai::LockingQueue::LockingQueue
LockingQueue()=default
dai::LockingQueue::guard
std::mutex guard
Definition: LockingQueue.hpp:233
dai::LockingQueue::tryWaitAndPush
bool tryWaitAndPush(T const &data, std::chrono::duration< Rep, Period > timeout)
Definition: LockingQueue.hpp:134
dai::LockingQueue::maxSize
unsigned maxSize
Definition: LockingQueue.hpp:230
DAI_SPAN_NAMESPACE_NAME::detail::data
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.hpp:177
dai::LockingQueue::consumeAll
bool consumeAll(std::function< void(T &)> callback)
Definition: LockingQueue.hpp:90
dai::LockingQueue::queue
std::queue< T > queue
Definition: LockingQueue.hpp:232
dai::LockingQueue::waitAndPop
bool waitAndPop(T &value)
Definition: LockingQueue.hpp:192
dai::LockingQueue::tryWaitAndPop
bool tryWaitAndPop(T &value, std::chrono::duration< Rep, Period > timeout)
Definition: LockingQueue.hpp:208
dai::LockingQueue::empty
bool empty() const
Definition: LockingQueue.hpp:163
dai::LockingQueue::setBlocking
void setBlocking(bool bl)
Definition: LockingQueue.hpp:23
dai::LockingQueue::waitEmpty
void waitEmpty()
Definition: LockingQueue.hpp:224
dai::LockingQueue
Definition: LockingQueue.hpp:12
dai::LockingQueue::blocking
bool blocking
Definition: LockingQueue.hpp:231
dai::LockingQueue::~LockingQueue
~LockingQueue()=default
dai::LockingQueue::getBlocking
bool getBlocking() const
Definition: LockingQueue.hpp:35
dai::LockingQueue::signalPush
std::condition_variable signalPush
Definition: LockingQueue.hpp:236
dai::LockingQueue::front
bool front(T &value)
Definition: LockingQueue.hpp:168
dai::LockingQueue::tryPop
bool tryPop(T &value)
Definition: LockingQueue.hpp:178
dai::LockingQueue::waitAndConsumeAll
bool waitAndConsumeAll(std::function< void(T &)> callback)
Definition: LockingQueue.hpp:72
dai::LockingQueue::waitAndConsumeAll
bool waitAndConsumeAll(std::function< void(T &)> callback, std::chrono::duration< Rep, Period > timeout)
Definition: LockingQueue.hpp:52
dai::LockingQueue::destructed
bool destructed
Definition: LockingQueue.hpp:234
dai::LockingQueue::LockingQueue
LockingQueue(unsigned maxSize, bool blocking=true)
Definition: LockingQueue.hpp:15
dai::LockingQueue::signalPop
std::condition_variable signalPop
Definition: LockingQueue.hpp:235
dai
Definition: CameraExposureOffset.hpp:6
dai::LockingQueue::destruct
void destruct()
Definition: LockingQueue.hpp:41


depthai
Author(s): Martin Peterlin
autogenerated on Sat Mar 22 2025 02:58:19