00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifndef NODELET_CALLBACK_QUEUE_MANAGER_H
00031 #define NODELET_CALLBACK_QUEUE_MANAGER_H
00032
00033 #include <boost/shared_ptr.hpp>
00034 #include <boost/scoped_array.hpp>
00035 #include <boost/unordered_map.hpp>
00036 #include <boost/thread/mutex.hpp>
00037 #include <boost/thread/thread.hpp>
00038 #include <boost/detail/atomic_count.hpp>
00039
00040 #include <vector>
00041 #include <deque>
00042
00043 namespace nodelet
00044 {
00045 namespace detail
00046 {
00047 class CallbackQueue;
00048 typedef boost::shared_ptr<CallbackQueue> CallbackQueuePtr;
00049
00061 class CallbackQueueManager
00062 {
00063 public:
00064 CallbackQueueManager(uint32_t num_worker_threads=boost::thread::hardware_concurrency());
00065 ~CallbackQueueManager();
00066
00067 void addQueue(const CallbackQueuePtr& queue, bool threaded);
00068 void removeQueue(const CallbackQueuePtr& queue);
00069 void callbackAdded(const CallbackQueuePtr& queue);
00070
00071 uint32_t getNumWorkerThreads();
00072
00073 void stop();
00074
00075 private:
00076 void managerThread();
00077 struct ThreadInfo;
00078 void workerThread(ThreadInfo*);
00079
00080 class ThreadInfo;
00081 ThreadInfo* getSmallestQueue();
00082
00083 struct QueueInfo
00084 {
00085 QueueInfo()
00086 : threaded(false)
00087 , thread_index(0xffffffff)
00088 , in_thread(0)
00089 {}
00090
00091 CallbackQueuePtr queue;
00092 bool threaded;
00093
00094
00095 boost::mutex st_mutex;
00097 uint32_t thread_index;
00098 uint32_t in_thread;
00099 };
00100 typedef boost::shared_ptr<QueueInfo> QueueInfoPtr;
00101
00102 typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
00103 M_Queue queues_;
00104 boost::mutex queues_mutex_;
00105
00107 typedef std::vector<CallbackQueuePtr> V_Queue;
00108 V_Queue waiting_;
00109 boost::mutex waiting_mutex_;
00110 boost::condition_variable waiting_cond_;
00111 boost::thread_group tg_;
00112
00113 struct ThreadInfo
00114 {
00115 ThreadInfo()
00116 : calling(0)
00117 {}
00118
00120 boost::mutex queue_mutex;
00121 boost::condition_variable queue_cond;
00122 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
00123 boost::detail::atomic_count calling;
00124
00125 #ifdef NODELET_QUEUE_DEBUG
00126 struct Record
00127 {
00128 Record(double stamp, uint32_t tasks, bool threaded)
00129 : stamp(stamp), tasks(tasks), threaded(threaded)
00130 {}
00131
00132 double stamp;
00133 uint32_t tasks;
00134 bool threaded;
00135 };
00136
00137 std::vector<Record> history;
00138 #endif
00139
00140
00141
00142 static const int ACTUAL_SIZE =
00143 sizeof(boost::mutex) +
00144 sizeof(boost::condition_variable) +
00145 sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
00146 sizeof(boost::detail::atomic_count);
00147 uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
00148 };
00150 typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
00151 V_ThreadInfo thread_info_;
00152
00153 bool running_;
00154 uint32_t num_worker_threads_;
00155 };
00156
00157 }
00158 }
00159
00160 #endif // NODELET_CALLBACK_QUEUE_MANAGER_H