concurrent_queue.h
Go to the documentation of this file.
00001 #ifndef CONCURRENT_QUEUE_H
00002 #define CONCURRENT_QUEUE_H
00003 
00004 #include <queue>
00005 #include <boost/thread.hpp>
00006 
00007 namespace serial {
00008   namespace utils {
00009 // Based on: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
00010 template<typename Data>
00011 class ConcurrentQueue
00012 {
00013 private:
00014   std::queue<Data> the_queue;
00015   mutable boost::mutex the_mutex;
00016   boost::condition_variable the_condition_variable;
00017   bool canceled_;
00018 public:
00019   ConcurrentQueue() : canceled_(false) {}
00020 
00021   void push(Data const& data) {
00022     boost::mutex::scoped_lock lock(the_mutex);
00023     the_queue.push(data);
00024     lock.unlock();
00025     the_condition_variable.notify_one();
00026   }
00027 
00028   bool empty() const {
00029     boost::mutex::scoped_lock lock(the_mutex);
00030     return the_queue.empty();
00031   }
00032 
00033   bool try_pop(Data& popped_value) {
00034     boost::mutex::scoped_lock lock(the_mutex);
00035     if(the_queue.empty()) {
00036       return false;
00037     }
00038 
00039     popped_value=the_queue.front();
00040     the_queue.pop();
00041     return true;
00042   }
00043 
00044   bool timed_wait_and_pop(Data& popped_value, long timeout) {
00045     using namespace boost::posix_time;
00046     bool result;
00047     boost::mutex::scoped_lock lock(the_mutex);
00048     result = !the_queue.empty();
00049     if (!result) {
00050       result = the_condition_variable.timed_wait(lock, milliseconds(timeout));
00051     }
00052 
00053     if (result) {
00054       popped_value = the_queue.front();
00055       the_queue.pop();
00056     }
00057     return result;
00058   }
00059 
00060   void wait_and_pop(Data& popped_value) {
00061     boost::mutex::scoped_lock lock(the_mutex);
00062     while(the_queue.empty() && !this->canceled_) {
00063       the_condition_variable.wait(lock);
00064     }
00065 
00066     if (!this->canceled_) {
00067       popped_value = the_queue.front();
00068       the_queue.pop();
00069     }
00070   }
00071 
00072   size_t size() const {
00073     return the_queue.size();
00074   }
00075 
00076   void cancel() {
00077     this->canceled_ = true;
00078     the_condition_variable.notify_all();
00079   }
00080 
00081   void clear_cancel() {
00082     this->canceled_ = false;
00083   }
00084 
00085   void clear() {
00086     boost::mutex::scoped_lock lock(the_mutex);
00087     while (!the_queue.empty()) {
00088       the_queue.pop();
00089     }
00090   }
00091 };
00092   }
00093 }
00094 
00095 #endif // CONCURRENT_QUEUE_H


serial_utils
Author(s): William Woodall , John Harrison
autogenerated on Thu Jun 6 2019 19:02:26