Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifndef NODELET_CALLBACK_QUEUE_MANAGER_H
00031 #define NODELET_CALLBACK_QUEUE_MANAGER_H
00032
00033 #include <boost/shared_ptr.hpp>
00034 #include <boost/scoped_array.hpp>
00035 #include <boost/unordered_map.hpp>
00036 #include <boost/thread/mutex.hpp>
00037 #include <boost/thread/thread.hpp>
00038 #include <boost/detail/atomic_count.hpp>
00039
00040 #include <vector>
00041 #include <deque>
00042
00043 namespace nodelet
00044 {
00045 namespace detail
00046 {
00047 class CallbackQueue;
00048 typedef boost::shared_ptr<CallbackQueue> CallbackQueuePtr;
00049
00061 class CallbackQueueManager
00062 {
00063 public:
00069 CallbackQueueManager(uint32_t num_worker_threads = 0);
00070 ~CallbackQueueManager();
00071
00072 void addQueue(const CallbackQueuePtr& queue, bool threaded);
00073 void removeQueue(const CallbackQueuePtr& queue);
00074 void callbackAdded(const CallbackQueuePtr& queue);
00075
00076 uint32_t getNumWorkerThreads();
00077
00078 void stop();
00079
00080 private:
00081 void managerThread();
00082 struct ThreadInfo;
00083 void workerThread(ThreadInfo*);
00084
00085 class ThreadInfo;
00086 ThreadInfo* getSmallestQueue();
00087
00088 struct QueueInfo
00089 {
00090 QueueInfo()
00091 : threaded(false)
00092 , thread_index(0xffffffff)
00093 , in_thread(0)
00094 {}
00095
00096 CallbackQueuePtr queue;
00097 bool threaded;
00098
00099
00100 boost::mutex st_mutex;
00102 uint32_t thread_index;
00103 uint32_t in_thread;
00104 };
00105 typedef boost::shared_ptr<QueueInfo> QueueInfoPtr;
00106
00107 typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
00108 M_Queue queues_;
00109 boost::mutex queues_mutex_;
00110
00112 typedef std::vector<CallbackQueuePtr> V_Queue;
00113 V_Queue waiting_;
00114 boost::mutex waiting_mutex_;
00115 boost::condition_variable waiting_cond_;
00116 boost::thread_group tg_;
00117
00118 struct ThreadInfo
00119 {
00120 ThreadInfo()
00121 : calling(0)
00122 {}
00123
00125 boost::mutex queue_mutex;
00126 boost::condition_variable queue_cond;
00127 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
00128 boost::detail::atomic_count calling;
00129
00130 #ifdef NODELET_QUEUE_DEBUG
00131 struct Record
00132 {
00133 Record(double stamp, uint32_t tasks, bool threaded)
00134 : stamp(stamp), tasks(tasks), threaded(threaded)
00135 {}
00136
00137 double stamp;
00138 uint32_t tasks;
00139 bool threaded;
00140 };
00141
00142 std::vector<Record> history;
00143 #endif
00144
00145
00146
00147 static const int ACTUAL_SIZE =
00148 sizeof(boost::mutex) +
00149 sizeof(boost::condition_variable) +
00150 sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
00151 sizeof(boost::detail::atomic_count);
00152 uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
00153 };
00155 typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
00156 V_ThreadInfo thread_info_;
00157
00158 bool running_;
00159 uint32_t num_worker_threads_;
00160 };
00161
00162 }
00163 }
00164
00165 #endif // NODELET_CALLBACK_QUEUE_MANAGER_H