00001 #ifndef MEGATREE_BLOCKING_QUEUE 00002 #define MEGATREE_BLOCKING_QUEUE 00003 00004 #include <deque> 00005 #include <boost/thread/mutex.hpp> 00006 00007 namespace megatree 00008 { 00009 00010 static size_t WRITE_QUEUE_SIZE = 1000; 00011 00012 //This classs provides a thread safe implementation of a queue. It also 00013 //enforces a max size, so calls to enqueue will block when this size is reached 00014 //until dequeue is called. 00015 template <class T> 00016 class BlockingQueue 00017 { 00018 public: 00019 BlockingQueue(size_t max_size = WRITE_QUEUE_SIZE) 00020 : max_size_(max_size) 00021 { 00022 } 00023 00024 //Enqueue will block on a call to enqueue when the maximum size of the 00025 //queue is reached. Otherwise, it just locks the queue and pushes something 00026 //back onto it 00027 size_t enqueue(const T& item) 00028 { 00029 boost::mutex::scoped_lock lock(mutex_); 00030 if (max_size_ > 0) 00031 { 00032 // Waits for the queue to drop below the maximum size 00033 while (queue_.size() >= max_size_) 00034 enqueue_condition.wait(lock); 00035 } 00036 00037 //add the item onto the end of the queue 00038 queue_.push_back(item); 00039 00040 //notify any calls to dequeue that are blocked waiting for input 00041 dequeue_condition.notify_one(); 00042 return queue_.size(); 00043 } 00044 00045 //Dequeue will block on a call to dequeue when the queue is empty. 00046 //Otherwise, it just locks the queue and pops something from it 00047 T dequeue() 00048 { 00049 boost::mutex::scoped_lock lock(mutex_); 00050 00051 //if there's nothing in the queue, we'll wait for something to be added 00052 if(queue_.empty()) 00053 dequeue_condition.wait(lock); 00054 00055 //there's something on the queue, we need to pop it off 00056 T item = queue_.front(); 00057 queue_.pop_front(); 00058 00059 //make sure to notify any waiting calls to enqueue that something has 00060 //been removed from the queue 00061 enqueue_condition.notify_one(); 00062 00063 return item; 00064 } 00065 00066 void dequeueBatch(size_t batch_size, std::vector<T>& batch) 00067 { 00068 batch.clear(); 00069 boost::mutex::scoped_lock lock(mutex_); 00070 00071 //it there's nothing in the queue, we'll wait for something to be added 00072 if(queue_.empty()) 00073 dequeue_condition.wait(lock); 00074 00075 //we want to pull at most batch_size items from the queue 00076 while(batch.size() < batch_size && !queue_.empty()) 00077 { 00078 batch.push_back(queue_.front()); 00079 queue_.pop_front(); 00080 00081 //make sure to notify any waiting calles to enqueue that something has 00082 //been removed from the queue 00083 enqueue_condition.notify_one(); 00084 } 00085 } 00086 00087 void unblockAll() 00088 { 00089 enqueue_condition.notify_all(); 00090 dequeue_condition.notify_all(); 00091 } 00092 00093 private: 00094 size_t max_size_; 00095 std::deque<T> queue_; 00096 boost::mutex mutex_; 00097 boost::condition enqueue_condition, dequeue_condition; 00098 }; 00099 00100 } 00101 00102 #endif