callback_queue_manager.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #ifndef NODELET_CALLBACK_QUEUE_MANAGER_H
31 #define NODELET_CALLBACK_QUEUE_MANAGER_H
32 
33 #include "nodelet/nodeletdecl.h"
34 
35 #include <ros/types.h>
36 
37 #include <boost/shared_ptr.hpp>
38 #include <boost/scoped_array.hpp>
39 #include <boost/unordered_map.hpp>
40 #include <boost/thread/mutex.hpp>
41 #include <boost/thread/thread.hpp>
42 #include <boost/detail/atomic_count.hpp>
43 
44 #include <vector>
45 #include <deque>
46 
47 namespace nodelet
48 {
49 namespace detail
50 {
53 
66 {
67 public:
73  CallbackQueueManager(uint32_t num_worker_threads = 0);
75 
76  void addQueue(const CallbackQueuePtr& queue, bool threaded);
77  void removeQueue(const CallbackQueuePtr& queue);
78  void callbackAdded(const CallbackQueuePtr& queue);
79 
80  uint32_t getNumWorkerThreads();
81 
82  void stop();
83 
84 private:
85  void managerThread();
86  struct ThreadInfo;
87  void workerThread(ThreadInfo*);
88 
89  ThreadInfo* getSmallestQueue();
90 
91  struct QueueInfo
92  {
94  : threaded(false)
95  , thread_index(0xffffffff)
96  , in_thread(0)
97  {}
98 
100  bool threaded;
101 
102  // Only used if threaded == false
103  boost::mutex st_mutex;
105  uint32_t thread_index;
106  uint32_t in_thread;
107  };
109 
110  typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
112  boost::mutex queues_mutex_;
113 
115  typedef std::vector<CallbackQueuePtr> V_Queue;
117  boost::mutex waiting_mutex_;
118  boost::condition_variable waiting_cond_;
119  boost::thread_group tg_;
120 
121  struct ThreadInfo
122  {
124  : calling(0)
125  {}
126 
128  boost::mutex queue_mutex;
129  boost::condition_variable queue_cond;
130  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
131  boost::detail::atomic_count calling;
132 
133 #ifdef NODELET_QUEUE_DEBUG
134  struct Record
135  {
136  Record(double stamp, uint32_t tasks, bool threaded)
137  : stamp(stamp), tasks(tasks), threaded(threaded)
138  {}
139 
140  double stamp;
141  uint32_t tasks;
142  bool threaded;
143  };
144 
145  std::vector<Record> history;
146 #endif
147 
148  // Pad sizeof(ThreadInfo) to be a multiple of 64 (cache line size) to avoid false sharing.
149  // This still doesn't guarantee ThreadInfo is actually allocated on a cache line boundary though.
150  static const int ACTUAL_SIZE =
151  sizeof(boost::mutex) +
152  sizeof(boost::condition_variable) +
153  sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
154  sizeof(boost::detail::atomic_count);
155  uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
156  };
158  typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
160 
161  bool running_;
163 };
164 
165 } // namespace detail
166 } // namespace nodelet
167 
168 #endif // NODELET_CALLBACK_QUEUE_MANAGER_H
nodelet::detail::CallbackQueueManager::waiting_
V_Queue waiting_
Definition: callback_queue_manager.h:116
nodelet::detail::CallbackQueueManager::ThreadInfo::queue_cond
boost::condition_variable queue_cond
Definition: callback_queue_manager.h:129
nodelet::detail::CallbackQueueManager::running_
bool running_
Definition: callback_queue_manager.h:161
nodelet::detail::CallbackQueueManager::QueueInfoPtr
boost::shared_ptr< QueueInfo > QueueInfoPtr
Definition: callback_queue_manager.h:108
nodelet::detail::CallbackQueueManager::QueueInfo::QueueInfo
QueueInfo()
Definition: callback_queue_manager.h:93
boost::shared_ptr< CallbackQueue >
nodeletdecl.h
nodelet::detail::CallbackQueueManager::QueueInfo
Definition: callback_queue_manager.h:91
nodelet::detail::CallbackQueueManager::queues_mutex_
boost::mutex queues_mutex_
Definition: callback_queue_manager.h:112
nodelet::detail::CallbackQueueManager::QueueInfo::queue
CallbackQueuePtr queue
Definition: callback_queue_manager.h:99
nodelet::detail::CallbackQueueManager::queues_
M_Queue queues_
Definition: callback_queue_manager.h:111
nodelet::detail::CallbackQueue
Definition: callback_queue.h:52
nodelet::detail::CallbackQueueManager::M_Queue
boost::unordered_map< CallbackQueue *, QueueInfoPtr > M_Queue
Definition: callback_queue_manager.h:110
nodelet::detail::CallbackQueueManager::waiting_mutex_
boost::mutex waiting_mutex_
Definition: callback_queue_manager.h:117
nodelet::detail::CallbackQueueManager::ThreadInfo::queue
std::vector< std::pair< CallbackQueuePtr, QueueInfoPtr > > queue
Definition: callback_queue_manager.h:130
nodelet::detail::CallbackQueueManager
Internal use.
Definition: callback_queue_manager.h:65
nodelet::detail::CallbackQueueManager::ThreadInfo::calling
boost::detail::atomic_count calling
Definition: callback_queue_manager.h:131
nodelet::detail::CallbackQueueManager::num_worker_threads_
uint32_t num_worker_threads_
Definition: callback_queue_manager.h:162
nodelet::detail::CallbackQueueManager::QueueInfo::threaded
bool threaded
Definition: callback_queue_manager.h:100
nodelet::detail::CallbackQueueManager::tg_
boost::thread_group tg_
Definition: callback_queue_manager.h:119
nodelet::detail::CallbackQueueManager::V_ThreadInfo
boost::scoped_array< ThreadInfo > V_ThreadInfo
Definition: callback_queue_manager.h:158
nodelet::detail::CallbackQueueManager::QueueInfo::in_thread
uint32_t in_thread
Definition: callback_queue_manager.h:106
nodelet::detail::CallbackQueueManager::waiting_cond_
boost::condition_variable waiting_cond_
Definition: callback_queue_manager.h:118
nodelet::detail::CallbackQueueManager::ThreadInfo
Definition: callback_queue_manager.h:121
nodelet::detail::CallbackQueueManager::thread_info_
V_ThreadInfo thread_info_
Definition: callback_queue_manager.h:159
nodelet
Definition: callback_queue.h:44
nodelet::detail::CallbackQueueManager::ThreadInfo::ThreadInfo
ThreadInfo()
Definition: callback_queue_manager.h:123
nodelet::detail::CallbackQueueManager::V_Queue
std::vector< CallbackQueuePtr > V_Queue
Definition: callback_queue_manager.h:115
nodelet::detail::CallbackQueuePtr
boost::shared_ptr< CallbackQueue > CallbackQueuePtr
Definition: callback_queue_manager.h:51
nodelet::detail::CallbackQueueManager::ThreadInfo::queue_mutex
boost::mutex queue_mutex
Definition: callback_queue_manager.h:128
types.h
NODELETLIB_DECL
#define NODELETLIB_DECL
Definition: nodeletdecl.h:40
nodelet::detail::CallbackQueueManager::QueueInfo::st_mutex
boost::mutex st_mutex
Definition: callback_queue_manager.h:103
nodelet::detail::CallbackQueueManager::QueueInfo::thread_index
uint32_t thread_index
Definition: callback_queue_manager.h:105


nodelet
Author(s): Tully Foote, Radu Bogdan Rusu, Michael Carroll
autogenerated on Fri Nov 15 2024 03:38:09