blocking_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2016 The Cartographer Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_
18 #define CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_
19 
20 #include <cstddef>
21 #include <deque>
22 #include <memory>
23 
27 #include "glog/logging.h"
28 
29 namespace cartographer {
30 namespace common {
31 
32 // A thread-safe blocking queue that is useful for producer/consumer patterns.
33 // 'T' must be movable.
34 template <typename T>
36  public:
37  static constexpr size_t kInfiniteQueueSize = 0;
38 
39  // Constructs a blocking queue with infinite queue size.
40  BlockingQueue() : BlockingQueue(kInfiniteQueueSize) {}
41 
42  BlockingQueue(const BlockingQueue&) = delete;
43  BlockingQueue& operator=(const BlockingQueue&) = delete;
44 
45  // Constructs a blocking queue with a size of 'queue_size'.
46  explicit BlockingQueue(const size_t queue_size) : queue_size_(queue_size) {}
47 
48  // Pushes a value onto the queue. Blocks if the queue is full.
49  void Push(T t) {
50  MutexLocker lock(&mutex_);
51  lock.Await([this]() REQUIRES(mutex_) { return QueueNotFullCondition(); });
52  deque_.push_back(std::move(t));
53  }
54 
55  // Like push, but returns false if 'timeout' is reached.
56  bool PushWithTimeout(T t, const common::Duration timeout) {
57  MutexLocker lock(&mutex_);
58  if (!lock.AwaitWithTimeout(
59  [this]() REQUIRES(mutex_) { return QueueNotFullCondition(); },
60  timeout)) {
61  return false;
62  }
63  deque_.push_back(std::move(t));
64  return true;
65  }
66 
67  // Pops the next value from the queue. Blocks until a value is available.
68  T Pop() {
69  MutexLocker lock(&mutex_);
70  lock.Await([this]() REQUIRES(mutex_) { return !QueueEmptyCondition(); });
71 
72  T t = std::move(deque_.front());
73  deque_.pop_front();
74  return t;
75  }
76 
77  // Like Pop, but can timeout. Returns nullptr in this case.
78  T PopWithTimeout(const common::Duration timeout) {
79  MutexLocker lock(&mutex_);
80  if (!lock.AwaitWithTimeout(
81  [this]() REQUIRES(mutex_) { return !QueueEmptyCondition(); },
82  timeout)) {
83  return nullptr;
84  }
85  T t = std::move(deque_.front());
86  deque_.pop_front();
87  return t;
88  }
89 
90  // Returns the next value in the queue or nullptr if the queue is empty.
91  // Maintains ownership. This assumes a member function get() that returns
92  // a pointer to the given type R.
93  template <typename R>
94  const R* Peek() {
95  MutexLocker lock(&mutex_);
96  if (deque_.empty()) {
97  return nullptr;
98  }
99  return deque_.front().get();
100  }
101 
102  // Returns the number of items currently in the queue.
103  size_t Size() {
104  MutexLocker lock(&mutex_);
105  return deque_.size();
106  }
107 
108  // Blocks until the queue is empty.
109  void WaitUntilEmpty() {
110  MutexLocker lock(&mutex_);
111  lock.Await([this]() REQUIRES(mutex_) { return QueueEmptyCondition(); });
112  }
113 
114  private:
115  // Returns true iff the queue is empty.
116  bool QueueEmptyCondition() REQUIRES(mutex_) { return deque_.empty(); }
117 
118  // Returns true iff the queue is not full.
120  return queue_size_ == kInfiniteQueueSize || deque_.size() < queue_size_;
121  }
122 
123  Mutex mutex_;
124  const size_t queue_size_ GUARDED_BY(mutex_);
125  std::deque<T> deque_ GUARDED_BY(mutex_);
126 };
127 
128 } // namespace common
129 } // namespace cartographer
130 
131 #endif // CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_
bool PushWithTimeout(T t, const common::Duration timeout)
static constexpr size_t kInfiniteQueueSize
bool QueueEmptyCondition() REQUIRES(mutex_)
BlockingQueue(const size_t queue_size)
const size_t queue_size_ GUARDED_BY(mutex_)
UniversalTimeScaleClock::duration Duration
Definition: time.h:43
#define REQUIRES(...)
Definition: mutex.h:44
T PopWithTimeout(const common::Duration timeout)
bool QueueNotFullCondition() REQUIRES(mutex_)
Mutex::Locker MutexLocker
Definition: mutex.h:95
BlockingQueue & operator=(const BlockingQueue &)=delete


cartographer
Author(s): The Cartographer Authors
autogenerated on Mon Feb 28 2022 22:00:58