Go to the documentation of this file.
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifndef NODELET_CALLBACK_QUEUE_MANAGER_H
00031 #define NODELET_CALLBACK_QUEUE_MANAGER_H
00032
00033 #include <ros/types.h>
00034
00035 #include <boost/shared_ptr.hpp>
00036 #include <boost/scoped_array.hpp>
00037 #include <boost/unordered_map.hpp>
00038 #include <boost/thread/mutex.hpp>
00039 #include <boost/thread/thread.hpp>
00040 #include <boost/detail/atomic_count.hpp>
00041
00042 #include <vector>
00043 #include <deque>
00044
00045 namespace nodelet
00046 {
00047 namespace detail
00048 {
// Forward declaration only: this header refers to CallbackQueue solely through
// raw and shared pointers, so the full definition is not needed here.
00049 class CallbackQueue;
// Reference-counted (boost::shared_ptr) handle to a CallbackQueue.
00050 typedef boost::shared_ptr<CallbackQueue> CallbackQueuePtr;
00051
00063 class CallbackQueueManager
00064 {
00065 public:
00071 CallbackQueueManager(uint32_t num_worker_threads = 0);
00072 ~CallbackQueueManager();
00073
00074 void addQueue(const CallbackQueuePtr& queue, bool threaded);
00075 void removeQueue(const CallbackQueuePtr& queue);
00076 void callbackAdded(const CallbackQueuePtr& queue);
00077
00078 uint32_t getNumWorkerThreads();
00079
00080 void stop();
00081
00082 private:
00083 void managerThread();
00084 struct ThreadInfo;
00085 void workerThread(ThreadInfo*);
00086
00087 class ThreadInfo;
00088 ThreadInfo* getSmallestQueue();
00089
00090 struct QueueInfo
00091 {
00092 QueueInfo()
00093 : threaded(false)
00094 , thread_index(0xffffffff)
00095 , in_thread(0)
00096 {}
00097
00098 CallbackQueuePtr queue;
00099 bool threaded;
00100
00101
00102 boost::mutex st_mutex;
00104 uint32_t thread_index;
00105 uint32_t in_thread;
00106 };
00107 typedef boost::shared_ptr<QueueInfo> QueueInfoPtr;
00108
00109 typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
00110 M_Queue queues_;
00111 boost::mutex queues_mutex_;
00112
00114 typedef std::vector<CallbackQueuePtr> V_Queue;
00115 V_Queue waiting_;
00116 boost::mutex waiting_mutex_;
00117 boost::condition_variable waiting_cond_;
00118 boost::thread_group tg_;
00119
00120 struct ThreadInfo
00121 {
00122 ThreadInfo()
00123 : calling(0)
00124 {}
00125
00127 boost::mutex queue_mutex;
00128 boost::condition_variable queue_cond;
00129 std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
00130 boost::detail::atomic_count calling;
00131
00132 #ifdef NODELET_QUEUE_DEBUG
00133 struct Record
00134 {
00135 Record(double stamp, uint32_t tasks, bool threaded)
00136 : stamp(stamp), tasks(tasks), threaded(threaded)
00137 {}
00138
00139 double stamp;
00140 uint32_t tasks;
00141 bool threaded;
00142 };
00143
00144 std::vector<Record> history;
00145 #endif
00146
00147
00148
00149 static const int ACTUAL_SIZE =
00150 sizeof(boost::mutex) +
00151 sizeof(boost::condition_variable) +
00152 sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
00153 sizeof(boost::detail::atomic_count);
00154 uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
00155 };
00157 typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
00158 V_ThreadInfo thread_info_;
00159
00160 bool running_;
00161 uint32_t num_worker_threads_;
00162 };
00163
00164 }
00165 }
00166
00167 #endif // NODELET_CALLBACK_QUEUE_MANAGER_H