blocking_queue.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2016 The Cartographer Authors
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *      http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #ifndef CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_
00018 #define CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_
00019 
00020 #include <cstddef>
00021 #include <deque>
00022 #include <memory>
00023 
00024 #include "absl/synchronization/mutex.h"
00025 #include "cartographer/common/port.h"
00026 #include "cartographer/common/time.h"
00027 #include "glog/logging.h"
00028 
00029 namespace cartographer {
00030 namespace common {
00031 
00032 // A thread-safe blocking queue that is useful for producer/consumer patterns.
00033 // 'T' must be movable.
00034 template <typename T>
00035 class BlockingQueue {
00036  public:
00037   static constexpr size_t kInfiniteQueueSize = 0;
00038 
00039   // Constructs a blocking queue with infinite queue size.
00040   BlockingQueue() : BlockingQueue(kInfiniteQueueSize) {}
00041 
00042   BlockingQueue(const BlockingQueue&) = delete;
00043   BlockingQueue& operator=(const BlockingQueue&) = delete;
00044 
00045   // Constructs a blocking queue with a size of 'queue_size'.
00046   explicit BlockingQueue(const size_t queue_size) : queue_size_(queue_size) {}
00047 
00048   // Pushes a value onto the queue. Blocks if the queue is full.
00049   void Push(T t) {
00050     const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00051       return QueueNotFullCondition();
00052     };
00053     absl::MutexLock lock(&mutex_);
00054     mutex_.Await(absl::Condition(&predicate));
00055     deque_.push_back(std::move(t));
00056   }
00057 
00058   // Like push, but returns false if 'timeout' is reached.
00059   bool PushWithTimeout(T t, const common::Duration timeout) {
00060     const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00061       return QueueNotFullCondition();
00062     };
00063     absl::MutexLock lock(&mutex_);
00064     if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00065                                  absl::FromChrono(timeout))) {
00066       return false;
00067     }
00068     deque_.push_back(std::move(t));
00069     return true;
00070   }
00071 
00072   // Pops the next value from the queue. Blocks until a value is available.
00073   T Pop() {
00074     const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00075       return !QueueEmptyCondition();
00076     };
00077     absl::MutexLock lock(&mutex_);
00078     mutex_.Await(absl::Condition(&predicate));
00079 
00080     T t = std::move(deque_.front());
00081     deque_.pop_front();
00082     return t;
00083   }
00084 
00085   // Like Pop, but can timeout. Returns nullptr in this case.
00086   T PopWithTimeout(const common::Duration timeout) {
00087     const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00088       return !QueueEmptyCondition();
00089     };
00090     absl::MutexLock lock(&mutex_);
00091     if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00092                                  absl::FromChrono(timeout))) {
00093       return nullptr;
00094     }
00095     T t = std::move(deque_.front());
00096     deque_.pop_front();
00097     return t;
00098   }
00099 
00100   // Like Peek, but can timeout. Returns nullptr in this case.
00101   template <typename R>
00102   R* PeekWithTimeout(const common::Duration timeout) {
00103     const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00104       return !QueueEmptyCondition();
00105     };
00106     absl::MutexLock lock(&mutex_);
00107     if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
00108                                  absl::FromChrono(timeout))) {
00109       return nullptr;
00110     }
00111     return deque_.front().get();
00112   }
00113 
00114   // Returns the next value in the queue or nullptr if the queue is empty.
00115   // Maintains ownership. This assumes a member function get() that returns
00116   // a pointer to the given type R.
00117   template <typename R>
00118   const R* Peek() {
00119     absl::MutexLock lock(&mutex_);
00120     if (deque_.empty()) {
00121       return nullptr;
00122     }
00123     return deque_.front().get();
00124   }
00125 
00126   // Returns the number of items currently in the queue.
00127   size_t Size() {
00128     absl::MutexLock lock(&mutex_);
00129     return deque_.size();
00130   }
00131 
00132   // Blocks until the queue is empty.
00133   void WaitUntilEmpty() {
00134     const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00135       return QueueEmptyCondition();
00136     };
00137     absl::MutexLock lock(&mutex_);
00138     mutex_.Await(absl::Condition(&predicate));
00139   }
00140 
00141  private:
00142   // Returns true iff the queue is empty.
00143   bool QueueEmptyCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00144     return deque_.empty();
00145   }
00146 
00147   // Returns true iff the queue is not full.
00148   bool QueueNotFullCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
00149     return queue_size_ == kInfiniteQueueSize || deque_.size() < queue_size_;
00150   }
00151 
00152   absl::Mutex mutex_;
00153   const size_t queue_size_ GUARDED_BY(mutex_);
00154   std::deque<T> deque_ GUARDED_BY(mutex_);
00155 };
00156 
00157 }  // namespace common
00158 }  // namespace cartographer
00159 
00160 #endif  // CARTOGRAPHER_COMMON_BLOCKING_QUEUE_H_


cartographer
Author(s): The Cartographer Authors
autogenerated on Thu May 9 2019 02:27:35