Go to the documentation of this file.00001
00023 #include "micros_swarm/msg_queue_manager.h"
00024
00025 namespace micros_swarm{
00026
00027 OutMsgQueue::OutMsgQueue(const std::string& name, int size, MsgQueueManager *manager_ptr): name_(name), size_(size), queue_manager_ptr_(manager_ptr)
00028 {
00029 queue_.reset(new cqueue<std::vector<uint8_t> >(size_));
00030 }
00031
00032 OutMsgQueue::~OutMsgQueue()
00033 {
00034 queue_.reset();
00035 }
00036
00037 bool OutMsgQueue::full()
00038 {
00039 boost::shared_lock<boost::shared_mutex> lock(mutex_);
00040 return queue_->full();
00041 }
00042
00043 bool OutMsgQueue::empty()
00044 {
00045 boost::shared_lock<boost::shared_mutex> lock(mutex_);
00046 return queue_->empty();
00047 }
00048
00049 int OutMsgQueue::size()
00050 {
00051 boost::shared_lock<boost::shared_mutex> lock(mutex_);
00052 return queue_->size();
00053 }
00054
00055 const std::vector<uint8_t>& OutMsgQueue::front()
00056 {
00057 boost::shared_lock<boost::shared_mutex> lock(mutex_);
00058 return queue_->front();
00059 }
00060
00061 void OutMsgQueue::pop()
00062 {
00063 boost::unique_lock<boost::shared_mutex> lock(mutex_);
00064 queue_->pop();
00065 }
00066
00067 void OutMsgQueue::push(const std::vector<uint8_t>& msg)
00068 {
00069 boost::unique_lock<boost::shared_mutex> lock(mutex_);
00070 queue_->push(msg);
00071 queue_manager_ptr_->msg_queue_condition.notify_one();
00072 }
00073
00074 MsgQueueManager::MsgQueueManager()
00075 {
00076 queue_map_.clear();
00077 }
00078
00079 MsgQueueManager::~MsgQueueManager()
00080 {
00081 std::map<std::string, OutMsgQueue*>::iterator it;
00082 for(it = queue_map_.begin(); it != queue_map_.end(); it++) {
00083 delete (it->second);
00084 }
00085 queue_map_.clear();
00086 }
00087
00088 void MsgQueueManager::createOutMsgQueue(std::string name, int size)
00089 {
00090 OutMsgQueue *queue = new OutMsgQueue(name, size, this);
00091 queue_map_.insert(std::pair<std::string, OutMsgQueue*>(name, queue));
00092 }
00093
00094 OutMsgQueue* MsgQueueManager::getOutMsgQueue(std::string name)
00095 {
00096 std::map<std::string, OutMsgQueue*>::iterator it = queue_map_.find(name);
00097 if(it != queue_map_.end()) {
00098 return it->second;
00099 }
00100 else {
00101 std::cout<<"get out msg queue "<<name<<" failed!"<<std::endl;
00102 exit(-1);
00103 }
00104 }
00105
00106 void MsgQueueManager::spinAllOutMsgQueueOnce(std::vector<std::vector<uint8_t> >& msg_vec)
00107 {
00108 boost::shared_lock<boost::shared_mutex> lock(msg_queue_mutex);
00109 std::map<std::string, OutMsgQueue*>::iterator it;
00110 for(it = queue_map_.begin(); it != queue_map_.end(); it++) {
00111 std::string name = it->first;
00112 OutMsgQueue *msg_queue = getOutMsgQueue(name);
00113 if(name == "scds_pso") {
00114 for(int i = 0; i < msg_queue->size(); i++) {
00115 msg_vec.push_back(msg_queue->front());
00116 msg_queue->pop();
00117 }
00118 }
00119 else {
00120 if (!msg_queue->empty()) {
00121 msg_vec.push_back(msg_queue->front());
00122 msg_queue->pop();
00123 }
00124 }
00125 }
00126 while(allOutMsgQueueEmpty()) {
00127 msg_queue_condition.wait(lock);
00128 }
00129 }
00130
00131 bool MsgQueueManager::allOutMsgQueueEmpty()
00132 {
00133 std::map<std::string, OutMsgQueue*>::iterator it;
00134 for(it = queue_map_.begin(); it != queue_map_.end(); it++) {
00135 if(!it->second->empty()) {
00136 return false;
00137 }
00138 }
00139 return true;
00140 }
00141 };