spinner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/spinner.h"
29 #include "ros/ros.h"
30 #include "ros/callback_queue.h"
31 
32 #include <boost/thread/thread.hpp>
33 #include <boost/thread/mutex.hpp>
34 
35 namespace {
36 
37 const std::string DEFAULT_ERROR_MESSAGE =
38  "Attempt to spin a callback queue from two spinners, one of them being single-threaded."
39  "\nIn the future this will throw an exception!";
40 
54 struct SpinnerMonitor
55 {
56  /* store spinner information per callback queue:
57  Only alike spinners (single-threaded or multi-threaded) are allowed on a callback queue.
58  For single-threaded spinners we store their thread id.
59  We store the number of alike spinners operating on the callback queue.
60  */
61  struct Entry
62  {
63  Entry(const boost::thread::id &tid,
64  const boost::thread::id &initial_tid) : tid(tid), initial_tid(initial_tid), num(0) {}
65 
66  boost::thread::id tid; // proper thread id of single-threaded spinner
67  boost::thread::id initial_tid; // to retain old behaviour, store first spinner's thread id
68  unsigned int num; // number of (alike) spinners serving this queue
69  };
70 
72  bool add(ros::CallbackQueue* queue, bool single_threaded)
73  {
74  boost::mutex::scoped_lock lock(mutex_);
75 
76  boost::thread::id current_tid = boost::this_thread::get_id();
77  boost::thread::id tid; // current thread id for single-threaded spinners, zero for multi-threaded ones
78  if (single_threaded)
79  tid = current_tid;
80 
81  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
82  bool can_spin = ( it == spinning_queues_.end() || // we will spin on any new queue
83  it->second.tid == tid ); // otherwise spinner must be alike (all multi-threaded: 0, or single-threaded on same thread id)
84 
85  if (!can_spin)
86  {
87  // Previous behavior (up to Kinetic) was to accept multiple spinners on a queue
88  // as long as they were started from the same thread. Although this is wrong behavior,
89  // we retain it here for backwards compatibility, i.e. we allow spinning of a
90  // single-threaded spinner after several multi-threaded ones, given that they
91  // were started from the same initial thread
92  if (it->second.initial_tid == tid)
93  {
94  ROS_ERROR_STREAM("SpinnerMonitor: single-threaded spinner after multi-threaded one(s)."
95  << DEFAULT_ERROR_MESSAGE
96  << " Only allowed for backwards compatibility.");
97  it->second.tid = tid; // "upgrade" tid to represent single-threaded spinner
98  }
99  else
100  return false;
101  }
102 
103  if (it == spinning_queues_.end())
104  it = spinning_queues_.insert(it, std::make_pair(queue, Entry(tid, current_tid)));
105 
106  // increment number of active spinners
107  it->second.num += 1;
108 
109  return true;
110  }
111 
113  void remove(ros::CallbackQueue* queue)
114  {
115  boost::mutex::scoped_lock lock(mutex_);
116  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
117  ROS_ASSERT_MSG(it != spinning_queues_.end(), "Call to SpinnerMonitor::remove() without matching call to add().");
118 
119  if (it->second.tid != boost::thread::id() && it->second.tid != boost::this_thread::get_id())
120  {
121  // This doesn't harm, but isn't good practice?
122  // It was enforced by the previous implementation.
123  ROS_WARN("SpinnerMonitor::remove() called from different thread than add().");
124  }
125 
126  ROS_ASSERT_MSG(it->second.num > 0, "SpinnerMonitor::remove(): Invalid spinner count (0) encountered.");
127  it->second.num -= 1;
128  if (it->second.num == 0)
129  spinning_queues_.erase(it); // erase queue entry to allow future queues with same pointer
130  }
131 
132  std::map<ros::CallbackQueue*, Entry> spinning_queues_;
133  boost::mutex mutex_;
134 };
135 
136 SpinnerMonitor spinner_monitor;
137 }
138 
139 namespace ros
140 {
141 
142 
144 {
145  if (!queue)
146  {
147  queue = getGlobalCallbackQueue();
148  }
149 
150  if (!spinner_monitor.add(queue, true))
151  {
152  ROS_ERROR_STREAM("SingleThreadedSpinner: " << DEFAULT_ERROR_MESSAGE + " You might want to use a MultiThreadedSpinner instead.");
153  return;
154  }
155 
156  ros::WallDuration timeout(0.1f);
157  ros::NodeHandle n;
158  while (n.ok())
159  {
160  queue->callAvailable(timeout);
161  }
162  spinner_monitor.remove(queue);
163 }
164 
165 MultiThreadedSpinner::MultiThreadedSpinner(uint32_t thread_count)
166 : thread_count_(thread_count)
167 {
168 }
169 
171 {
172  AsyncSpinner s(thread_count_, queue);
173  s.start();
174 
176 }
177 
179 {
180 public:
181  AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
182  ~AsyncSpinnerImpl();
183 
184  bool canStart();
185  void start();
186  void stop();
187 
188 private:
189  void threadFunc();
190 
191  boost::mutex mutex_;
192  boost::thread_group threads_;
193 
194  uint32_t thread_count_;
196 
197  volatile bool continue_;
198 
200 };
201 
202 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
203 : thread_count_(thread_count)
204 , callback_queue_(queue)
205 , continue_(false)
206 {
207  if (thread_count == 0)
208  {
209  thread_count_ = boost::thread::hardware_concurrency();
210 
211  if (thread_count_ == 0)
212  {
213  thread_count_ = 1;
214  }
215  }
216 
217  if (!queue)
218  {
220  }
221 }
222 
224 {
225  stop();
226 }
227 
229 {
230  return true;
231 }
232 
234 {
235  boost::mutex::scoped_lock lock(mutex_);
236 
237  if (continue_)
238  return; // already spinning
239 
240  if (!spinner_monitor.add(callback_queue_, false))
241  {
242  ROS_ERROR_STREAM("AsyncSpinnerImpl: " << DEFAULT_ERROR_MESSAGE);
243  return;
244  }
245 
246  continue_ = true;
247 
248  for (uint32_t i = 0; i < thread_count_; ++i)
249  {
250  threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
251  }
252 }
253 
255 {
256  boost::mutex::scoped_lock lock(mutex_);
257  if (!continue_)
258  return;
259 
260  continue_ = false;
261  threads_.join_all();
262 
263  spinner_monitor.remove(callback_queue_);
264 }
265 
267 {
268  disableAllSignalsInThisThread();
269 
271  bool use_call_available = thread_count_ == 1;
272  WallDuration timeout(0.1);
273 
274  while (continue_ && nh_.ok())
275  {
276  if (use_call_available)
277  {
278  queue->callAvailable(timeout);
279  }
280  else
281  {
282  queue->callOne(timeout);
283  }
284  }
285 }
286 
287 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
288 : impl_(new AsyncSpinnerImpl(thread_count, 0))
289 {
290 }
291 
292 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
293 : impl_(new AsyncSpinnerImpl(thread_count, queue))
294 {
295 }
296 
298 {
299  return impl_->canStart();
300 }
301 
303 {
304  impl_->start();
305 }
306 
308 {
309  impl_->stop();
310 }
311 
312 }
This is the default implementation of the ros::CallbackQueueInterface.
AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue *queue)
Definition: spinner.cpp:202
ROSCPP_DECL void start()
Actually starts the internals of the node (spins up threads, starts the network polling and xmlrpc lo...
Definition: init.cpp:293
f
volatile bool continue_
Definition: spinner.cpp:197
ros::NodeHandle nh_
Definition: spinner.cpp:199
CallbackQueue * callback_queue_
Definition: spinner.cpp:195
XmlRpcServer s
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
#define ROS_WARN(...)
boost::mutex mutex_
Definition: spinner.cpp:191
AsyncSpinner(uint32_t thread_count)
Simple constructor. Uses the global callback queue.
Definition: spinner.cpp:287
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
Returns a pointer to the global default callback queue.
Definition: init.cpp:568
void start()
Start this spinner spinning asynchronously.
Definition: spinner.cpp:302
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called, pushes it back onto the queue.
uint32_t thread_count_
Definition: spinner.cpp:194
#define ROS_ASSERT_MSG(cond,...)
roscpp&#39;s interface for creating subscribers, publishers, etc.
Definition: node_handle.h:87
void threadFunc(boost::barrier *b)
ROSCPP_DECL void spin()
Enter simple event loop.
Definition: init.cpp:544
virtual void spin(CallbackQueue *queue=0)
Spin on a callback queue (defaults to the global one). Blocks until roscpp has been shutdown...
Definition: spinner.cpp:170
bool canStart()
Check if the spinner can be started. The spinner shouldn&#39;t be started if another single-threaded spin...
Definition: spinner.cpp:297
AsyncSpinnerImplPtr impl_
Definition: spinner.h:130
boost::thread_group threads_
Definition: spinner.cpp:192
bool ok() const
Check whether it&#39;s time to exit.
#define ROS_ERROR_STREAM(args)
void stop()
Stop this spinner from running.
Definition: spinner.cpp:307
ROSCPP_DECL void waitForShutdown()
Wait for this node to be shutdown, whether through Ctrl-C, ros::shutdown(), or similar.
Definition: init.cpp:560
AsyncSpinner is a spinner that does not conform to the abstract Spinner interface. Instead, it spins asynchronously when you call start(), and stops when either you call stop(), ros::shutdown() is called, or its destructor is called.
Definition: spinner.h:93


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:26