00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2009, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "ros/callback_queue.h" 00036 #include "ros/assert.h" 00037 #include <boost/scope_exit.hpp> 00038 00039 namespace ros 00040 { 00041 00042 CallbackQueue::CallbackQueue(bool enabled) 00043 : calling_(0) 00044 , enabled_(enabled) 00045 { 00046 } 00047 00048 CallbackQueue::~CallbackQueue() 00049 { 00050 disable(); 00051 } 00052 00053 void CallbackQueue::enable() 00054 { 00055 boost::mutex::scoped_lock lock(mutex_); 00056 enabled_ = true; 00057 00058 condition_.notify_all(); 00059 } 00060 00061 void CallbackQueue::disable() 00062 { 00063 boost::mutex::scoped_lock lock(mutex_); 00064 enabled_ = false; 00065 00066 condition_.notify_all(); 00067 } 00068 00069 void CallbackQueue::clear() 00070 { 00071 boost::mutex::scoped_lock lock(mutex_); 00072 00073 callbacks_.clear(); 00074 } 00075 00076 bool CallbackQueue::isEmpty() 00077 { 00078 boost::mutex::scoped_lock lock(mutex_); 00079 00080 return callbacks_.empty() && calling_ == 0; 00081 } 00082 00083 bool CallbackQueue::isEnabled() 00084 { 00085 boost::mutex::scoped_lock lock(mutex_); 00086 00087 return enabled_; 00088 } 00089 00090 void CallbackQueue::setupTLS() 00091 { 00092 if (!tls_.get()) 00093 { 00094 tls_.reset(new TLS); 00095 } 00096 } 00097 00098 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id) 00099 { 00100 CallbackInfo info; 00101 info.callback = callback; 00102 info.removal_id = removal_id; 00103 00104 { 00105 boost::mutex::scoped_lock lock(mutex_); 00106 00107 if (!enabled_) 00108 { 00109 return; 00110 } 00111 00112 callbacks_.push_back(info); 00113 } 00114 00115 { 00116 boost::mutex::scoped_lock lock(id_info_mutex_); 00117 00118 M_IDInfo::iterator it = id_info_.find(removal_id); 00119 if (it == id_info_.end()) 00120 { 00121 IDInfoPtr id_info(boost::make_shared<IDInfo>()); 00122 id_info->id = removal_id; 00123 id_info_.insert(std::make_pair(removal_id, id_info)); 00124 } 00125 } 00126 00127 condition_.notify_one(); 00128 } 00129 00130 CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id) 00131 { 00132 boost::mutex::scoped_lock lock(id_info_mutex_); 00133 M_IDInfo::iterator it = id_info_.find(id); 00134 if (it != id_info_.end()) 00135 { 00136 return it->second; 00137 } 00138 00139 return IDInfoPtr(); 00140 } 00141 00142 void CallbackQueue::removeByID(uint64_t removal_id) 00143 { 00144 setupTLS(); 00145 00146 { 00147 IDInfoPtr id_info; 00148 { 00149 boost::mutex::scoped_lock lock(id_info_mutex_); 00150 M_IDInfo::iterator it = id_info_.find(removal_id); 00151 if (it != id_info_.end()) 00152 { 00153 id_info = it->second; 00154 } 00155 else 00156 { 00157 return; 00158 } 00159 } 00160 00161 // If we're being called from within a callback from our queue, we must unlock the shared lock we already own 00162 // here so that we can take a unique lock. We'll re-lock it later. 00163 if (tls_->calling_in_this_thread == id_info->id) 00164 { 00165 id_info->calling_rw_mutex.unlock_shared(); 00166 } 00167 00168 { 00169 boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex); 00170 boost::mutex::scoped_lock lock(mutex_); 00171 D_CallbackInfo::iterator it = callbacks_.begin(); 00172 for (; it != callbacks_.end();) 00173 { 00174 CallbackInfo& info = *it; 00175 if (info.removal_id == removal_id) 00176 { 00177 it = callbacks_.erase(it); 00178 } 00179 else 00180 { 00181 ++it; 00182 } 00183 } 00184 } 00185 00186 if (tls_->calling_in_this_thread == id_info->id) 00187 { 00188 id_info->calling_rw_mutex.lock_shared(); 00189 } 00190 } 00191 00192 // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been 00193 // popped off the queue 00194 { 00195 D_CallbackInfo::iterator it = tls_->callbacks.begin(); 00196 D_CallbackInfo::iterator end = tls_->callbacks.end(); 00197 for (; it != end; ++it) 00198 { 00199 CallbackInfo& info = *it; 00200 if (info.removal_id == removal_id) 00201 { 00202 info.marked_for_removal = true; 00203 } 00204 } 00205 } 00206 00207 { 00208 boost::mutex::scoped_lock lock(id_info_mutex_); 00209 id_info_.erase(removal_id); 00210 } 00211 } 00212 00213 CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout) 00214 { 00215 setupTLS(); 00216 TLS* tls = tls_.get(); 00217 00218 CallbackInfo cb_info; 00219 00220 { 00221 boost::mutex::scoped_lock lock(mutex_); 00222 00223 if (!enabled_) 00224 { 00225 return Disabled; 00226 } 00227 00228 if (callbacks_.empty()) 00229 { 00230 if (!timeout.isZero()) 00231 { 00232 condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f)); 00233 } 00234 00235 if (callbacks_.empty()) 00236 { 00237 return Empty; 00238 } 00239 00240 if (!enabled_) 00241 { 00242 return Disabled; 00243 } 00244 } 00245 00246 D_CallbackInfo::iterator it = callbacks_.begin(); 00247 for (; it != callbacks_.end();) 00248 { 00249 CallbackInfo& info = *it; 00250 00251 if (info.marked_for_removal) 00252 { 00253 it = callbacks_.erase(it); 00254 continue; 00255 } 00256 00257 if (info.callback->ready()) 00258 { 00259 cb_info = info; 00260 it = callbacks_.erase(it); 00261 break; 00262 } 00263 00264 ++it; 00265 } 00266 00267 if (!cb_info.callback) 00268 { 00269 return TryAgain; 00270 } 00271 00272 ++calling_; 00273 } 00274 00275 bool was_empty = tls->callbacks.empty(); 00276 tls->callbacks.push_back(cb_info); 00277 if (was_empty) 00278 { 00279 tls->cb_it = tls->callbacks.begin(); 00280 } 00281 00282 CallOneResult res = callOneCB(tls); 00283 if (res != Empty) 00284 { 00285 boost::mutex::scoped_lock lock(mutex_); 00286 --calling_; 00287 } 00288 return res; 00289 } 00290 00291 void CallbackQueue::callAvailable(ros::WallDuration timeout) 00292 { 00293 setupTLS(); 00294 TLS* tls = tls_.get(); 00295 00296 { 00297 boost::mutex::scoped_lock lock(mutex_); 00298 00299 if (!enabled_) 00300 { 00301 return; 00302 } 00303 00304 if (callbacks_.empty()) 00305 { 00306 if (!timeout.isZero()) 00307 { 00308 condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f)); 00309 } 00310 00311 if (callbacks_.empty() || !enabled_) 00312 { 00313 return; 00314 } 00315 } 00316 00317 bool was_empty = tls->callbacks.empty(); 00318 00319 tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end()); 00320 callbacks_.clear(); 00321 00322 calling_ += tls->callbacks.size(); 00323 00324 if (was_empty) 00325 { 00326 tls->cb_it = tls->callbacks.begin(); 00327 } 00328 } 00329 00330 size_t called = 0; 00331 00332 while (!tls->callbacks.empty()) 00333 { 00334 if (callOneCB(tls) != Empty) 00335 { 00336 ++called; 00337 } 00338 } 00339 00340 { 00341 boost::mutex::scoped_lock lock(mutex_); 00342 calling_ -= called; 00343 } 00344 } 00345 00346 CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls) 00347 { 00348 // Check for a recursive call. If recursive, increment the current iterator. Otherwise 00349 // set the iterator it the beginning of the thread-local callbacks 00350 if (tls->calling_in_this_thread == 0xffffffffffffffffULL) 00351 { 00352 tls->cb_it = tls->callbacks.begin(); 00353 } 00354 00355 if (tls->cb_it == tls->callbacks.end()) 00356 { 00357 return Empty; 00358 } 00359 00360 ROS_ASSERT(!tls->callbacks.empty()); 00361 ROS_ASSERT(tls->cb_it != tls->callbacks.end()); 00362 00363 CallbackInfo info = *tls->cb_it; 00364 CallbackInterfacePtr& cb = info.callback; 00365 00366 IDInfoPtr id_info = getIDInfo(info.removal_id); 00367 if (id_info) 00368 { 00369 boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex); 00370 00371 uint64_t last_calling = tls->calling_in_this_thread; 00372 tls->calling_in_this_thread = id_info->id; 00373 00374 CallbackInterface::CallResult result = CallbackInterface::Invalid; 00375 00376 { 00377 // Ensure that thread id gets restored, even if callback throws. 00378 // This is done with RAII rather than try-catch so that the source 00379 // of the original exception is not masked in a crash report. 00380 BOOST_SCOPE_EXIT(&tls, &last_calling) 00381 { 00382 tls->calling_in_this_thread = last_calling; 00383 } 00384 BOOST_SCOPE_EXIT_END 00385 00386 if (info.marked_for_removal) 00387 { 00388 tls->cb_it = tls->callbacks.erase(tls->cb_it); 00389 } 00390 else 00391 { 00392 tls->cb_it = tls->callbacks.erase(tls->cb_it); 00393 result = cb->call(); 00394 } 00395 } 00396 00397 // Push TryAgain callbacks to the back of the shared queue 00398 if (result == CallbackInterface::TryAgain && !info.marked_for_removal) 00399 { 00400 boost::mutex::scoped_lock lock(mutex_); 00401 callbacks_.push_back(info); 00402 00403 return TryAgain; 00404 } 00405 00406 return Called; 00407 } 00408 else 00409 { 00410 tls->cb_it = tls->callbacks.erase(tls->cb_it); 00411 } 00412 00413 return Called; 00414 } 00415 00416 }