Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_
00018 #define CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_
00019
00020 #include <cstddef>
00021 #include <deque>
00022 #include <memory>
00023
00024 #include "absl/synchronization/mutex.h"
00025 #include "cartographer/common/port.h"
00026 #include "cartographer/common/time.h"
00027 #include "glog/logging.h"
00028
00029 namespace cartographer {
00030 namespace common {
00031
00032
00033
00034 template <typename T>
00035 class BlockingQueue {
00036 public:
00037 static constexpr size_t kInfiniteQueueSize = 0;
00038
00039
00040 BlockingQueue() : BlockingQueue(kInfiniteQueueSize) {}
00041
00042 BlockingQueue(const BlockingQueue&) = delete;
00043 BlockingQueue& operator=(const BlockingQueue&) = delete;
00044
00045
00046 explicit BlockingQueue(const size_t queue_size) : queue_size_(queue_size) {}
00047
00048
00049 void Push(T t) {
00050 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00051 return QueueNotFullCondition();
00052 };
00053 absl::MutexLock lock(&mutex_);
00054 mutex_.Await(absl::Condition(&predicate));
00055 deque_.push_back(std::move(t));
00056 }
00057
00058
00059 bool PushWithTimeout(T t, const common::Duration timeout) {
00060 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00061 return QueueNotFullCondition();
00062 };
00063 absl::MutexLock lock(&mutex_);
00064 if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00065 absl::FromChrono(timeout))) {
00066 return false;
00067 }
00068 deque_.push_back(std::move(t));
00069 return true;
00070 }
00071
00072
00073 T Pop() {
00074 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00075 return !QueueEmptyCondition();
00076 };
00077 absl::MutexLock lock(&mutex_);
00078 mutex_.Await(absl::Condition(&predicate));
00079
00080 T t = std::move(deque_.front());
00081 deque_.pop_front();
00082 return t;
00083 }
00084
00085
00086 T PopWithTimeout(const common::Duration timeout) {
00087 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00088 return !QueueEmptyCondition();
00089 };
00090 absl::MutexLock lock(&mutex_);
00091 if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00092 absl::FromChrono(timeout))) {
00093 return nullptr;
00094 }
00095 T t = std::move(deque_.front());
00096 deque_.pop_front();
00097 return t;
00098 }
00099
00100
00101 template <typename R>
00102 R* PeekWithTimeout(const common::Duration timeout) {
00103 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00104 return !QueueEmptyCondition();
00105 };
00106 absl::MutexLock lock(&mutex_);
00107 if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00108 absl::FromChrono(timeout))) {
00109 return nullptr;
00110 }
00111 return deque_.front().get();
00112 }
00113
00114
00115
00116
00117 template <typename R>
00118 const R* Peek() {
00119 absl::MutexLock lock(&mutex_);
00120 if (deque_.empty()) {
00121 return nullptr;
00122 }
00123 return deque_.front().get();
00124 }
00125
00126
00127 size_t Size() {
00128 absl::MutexLock lock(&mutex_);
00129 return deque_.size();
00130 }
00131
00132
00133 void WaitUntilEmpty() {
00134 const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00135 return QueueEmptyCondition();
00136 };
00137 absl::MutexLock lock(&mutex_);
00138 mutex_.Await(absl::Condition(&predicate));
00139 }
00140
00141 private:
00142
00143 bool QueueEmptyCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00144 return deque_.empty();
00145 }
00146
00147
00148 bool QueueNotFullCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00149 return queue_size_ == kInfiniteQueueSize || deque_.size() < queue_size_;
00150 }
00151
00152 absl::Mutex mutex_;
00153 const size_t queue_size_ GUARDED_BY(mutex_);
00154 std::deque<T> deque_ GUARDED_BY(mutex_);
00155 };
00156
00157 }
00158 }
00159
00160 #endif // CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_