spinner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/spinner.h"
29 #include "ros/ros.h"
30 #include "ros/callback_queue.h"
31 
32 #include <boost/thread/thread.hpp>
33 #include <boost/thread/mutex.hpp>
34 
35 namespace {
36 
37 const std::string DEFAULT_ERROR_MESSAGE =
38  "\nAttempt to spin a callback queue from two spinners, one of them being single-threaded."
39  "\nThis will probably result in callbacks being executed out-of-order."
40  "\nIn future this will throw an exception!";
41 
55 struct SpinnerMonitor
56 {
57  /* store spinner information per callback queue:
58  Only alike spinners (single-threaded or multi-threaded) are allowed on a callback queue.
59  For single-threaded spinners we store their thread id.
60  We store the number of alike spinners operating on the callback queue.
61  */
62  struct Entry
63  {
64  Entry(const boost::thread::id &tid,
65  const boost::thread::id &initial_tid) : tid(tid), initial_tid(initial_tid), num(0) {}
66 
67  boost::thread::id tid; // proper thread id of single-threaded spinner
68  boost::thread::id initial_tid; // to retain old behaviour, store first spinner's thread id
69  unsigned int num; // number of (alike) spinners serving this queue
70  };
71 
73  bool add(ros::CallbackQueue* queue, bool single_threaded)
74  {
75  boost::mutex::scoped_lock lock(mutex_);
76 
77  boost::thread::id current_tid = boost::this_thread::get_id();
78  boost::thread::id tid; // current thread id for single-threaded spinners, zero for multi-threaded ones
79  if (single_threaded)
80  tid = current_tid;
81 
82  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
83  bool can_spin = ( it == spinning_queues_.end() || // we will spin on any new queue
84  it->second.tid == tid ); // otherwise spinner must be alike (all multi-threaded: 0, or single-threaded on same thread id)
85 
86  if (!can_spin)
87  {
88  // Previous behavior (up to Kinetic) was to accept multiple spinners on a queue
89  // as long as they were started from the same thread. Although this is wrong behavior,
90  // we retain it here for backwards compatibility, i.e. we allow spinning of a
91  // single-threaded spinner after several multi-threaded ones, given that they
92  // were started from the same initial thread
93  if (it->second.initial_tid == tid)
94  {
95  ROS_ERROR_STREAM("SpinnerMonitor: single-threaded spinner after multi-threaded one(s)."
96  << DEFAULT_ERROR_MESSAGE
97  << " Only allowed for backwards compatibility.");
98  it->second.tid = tid; // "upgrade" tid to represent single-threaded spinner
99  }
100  else
101  return false;
102  }
103 
104  if (it == spinning_queues_.end())
105  it = spinning_queues_.insert(it, std::make_pair(queue, Entry(tid, current_tid)));
106 
107  // increment number of active spinners
108  it->second.num += 1;
109 
110  return true;
111  }
112 
114  void remove(ros::CallbackQueue* queue)
115  {
116  boost::mutex::scoped_lock lock(mutex_);
117  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
118  ROS_ASSERT_MSG(it != spinning_queues_.end(), "Call to SpinnerMonitor::remove() without matching call to add().");
119 
120  if (it->second.tid != boost::thread::id() && it->second.tid != boost::this_thread::get_id())
121  {
122  // This doesn't harm, but isn't good practice?
123  // It was enforced by the previous implementation.
124  ROS_WARN("SpinnerMonitor::remove() called from different thread than add().");
125  }
126 
127  ROS_ASSERT_MSG(it->second.num > 0, "SpinnerMonitor::remove(): Invalid spinner count (0) encountered.");
128  it->second.num -= 1;
129  if (it->second.num == 0)
130  spinning_queues_.erase(it); // erase queue entry to allow future queues with same pointer
131  }
132 
133  std::map<ros::CallbackQueue*, Entry> spinning_queues_;
134  boost::mutex mutex_;
135 };
136 
137 SpinnerMonitor spinner_monitor;
138 }
139 
140 namespace ros
141 {
142 
143 
145 {
146  if (!queue)
147  {
148  queue = getGlobalCallbackQueue();
149  }
150 
151  if (!spinner_monitor.add(queue, true))
152  {
153  ROS_ERROR_STREAM("SingleThreadedSpinner: " << DEFAULT_ERROR_MESSAGE);
154  return;
155  }
156 
157  ros::WallDuration timeout(0.1f);
158  ros::NodeHandle n;
159  while (n.ok())
160  {
161  queue->callAvailable(timeout);
162  }
163  spinner_monitor.remove(queue);
164 }
165 
166 MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
167 : thread_count_(thread_count)
168 {
169 }
170 
172 {
173  AsyncSpinner s(thread_count_, queue);
174  s.start();
175 
177 }
178 
180 {
181 public:
182  AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
183  ~AsyncSpinnerImpl();
184 
185  bool canStart();
186  void start();
187  void stop();
188 
189 private:
190  void threadFunc();
191 
192  boost::mutex mutex_;
193  boost::thread_group threads_;
194 
195  uint32_t thread_count_;
197 
198  volatile bool continue_;
199 
201 };
202 
203 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
204 : thread_count_(thread_count)
205 , callback_queue_(queue)
206 , continue_(false)
207 {
208  if (thread_count == 0)
209  {
210  thread_count_ = boost::thread::hardware_concurrency();
211 
212  if (thread_count_ == 0)
213  {
214  thread_count_ = 1;
215  }
216  }
217 
218  if (!queue)
219  {
221  }
222 }
223 
225 {
226  stop();
227 }
228 
230 {
231  return true;
232 }
233 
235 {
236  boost::mutex::scoped_lock lock(mutex_);
237 
238  if (continue_)
239  return; // already spinning
240 
241  if (!spinner_monitor.add(callback_queue_, false))
242  {
243  ROS_ERROR_STREAM("AsyncSpinnerImpl: " << DEFAULT_ERROR_MESSAGE);
244  return;
245  }
246 
247  continue_ = true;
248 
249  for (uint32_t i = 0; i < thread_count_; ++i)
250  {
251  threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
252  }
253 }
254 
256 {
257  boost::mutex::scoped_lock lock(mutex_);
258  if (!continue_)
259  return;
260 
261  continue_ = false;
262  threads_.join_all();
263 
264  spinner_monitor.remove(callback_queue_);
265 }
266 
268 {
269  disableAllSignalsInThisThread();
270 
272  bool use_call_available = thread_count_ == 1;
273  WallDuration timeout(0.1);
274 
275  while (continue_ && nh_.ok())
276  {
277  if (use_call_available)
278  {
279  queue->callAvailable(timeout);
280  }
281  else
282  {
283  queue->callOne(timeout);
284  }
285  }
286 }
287 
288 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
289 : impl_(new AsyncSpinnerImpl(thread_count, 0))
290 {
291 }
292 
293 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
294 : impl_(new AsyncSpinnerImpl(thread_count, queue))
295 {
296 }
297 
299 {
300  return impl_->canStart();
301 }
302 
304 {
305  impl_->start();
306 }
307 
309 {
310  impl_->stop();
311 }
312 
313 }
This is the default implementation of the ros::CallbackQueueInterface.
AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue *queue)
Definition: spinner.cpp:203
ROSCPP_DECL void start()
Actually starts the internals of the node (spins up threads, starts the network polling and xmlrpc lo...
Definition: init.cpp:293
f
volatile bool continue_
Definition: spinner.cpp:198
ros::NodeHandle nh_
Definition: spinner.cpp:200
CallbackQueue * callback_queue_
Definition: spinner.cpp:196
XmlRpcServer s
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
#define ROS_WARN(...)
boost::mutex mutex_
Definition: spinner.cpp:192
AsyncSpinner(uint32_t thread_count)
Simple constructor. Uses the global callback queue.
Definition: spinner.cpp:288
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
Returns a pointer to the global default callback queue.
Definition: init.cpp:565
void start()
Start this spinner spinning asynchronously.
Definition: spinner.cpp:303
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called, pushes it back onto the queue.
uint32_t thread_count_
Definition: spinner.cpp:195
#define ROS_ASSERT_MSG(cond,...)
roscpp&#39;s interface for creating subscribers, publishers, etc.
Definition: node_handle.h:87
void threadFunc(boost::barrier *b)
ROSCPP_DECL void spin()
Enter simple event loop.
Definition: init.cpp:541
virtual void spin(CallbackQueue *queue=0)
Spin on a callback queue (defaults to the global one). Blocks until roscpp has been shutdown...
Definition: spinner.cpp:171
bool canStart()
Check if the spinner can be started. The spinner shouldn&#39;t be started if another single-threaded spin...
Definition: spinner.cpp:298
AsyncSpinnerImplPtr impl_
Definition: spinner.h:130
boost::thread_group threads_
Definition: spinner.cpp:193
bool ok() const
Check whether it&#39;s time to exit.
#define ROS_ERROR_STREAM(args)
void stop()
Stop this spinner from running.
Definition: spinner.cpp:308
ROSCPP_DECL void waitForShutdown()
Wait for this node to be shutdown, whether through Ctrl-C, ros::shutdown(), or similar.
Definition: init.cpp:557
AsyncSpinner is a spinner that does not conform to the abstract Spinner interface. Instead, it spins asynchronously when you call start(), and stops when either you call stop(), ros::shutdown() is called, or its destructor is called.
Definition: spinner.h:93


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Wed Dec 20 2017 03:58:41