blocking_queue.h
Go to the documentation of this file.
00001 #ifndef MEGATREE_BLOCKING_QUEUE
00002 #define MEGATREE_BLOCKING_QUEUE
00003 
00004 #include <deque>
00005 #include <boost/thread/mutex.hpp>
00006 
00007 namespace megatree
00008 {
00009 
00010 static size_t WRITE_QUEUE_SIZE = 1000;
00011 
00012 //This classs provides a thread safe implementation of a queue. It also
00013 //enforces a max size, so calls to enqueue will block when this size is reached
00014 //until dequeue is called.
00015 template <class T>
00016 class BlockingQueue
00017 {
00018   public:
00019     BlockingQueue(size_t max_size = WRITE_QUEUE_SIZE) 
00020       : max_size_(max_size)
00021     {
00022     }
00023 
00024     //Enqueue will block on a call to enqueue when the maximum size of the
00025     //queue is reached. Otherwise, it just locks the queue and pushes something
00026     //back onto it
00027     size_t enqueue(const T& item)
00028     {
00029       boost::mutex::scoped_lock lock(mutex_);
00030       if (max_size_ > 0)
00031       {
00032         // Waits for the queue to drop below the maximum size
00033         while (queue_.size() >= max_size_)
00034           enqueue_condition.wait(lock);
00035       }
00036 
00037       //add the item onto the end of the queue
00038       queue_.push_back(item);
00039 
00040       //notify any calls to dequeue that are blocked waiting for input
00041       dequeue_condition.notify_one();
00042       return queue_.size();
00043     }
00044 
00045     //Dequeue will block on a call to dequeue when the queue is empty.
00046     //Otherwise, it just locks the queue and pops something from it
00047     T dequeue()
00048     {
00049       boost::mutex::scoped_lock lock(mutex_);
00050 
00051       //if there's nothing in the queue, we'll wait for something to be added
00052       if(queue_.empty())
00053         dequeue_condition.wait(lock);
00054 
00055       //there's something on the queue, we need to pop it off
00056       T item =  queue_.front();
00057       queue_.pop_front();
00058 
00059       //make sure to notify any waiting calls to enqueue that something has
00060       //been removed from the queue
00061       enqueue_condition.notify_one();
00062 
00063       return item;
00064     }
00065 
00066     void dequeueBatch(size_t batch_size, std::vector<T>& batch)
00067     {
00068       batch.clear();
00069       boost::mutex::scoped_lock lock(mutex_);
00070 
00071       //it there's nothing in the queue, we'll wait for something to be added
00072       if(queue_.empty())
00073         dequeue_condition.wait(lock);
00074 
00075       //we want to pull at most batch_size items from the queue
00076       while(batch.size() < batch_size && !queue_.empty())
00077       {
00078         batch.push_back(queue_.front());
00079         queue_.pop_front();
00080 
00081         //make sure to notify any waiting calles to enqueue that something has
00082         //been removed from the queue
00083         enqueue_condition.notify_one();
00084       }
00085     }
00086 
00087     void unblockAll()
00088     {
00089       enqueue_condition.notify_all();
00090       dequeue_condition.notify_all();
00091     }
00092 
00093   private:
00094     size_t max_size_;
00095     std::deque<T> queue_;
00096     boost::mutex mutex_;
00097     boost::condition enqueue_condition, dequeue_condition;
00098 };
00099 
00100 }
00101 
00102 #endif


megatree_core
Author(s): Stuart Glaser
autogenerated on Mon Dec 2 2013 13:01:15