spinner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/spinner.h"
29 #include "ros/ros.h"
30 #include "ros/callback_queue.h"
31 
32 #include <boost/thread/thread.hpp>
33 #include <boost/thread/mutex.hpp>
34 
35 namespace {
36 
50 struct SpinnerMonitor
51 {
52  /* store spinner information per callback queue:
53  Only alike spinners (single-threaded or multi-threaded) are allowed on a callback queue.
54  For single-threaded spinners we store their thread id.
55  We store the number of alike spinners operating on the callback queue.
56  */
57  struct Entry
58  {
59  Entry(const boost::thread::id &tid) : tid(tid), num(0) {}
60 
61  boost::thread::id tid; // proper thread id of single-threaded spinner
62  unsigned int num; // number of (alike) spinners serving this queue
63  };
64 
66  bool add(ros::CallbackQueue* queue, bool single_threaded)
67  {
68  boost::mutex::scoped_lock lock(mutex_);
69 
70  boost::thread::id tid; // current thread id for single-threaded spinners, zero for multi-threaded ones
71  if (single_threaded)
72  tid = boost::this_thread::get_id();
73 
74  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
75  bool can_spin = ( it == spinning_queues_.end() || // we will spin on any new queue
76  it->second.tid == tid ); // otherwise spinner must be alike (all multi-threaded: 0, or single-threaded on same thread id)
77 
78  if (!can_spin)
79  return false;
80 
81  if (it == spinning_queues_.end())
82  it = spinning_queues_.insert(it, std::make_pair(queue, Entry(tid)));
83 
84  // increment number of active spinners
85  it->second.num += 1;
86 
87  return true;
88  }
89 
91  void remove(ros::CallbackQueue* queue)
92  {
93  boost::mutex::scoped_lock lock(mutex_);
94  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
95  ROS_ASSERT_MSG(it != spinning_queues_.end(), "Call to SpinnerMonitor::remove() without matching call to add().");
96 
97  if (it->second.tid != boost::thread::id() && it->second.tid != boost::this_thread::get_id())
98  {
99  // This doesn't harm, but isn't good practice?
100  // It was enforced by the previous implementation.
101  ROS_WARN("SpinnerMonitor::remove() called from different thread than add().");
102  }
103 
104  ROS_ASSERT_MSG(it->second.num > 0, "SpinnerMonitor::remove(): Invalid spinner count (0) encountered.");
105  it->second.num -= 1;
106  if (it->second.num == 0)
107  spinning_queues_.erase(it); // erase queue entry to allow future queues with same pointer
108  }
109 
110  std::map<ros::CallbackQueue*, Entry> spinning_queues_;
111  boost::mutex mutex_;
112 };
113 
114 SpinnerMonitor spinner_monitor;
115 const std::string DEFAULT_ERROR_MESSAGE =
116  "Attempt to spin a callback queue from two spinners, one of them being single-threaded.";
117 }
118 
119 namespace ros
120 {
121 
122 
124 {
125  if (!queue)
126  {
127  queue = getGlobalCallbackQueue();
128  }
129 
130  if (!spinner_monitor.add(queue, true))
131  {
132  std::string errorMessage = "SingleThreadedSpinner: " + DEFAULT_ERROR_MESSAGE + " You might want to use a MultiThreadedSpinner instead.";
133  ROS_FATAL_STREAM(errorMessage);
134  throw std::runtime_error(errorMessage);
135  }
136 
137  ros::WallDuration timeout(0.1f);
138  ros::NodeHandle n;
139  while (n.ok())
140  {
141  queue->callAvailable(timeout);
142  }
143  spinner_monitor.remove(queue);
144 }
145 
146 MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
147 : thread_count_(thread_count)
148 {
149 }
150 
152 {
153  AsyncSpinner s(thread_count_, queue);
154  s.start();
155 
157 }
158 
160 {
161 public:
162  AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
163  ~AsyncSpinnerImpl();
164 
165  bool canStart();
166  void start();
167  void stop();
168 
169 private:
170  void threadFunc();
171 
172  boost::mutex mutex_;
173  boost::thread_group threads_;
174 
175  uint32_t thread_count_;
177 
178  volatile bool continue_;
179 
181 };
182 
183 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
184 : thread_count_(thread_count)
185 , callback_queue_(queue)
186 , continue_(false)
187 {
188  if (thread_count == 0)
189  {
190  thread_count_ = boost::thread::hardware_concurrency();
191 
192  if (thread_count_ == 0)
193  {
194  thread_count_ = 1;
195  }
196  }
197 
198  if (!queue)
199  {
201  }
202 }
203 
205 {
206  stop();
207 }
208 
210 {
211  return true;
212 }
213 
215 {
216  boost::mutex::scoped_lock lock(mutex_);
217 
218  if (continue_)
219  return; // already spinning
220 
221  if (!spinner_monitor.add(callback_queue_, false))
222  {
223  std::string errorMessage = "AsyncSpinnerImpl: " + DEFAULT_ERROR_MESSAGE;
224  ROS_FATAL_STREAM(errorMessage);
225  throw std::runtime_error(errorMessage);
226  }
227 
228  continue_ = true;
229 
230  for (uint32_t i = 0; i < thread_count_; ++i)
231  {
232  threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
233  }
234 }
235 
237 {
238  boost::mutex::scoped_lock lock(mutex_);
239  if (!continue_)
240  return;
241 
242  continue_ = false;
243  threads_.join_all();
244 
245  spinner_monitor.remove(callback_queue_);
246 }
247 
249 {
250  disableAllSignalsInThisThread();
251 
253  bool use_call_available = thread_count_ == 1;
254  WallDuration timeout(0.1);
255 
256  while (continue_ && nh_.ok())
257  {
258  if (use_call_available)
259  {
260  queue->callAvailable(timeout);
261  }
262  else
263  {
264  queue->callOne(timeout);
265  }
266  }
267 }
268 
269 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
270 : impl_(new AsyncSpinnerImpl(thread_count, 0))
271 {
272 }
273 
274 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
275 : impl_(new AsyncSpinnerImpl(thread_count, queue))
276 {
277 }
278 
280 {
281  return impl_->canStart();
282 }
283 
285 {
286  impl_->start();
287 }
288 
290 {
291  impl_->stop();
292 }
293 
294 }
This is the default implementation of the ros::CallbackQueueInterface.
AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue *queue)
Definition: spinner.cpp:183
ROSCPP_DECL void start()
Actually starts the internals of the node (spins up threads, starts the network polling and xmlrpc lo...
Definition: init.cpp:293
f
volatile bool continue_
Definition: spinner.cpp:178
ros::NodeHandle nh_
Definition: spinner.cpp:180
CallbackQueue * callback_queue_
Definition: spinner.cpp:176
XmlRpcServer s
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
#define ROS_WARN(...)
boost::mutex mutex_
Definition: spinner.cpp:172
AsyncSpinner(uint32_t thread_count)
Simple constructor. Uses the global callback queue.
Definition: spinner.cpp:269
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
Returns a pointer to the global default callback queue.
Definition: init.cpp:568
void start()
Start this spinner spinning asynchronously.
Definition: spinner.cpp:284
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called, pushes it back onto the queue.
uint32_t thread_count_
Definition: spinner.cpp:175
#define ROS_FATAL_STREAM(args)
#define ROS_ASSERT_MSG(cond,...)
roscpp&#39;s interface for creating subscribers, publishers, etc.
Definition: node_handle.h:87
void threadFunc(boost::barrier *b)
ROSCPP_DECL void spin()
Enter simple event loop.
Definition: init.cpp:544
virtual void spin(CallbackQueue *queue=0)
Spin on a callback queue (defaults to the global one). Blocks until roscpp has been shutdown...
Definition: spinner.cpp:151
bool canStart()
Check if the spinner can be started. The spinner shouldn&#39;t be started if another single-threaded spin...
Definition: spinner.cpp:279
AsyncSpinnerImplPtr impl_
Definition: spinner.h:130
boost::thread_group threads_
Definition: spinner.cpp:173
bool ok() const
Check whether it&#39;s time to exit.
void stop()
Stop this spinner from running.
Definition: spinner.cpp:289
ROSCPP_DECL void waitForShutdown()
Wait for this node to be shutdown, whether through Ctrl-C, ros::shutdown(), or similar.
Definition: init.cpp:560
AsyncSpinner is a spinner that does not conform to the abstract Spinner interface. Instead, it spins asynchronously when you call start(), and stops when either you call stop(), ros::shutdown() is called, or its destructor is called.
Definition: spinner.h:93


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Sun Feb 3 2019 03:29:54