callback_queue_manager.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2010, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #ifndef NODELET_CALLBACK_QUEUE_MANAGER_H
31 #define NODELET_CALLBACK_QUEUE_MANAGER_H
32 
33 #include "nodelet/nodeletdecl.h"
34 
35 #include <ros/types.h>
36 
37 #include <boost/shared_ptr.hpp>
38 #include <boost/scoped_array.hpp>
39 #include <boost/unordered_map.hpp>
40 #include <boost/thread/mutex.hpp>
41 #include <boost/thread/thread.hpp>
42 #include <boost/detail/atomic_count.hpp>
43 
44 #include <vector>
45 #include <deque>
46 
47 namespace nodelet
48 {
49 namespace detail
50 {
53 
66 {
67 public:
73  CallbackQueueManager(uint32_t num_worker_threads = 0);
75 
76  void addQueue(const CallbackQueuePtr& queue, bool threaded);
77  void removeQueue(const CallbackQueuePtr& queue);
78  void callbackAdded(const CallbackQueuePtr& queue);
79 
80  uint32_t getNumWorkerThreads();
81 
82  void stop();
83 
84 private:
85  void managerThread();
86  struct ThreadInfo;
87  void workerThread(ThreadInfo*);
88 
89  ThreadInfo* getSmallestQueue();
90 
91  struct QueueInfo
92  {
94  : threaded(false)
95  , thread_index(0xffffffff)
96  , in_thread(0)
97  {}
98 
99  CallbackQueuePtr queue;
100  bool threaded;
101 
102  // Only used if threaded == false
103  boost::mutex st_mutex;
105  uint32_t thread_index;
106  uint32_t in_thread;
107  };
109 
110  typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
111  M_Queue queues_;
112  boost::mutex queues_mutex_;
113 
115  typedef std::vector<CallbackQueuePtr> V_Queue;
116  V_Queue waiting_;
117  boost::mutex waiting_mutex_;
119  boost::thread_group tg_;
120 
121  struct ThreadInfo
122  {
124  : calling(0)
125  {}
126 
128  boost::mutex queue_mutex;
130  std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
131  boost::detail::atomic_count calling;
132 
133 #ifdef NODELET_QUEUE_DEBUG
134  struct Record
135  {
136  Record(double stamp, uint32_t tasks, bool threaded)
137  : stamp(stamp), tasks(tasks), threaded(threaded)
138  {}
139 
140  double stamp;
141  uint32_t tasks;
142  bool threaded;
143  };
144 
145  std::vector<Record> history;
146 #endif
147 
148  // Pad sizeof(ThreadInfo) to be a multiple of 64 (cache line size) to avoid false sharing.
149  // This still doesn't guarantee ThreadInfo is actually allocated on a cache line boundary though.
150  static const int ACTUAL_SIZE =
151  sizeof(boost::mutex) +
152  sizeof(boost::condition_variable) +
153  sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
154  sizeof(boost::detail::atomic_count);
155  uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
156  };
158  typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
159  V_ThreadInfo thread_info_;
160 
161  bool running_;
163 };
164 
165 } // namespace detail
166 } // namespace nodelet
167 
168 #endif // NODELET_CALLBACK_QUEUE_MANAGER_H
boost::shared_ptr< QueueInfo > QueueInfoPtr
boost::shared_ptr< CallbackQueue > CallbackQueuePtr
boost::unordered_map< CallbackQueue *, QueueInfoPtr > M_Queue
boost::scoped_array< ThreadInfo > V_ThreadInfo
#define NODELETLIB_DECL
Definition: nodeletdecl.h:40
std::vector< CallbackQueuePtr > V_Queue
std::vector< std::pair< CallbackQueuePtr, QueueInfoPtr > > queue


nodelet
Author(s): Tully Foote, Radu Bogdan Rusu
autogenerated on Sun Feb 17 2019 03:38:46