00001 #ifndef CONCURRENT_QUEUE_H 00002 #define CONCURRENT_QUEUE_H 00003 00004 #include <queue> 00005 #include <boost/thread.hpp> 00006 00007 namespace serial { 00008 namespace utils { 00009 // Based on: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html 00010 template<typename Data> 00011 class ConcurrentQueue 00012 { 00013 private: 00014 std::queue<Data> the_queue; 00015 mutable boost::mutex the_mutex; 00016 boost::condition_variable the_condition_variable; 00017 bool canceled_; 00018 public: 00019 ConcurrentQueue() : canceled_(false) {} 00020 00021 void push(Data const& data) { 00022 boost::mutex::scoped_lock lock(the_mutex); 00023 the_queue.push(data); 00024 lock.unlock(); 00025 the_condition_variable.notify_one(); 00026 } 00027 00028 bool empty() const { 00029 boost::mutex::scoped_lock lock(the_mutex); 00030 return the_queue.empty(); 00031 } 00032 00033 bool try_pop(Data& popped_value) { 00034 boost::mutex::scoped_lock lock(the_mutex); 00035 if(the_queue.empty()) { 00036 return false; 00037 } 00038 00039 popped_value=the_queue.front(); 00040 the_queue.pop(); 00041 return true; 00042 } 00043 00044 bool timed_wait_and_pop(Data& popped_value, long timeout) { 00045 using namespace boost::posix_time; 00046 bool result; 00047 boost::mutex::scoped_lock lock(the_mutex); 00048 result = !the_queue.empty(); 00049 if (!result) { 00050 result = the_condition_variable.timed_wait(lock, milliseconds(timeout)); 00051 } 00052 00053 if (result) { 00054 popped_value = the_queue.front(); 00055 the_queue.pop(); 00056 } 00057 return result; 00058 } 00059 00060 void wait_and_pop(Data& popped_value) { 00061 boost::mutex::scoped_lock lock(the_mutex); 00062 while(the_queue.empty() && !this->canceled_) { 00063 the_condition_variable.wait(lock); 00064 } 00065 00066 if (!this->canceled_) { 00067 popped_value = the_queue.front(); 00068 the_queue.pop(); 00069 } 00070 } 00071 00072 size_t size() const { 00073 return the_queue.size(); 00074 } 00075 00076 void cancel() { 00077 this->canceled_ = true; 00078 the_condition_variable.notify_all(); 00079 } 00080 00081 void clear_cancel() { 00082 this->canceled_ = false; 00083 } 00084 00085 void clear() { 00086 boost::mutex::scoped_lock lock(the_mutex); 00087 while (!the_queue.empty()) { 00088 the_queue.pop(); 00089 } 00090 } 00091 }; 00092 } 00093 } 00094 00095 #endif // CONCURRENT_QUEUE_H