spinner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009, Willow Garage, Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice,
7  * this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the names of Stanford University or Willow Garage, Inc. nor the names of its
12  * contributors may be used to endorse or promote products derived from
13  * this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "ros/spinner.h"
29 #include "ros/ros.h"
30 #include "ros/callback_queue.h"
31 
32 #include <boost/thread/thread.hpp>
33 #include <boost/thread/mutex.hpp>
34 
35 namespace {
36 
50 struct SpinnerMonitor
51 {
52  /* store spinner information per callback queue:
53  Only alike spinners (single-threaded or multi-threaded) are allowed on a callback queue.
54  For single-threaded spinners we store their thread id.
55  We store the number of alike spinners operating on the callback queue.
56  */
57  struct Entry
58  {
59  Entry(const boost::thread::id &tid) : tid(tid), num(0) {}
60 
61  boost::thread::id tid; // proper thread id of single-threaded spinner
62  unsigned int num; // number of (alike) spinners serving this queue
63  };
64 
66  bool add(ros::CallbackQueue* queue, bool single_threaded)
67  {
68  boost::mutex::scoped_lock lock(mutex_);
69 
70  boost::thread::id tid; // current thread id for single-threaded spinners, zero for multi-threaded ones
71  if (single_threaded)
72  tid = boost::this_thread::get_id();
73 
74  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
75  bool can_spin = ( it == spinning_queues_.end() || // we will spin on any new queue
76  it->second.tid == tid ); // otherwise spinner must be alike (all multi-threaded: 0, or single-threaded on same thread id)
77 
78  if (!can_spin)
79  return false;
80 
81  if (it == spinning_queues_.end())
82  it = spinning_queues_.insert(it, std::make_pair(queue, Entry(tid)));
83 
84  // increment number of active spinners
85  it->second.num += 1;
86 
87  return true;
88  }
89 
91  void remove(ros::CallbackQueue* queue)
92  {
93  boost::mutex::scoped_lock lock(mutex_);
94  std::map<ros::CallbackQueue*, Entry>::iterator it = spinning_queues_.find(queue);
95  ROS_ASSERT_MSG(it != spinning_queues_.end(), "Call to SpinnerMonitor::remove() without matching call to add().");
96 
97  if (it->second.tid != boost::thread::id() && it->second.tid != boost::this_thread::get_id())
98  {
99  // This doesn't harm, but isn't good practice?
100  // It was enforced by the previous implementation.
101  ROS_WARN("SpinnerMonitor::remove() called from different thread than add().");
102  }
103 
104  ROS_ASSERT_MSG(it->second.num > 0, "SpinnerMonitor::remove(): Invalid spinner count (0) encountered.");
105  it->second.num -= 1;
106  if (it->second.num == 0)
107  spinning_queues_.erase(it); // erase queue entry to allow future queues with same pointer
108  }
109 
110  std::map<ros::CallbackQueue*, Entry> spinning_queues_;
111  boost::mutex mutex_;
112 };
113 
114 SpinnerMonitor spinner_monitor;
115 const std::string DEFAULT_ERROR_MESSAGE =
116  "Attempt to spin a callback queue from two spinners, one of them being single-threaded.";
117 }
118 
119 namespace ros
120 {
121 
122 
124 {
125  if (!queue)
126  {
127  queue = getGlobalCallbackQueue();
128  }
129 
130  if (!spinner_monitor.add(queue, true))
131  {
132  std::string errorMessage = "SingleThreadedSpinner: " + DEFAULT_ERROR_MESSAGE + " You might want to use a MultiThreadedSpinner instead.";
133  ROS_FATAL_STREAM(errorMessage);
134  throw std::runtime_error(errorMessage);
135  }
136 
137  ros::WallDuration timeout(0.1f);
138  ros::NodeHandle n;
139  while (n.ok())
140  {
141  queue->callAvailable(timeout);
142  }
143  spinner_monitor.remove(queue);
144 }
145 
147 : thread_count_(thread_count)
148 {
149 }
150 
152 {
153  AsyncSpinner s(thread_count_, queue);
154  s.start();
155 
157 }
158 
160 {
161 public:
162  AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue);
164 
165  bool canStart();
166  void start();
167  void stop();
168 
169 private:
170  void threadFunc();
171 
172  boost::mutex mutex_;
173  boost::thread_group threads_;
174 
175  uint32_t thread_count_;
177 
178  volatile bool continue_;
179 
181 };
182 
183 AsyncSpinnerImpl::AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue* queue)
184 : thread_count_(thread_count)
185 , callback_queue_(queue)
186 , continue_(false)
187 {
188  if (thread_count == 0)
189  {
190  thread_count_ = boost::thread::hardware_concurrency();
191 
192  if (thread_count_ == 0)
193  {
194  thread_count_ = 1;
195  }
196  }
197 
198  if (!queue)
199  {
201  }
202 }
203 
205 {
206  stop();
207 }
208 
210 {
211  return true;
212 }
213 
215 {
216  boost::mutex::scoped_lock lock(mutex_);
217 
218  if (continue_)
219  return; // already spinning
220 
221  if (!spinner_monitor.add(callback_queue_, false))
222  {
223  std::string errorMessage = "AsyncSpinnerImpl: " + DEFAULT_ERROR_MESSAGE;
224  ROS_FATAL_STREAM(errorMessage);
225  throw std::runtime_error(errorMessage);
226  }
227 
228  continue_ = true;
229 
230  for (uint32_t i = 0; i < thread_count_; ++i)
231  {
232  threads_.create_thread(boost::bind(&AsyncSpinnerImpl::threadFunc, this));
233  }
234 }
235 
237 {
238  boost::mutex::scoped_lock lock(mutex_);
239  if (!continue_)
240  return;
241 
242  continue_ = false;
243  threads_.join_all();
244 
245  spinner_monitor.remove(callback_queue_);
246 }
247 
249 {
250  disableAllSignalsInThisThread();
251 
253  bool use_call_available = thread_count_ == 1;
254  WallDuration timeout(0.1);
255 
256  while (continue_ && nh_.ok())
257  {
258  if (use_call_available)
259  {
260  queue->callAvailable(timeout);
261  }
262  else
263  {
264  queue->callOne(timeout);
265  }
266  }
267 }
268 
269 AsyncSpinner::AsyncSpinner(uint32_t thread_count)
270 : impl_(new AsyncSpinnerImpl(thread_count, 0))
271 {
272 }
273 
274 AsyncSpinner::AsyncSpinner(uint32_t thread_count, CallbackQueue* queue)
275 : impl_(new AsyncSpinnerImpl(thread_count, queue))
276 {
277 }
278 
280 {
281  return impl_->canStart();
282 }
283 
285 {
286  impl_->start();
287 }
288 
290 {
291  impl_->stop();
292 }
293 
294 }
ros::AsyncSpinnerImpl::AsyncSpinnerImpl
AsyncSpinnerImpl(uint32_t thread_count, CallbackQueue *queue)
Definition: spinner.cpp:183
ros::AsyncSpinnerImpl::threads_
boost::thread_group threads_
Definition: spinner.cpp:173
ros::MultiThreadedSpinner::thread_count_
uint32_t thread_count_
Definition: spinner.h:79
ros::AsyncSpinner::start
void start()
Start this spinner spinning asynchronously.
Definition: spinner.cpp:284
s
XmlRpcServer s
ros
ROS_FATAL_STREAM
#define ROS_FATAL_STREAM(args)
ros::AsyncSpinner
AsyncSpinner is a spinner that does not conform to the abstract Spinner interface....
Definition: spinner.h:93
ros::AsyncSpinnerImpl::callback_queue_
CallbackQueue * callback_queue_
Definition: spinner.cpp:176
spinner.h
ros::AsyncSpinnerImpl::continue_
volatile bool continue_
Definition: spinner.cpp:178
ros::AsyncSpinnerImpl::nh_
ros::NodeHandle nh_
Definition: spinner.cpp:180
ros::MultiThreadedSpinner::spin
virtual void spin(CallbackQueue *queue=0)
Spin on a callback queue (defaults to the global one). Blocks until roscpp has been shutdown.
Definition: spinner.cpp:151
f
f
ros::AsyncSpinnerImpl::mutex_
boost::mutex mutex_
Definition: spinner.cpp:172
ros::AsyncSpinner::canStart
bool canStart()
Check if the spinner can be started. The spinner shouldn't be started if another single-threaded spin...
Definition: spinner.cpp:279
ros::AsyncSpinner::AsyncSpinner
AsyncSpinner(uint32_t thread_count)
Simple constructor. Uses the global callback queue.
Definition: spinner.cpp:269
ros::AsyncSpinnerImpl::thread_count_
uint32_t thread_count_
Definition: spinner.cpp:175
ros.h
ros::AsyncSpinnerImpl::threadFunc
void threadFunc()
Definition: spinner.cpp:248
ROS_ASSERT_MSG
#define ROS_ASSERT_MSG(cond,...)
ROS_WARN
#define ROS_WARN(...)
ros::AsyncSpinnerImpl::canStart
bool canStart()
Definition: spinner.cpp:209
ros::NodeHandle
roscpp's interface for creating subscribers, publishers, etc.
Definition: node_handle.h:87
ros::AsyncSpinnerImpl::stop
void stop()
Definition: spinner.cpp:236
ros::AsyncSpinner::stop
void stop()
Stop this spinner from running.
Definition: spinner.cpp:289
ros::NodeHandle::ok
bool ok() const
Check whether it's time to exit.
Definition: node_handle.cpp:803
ros::CallbackQueue
This is the default implementation of the ros::CallbackQueueInterface.
Definition: callback_queue.h:57
ros::waitForShutdown
ROSCPP_DECL void waitForShutdown()
Wait for this node to be shutdown, whether through Ctrl-C, ros::shutdown(), or similar.
Definition: init.cpp:587
ros::AsyncSpinnerImpl
Definition: spinner.cpp:159
ros::MultiThreadedSpinner::MultiThreadedSpinner
MultiThreadedSpinner(uint32_t thread_count=0)
Definition: spinner.cpp:146
ros::CallbackQueue::callOne
CallOneResult callOne()
Pop a single callback off the front of the queue and invoke it. If the callback was not ready to be c...
Definition: callback_queue.h:78
callback_queue.h
ros::WallDuration
ros::AsyncSpinnerImpl::~AsyncSpinnerImpl
~AsyncSpinnerImpl()
Definition: spinner.cpp:204
ros::SingleThreadedSpinner::spin
virtual void spin(CallbackQueue *queue=0)
Spin on a callback queue (defaults to the global one). Blocks until roscpp has been shutdown.
Definition: spinner.cpp:123
ros::AsyncSpinner::impl_
AsyncSpinnerImplPtr impl_
Definition: spinner.h:130
ros::AsyncSpinnerImpl::start
void start()
Definition: spinner.cpp:214
ros::CallbackQueue::callAvailable
void callAvailable()
Invoke all callbacks currently in the queue. If a callback was not ready to be called,...
Definition: callback_queue.h:96
ros::getGlobalCallbackQueue
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
Returns a pointer to the global default callback queue.
Definition: init.cpp:595


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim, Dirk Thomas , Jacob Perron
autogenerated on Thu Nov 23 2023 04:01:44