00001 #ifndef CVD_INCLUDE_MESSAGE_QUEUE_H 00002 #define CVD_INCLUDE_MESSAGE_QUEUE_H 00003 00004 #include <cvd/synchronized.h> 00005 #include <semaphore.h> 00006 #include <deque> 00007 00008 namespace CVD 00009 { 00010 00012 template<class C> class MessageQueue 00013 { 00014 public: 00016 MessageQueue() 00017 { 00018 sem_init(&empty_slots, 0, 0); 00019 } 00020 00022 ~MessageQueue() 00023 { 00024 sem_destroy(&empty_slots); 00025 } 00026 00029 void write(const C& message) 00030 { 00031 //Lock the queue, so it can be safely used. 00032 queue_mutex.lock(); 00033 queue.push_back(message); 00034 queue_mutex.unlock(); 00035 00036 sem_post(&empty_slots); 00037 } 00038 00042 C read() 00043 { 00044 sem_wait(&empty_slots); 00045 C ret; 00046 00047 queue_mutex.lock(); 00048 ret = queue.front(); 00049 queue.pop_front(); 00050 queue_mutex.unlock(); 00051 00052 return ret; 00053 } 00054 00055 int size() 00056 { 00057 int s; 00058 sem_getvalue(&empty_slots, &s); 00059 return s; 00060 } 00061 00062 private: 00063 Synchronized queue_mutex; 00064 std::deque<C> queue; 00065 sem_t empty_slots; 00066 }; 00067 00068 } 00069 #endif