Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifndef NODELET_CALLBACK_QUEUE_MANAGER_H
00031 #define NODELET_CALLBACK_QUEUE_MANAGER_H
00032
00033 #include "nodelet/nodeletdecl.h"
00034
00035 #include <ros/types.h>
00036
00037 #include <boost/shared_ptr.hpp>
00038 #include <boost/scoped_array.hpp>
00039 #include <boost/unordered_map.hpp>
00040 #include <boost/thread/mutex.hpp>
00041 #include <boost/thread/thread.hpp>
00042 #include <boost/detail/atomic_count.hpp>
00043
00044 #include <vector>
00045 #include <deque>
00046
00047 namespace nodelet
00048 {
00049 namespace detail
00050 {
00051 class CallbackQueue;
00052 typedef boost::shared_ptr<CallbackQueue> CallbackQueuePtr;
00053
00065 class NODELETLIB_DECL CallbackQueueManager
00066 {
00067 public:
00073 CallbackQueueManager(uint32_t num_worker_threads = 0);
00074 ~CallbackQueueManager();
00075
00076 void addQueue(const CallbackQueuePtr& queue, bool threaded);
00077 void removeQueue(const CallbackQueuePtr& queue);
00078 void callbackAdded(const CallbackQueuePtr& queue);
00079
00080 uint32_t getNumWorkerThreads();
00081
00082 void stop();
00083
00084 private:
00085 void managerThread();
00086 struct ThreadInfo;
00087 void workerThread(ThreadInfo*);
00088
00089 ThreadInfo* getSmallestQueue();
00090
00091 struct QueueInfo
00092 {
00093 QueueInfo()
00094 : threaded(false)
00095 , thread_index(0xffffffff)
00096 , in_thread(0)
00097 {}
00098
00099 CallbackQueuePtr queue;
00100 bool threaded;
00101
00102
00103 boost::mutex st_mutex;
00105 uint32_t thread_index;
00106 uint32_t in_thread;
00107 };
00108 typedef boost::shared_ptr<QueueInfo> QueueInfoPtr;
00109
00110 typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
00111 M_Queue queues_;
00112 boost::mutex queues_mutex_;
00113
00115 typedef std::vector<CallbackQueuePtr> V_Queue;
00116 V_Queue waiting_;
00117 boost::mutex waiting_mutex_;
00118 boost::condition_variable waiting_cond_;
00119 boost::thread_group tg_;
00120
00121 struct ThreadInfo
00122 {
00123 ThreadInfo()
00124 : calling(0)
00125 {}
00126
00128 boost::mutex queue_mutex;
00129 boost::condition_variable queue_cond;
00130 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
00131 boost::detail::atomic_count calling;
00132
00133 #ifdef NODELET_QUEUE_DEBUG
00134 struct Record
00135 {
00136 Record(double stamp, uint32_t tasks, bool threaded)
00137 : stamp(stamp), tasks(tasks), threaded(threaded)
00138 {}
00139
00140 double stamp;
00141 uint32_t tasks;
00142 bool threaded;
00143 };
00144
00145 std::vector<Record> history;
00146 #endif
00147
00148
00149
00150 static const int ACTUAL_SIZE =
00151 sizeof(boost::mutex) +
00152 sizeof(boost::condition_variable) +
00153 sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
00154 sizeof(boost::detail::atomic_count);
00155 uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
00156 };
00158 typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
00159 V_ThreadInfo thread_info_;
00160
00161 bool running_;
00162 uint32_t num_worker_threads_;
00163 };
00164
00165 }
00166 }
00167
00168 #endif // NODELET_CALLBACK_QUEUE_MANAGER_H