$search
00001 /* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2009, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of Willow Garage, Inc. nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 */ 00034 00035 #include "ros/callback_queue.h" 00036 #include "ros/assert.h" 00037 00038 namespace ros 00039 { 00040 00041 CallbackQueue::CallbackQueue(bool enabled) 00042 : calling_(0) 00043 , enabled_(enabled) 00044 { 00045 } 00046 00047 CallbackQueue::~CallbackQueue() 00048 { 00049 disable(); 00050 } 00051 00052 void CallbackQueue::enable() 00053 { 00054 boost::mutex::scoped_lock lock(mutex_); 00055 enabled_ = true; 00056 00057 condition_.notify_all(); 00058 } 00059 00060 void CallbackQueue::disable() 00061 { 00062 boost::mutex::scoped_lock lock(mutex_); 00063 enabled_ = false; 00064 00065 condition_.notify_all(); 00066 } 00067 00068 void CallbackQueue::clear() 00069 { 00070 boost::mutex::scoped_lock lock(mutex_); 00071 00072 callbacks_.clear(); 00073 } 00074 00075 bool CallbackQueue::isEmpty() 00076 { 00077 boost::mutex::scoped_lock lock(mutex_); 00078 00079 return callbacks_.empty() && calling_ == 0; 00080 } 00081 00082 bool CallbackQueue::isEnabled() 00083 { 00084 boost::mutex::scoped_lock lock(mutex_); 00085 00086 return enabled_; 00087 } 00088 00089 void CallbackQueue::setupTLS() 00090 { 00091 if (!tls_.get()) 00092 { 00093 tls_.reset(new TLS); 00094 } 00095 } 00096 00097 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id) 00098 { 00099 CallbackInfo info; 00100 info.callback = callback; 00101 info.removal_id = removal_id; 00102 00103 { 00104 boost::mutex::scoped_lock lock(mutex_); 00105 00106 if (!enabled_) 00107 { 00108 return; 00109 } 00110 00111 callbacks_.push_back(info); 00112 } 00113 00114 { 00115 boost::mutex::scoped_lock lock(id_info_mutex_); 00116 00117 M_IDInfo::iterator it = id_info_.find(removal_id); 00118 if (it == id_info_.end()) 00119 { 00120 IDInfoPtr id_info(new IDInfo); 00121 id_info->id = removal_id; 00122 id_info_.insert(std::make_pair(removal_id, id_info)); 00123 } 00124 } 00125 00126 condition_.notify_one(); 00127 } 00128 00129 CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id) 00130 { 00131 boost::mutex::scoped_lock lock(id_info_mutex_); 00132 M_IDInfo::iterator it = id_info_.find(id); 00133 if (it != id_info_.end()) 00134 { 00135 return it->second; 00136 } 00137 00138 return IDInfoPtr(); 00139 } 00140 00141 void CallbackQueue::removeByID(uint64_t removal_id) 00142 { 00143 setupTLS(); 00144 00145 { 00146 IDInfoPtr id_info; 00147 { 00148 boost::mutex::scoped_lock lock(id_info_mutex_); 00149 M_IDInfo::iterator it = id_info_.find(removal_id); 00150 if (it != id_info_.end()) 00151 { 00152 id_info = it->second; 00153 } 00154 else 00155 { 00156 return; 00157 } 00158 } 00159 00160 // If we're being called from within a callback from our queue, we must unlock the shared lock we already own 00161 // here so that we can take a unique lock. We'll re-lock it later. 00162 if (tls_->calling_in_this_thread == id_info->id) 00163 { 00164 id_info->calling_rw_mutex.unlock_shared(); 00165 } 00166 00167 { 00168 boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex); 00169 boost::mutex::scoped_lock lock(mutex_); 00170 D_CallbackInfo::iterator it = callbacks_.begin(); 00171 for (; it != callbacks_.end();) 00172 { 00173 CallbackInfo& info = *it; 00174 if (info.removal_id == removal_id) 00175 { 00176 it = callbacks_.erase(it); 00177 } 00178 else 00179 { 00180 ++it; 00181 } 00182 } 00183 } 00184 00185 if (tls_->calling_in_this_thread == id_info->id) 00186 { 00187 id_info->calling_rw_mutex.lock_shared(); 00188 } 00189 } 00190 00191 // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been 00192 // popped off the queue 00193 { 00194 D_CallbackInfo::iterator it = tls_->callbacks.begin(); 00195 D_CallbackInfo::iterator end = tls_->callbacks.end(); 00196 for (; it != end; ++it) 00197 { 00198 CallbackInfo& info = *it; 00199 if (info.removal_id == removal_id) 00200 { 00201 info.marked_for_removal = true; 00202 } 00203 } 00204 } 00205 00206 { 00207 boost::mutex::scoped_lock lock(id_info_mutex_); 00208 id_info_.erase(removal_id); 00209 } 00210 } 00211 00212 CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout) 00213 { 00214 setupTLS(); 00215 TLS* tls = tls_.get(); 00216 00217 CallbackInfo cb_info; 00218 00219 { 00220 boost::mutex::scoped_lock lock(mutex_); 00221 00222 if (!enabled_) 00223 { 00224 return Disabled; 00225 } 00226 00227 if (callbacks_.empty()) 00228 { 00229 if (!timeout.isZero()) 00230 { 00231 condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f)); 00232 } 00233 00234 if (callbacks_.empty()) 00235 { 00236 return Empty; 00237 } 00238 00239 if (!enabled_) 00240 { 00241 return Disabled; 00242 } 00243 } 00244 00245 D_CallbackInfo::iterator it = callbacks_.begin(); 00246 for (; it != callbacks_.end();) 00247 { 00248 CallbackInfo& info = *it; 00249 00250 if (info.marked_for_removal) 00251 { 00252 it = callbacks_.erase(it); 00253 continue; 00254 } 00255 00256 if (info.callback->ready()) 00257 { 00258 cb_info = info; 00259 it = callbacks_.erase(it); 00260 break; 00261 } 00262 00263 ++it; 00264 } 00265 00266 if (!cb_info.callback) 00267 { 00268 return TryAgain; 00269 } 00270 00271 ++calling_; 00272 } 00273 00274 bool was_empty = tls->callbacks.empty(); 00275 tls->callbacks.push_back(cb_info); 00276 if (was_empty) 00277 { 00278 tls->cb_it = tls->callbacks.begin(); 00279 } 00280 00281 CallOneResult res = callOneCB(tls); 00282 if (res != Empty) 00283 { 00284 boost::mutex::scoped_lock lock(mutex_); 00285 --calling_; 00286 } 00287 return res; 00288 } 00289 00290 void CallbackQueue::callAvailable(ros::WallDuration timeout) 00291 { 00292 setupTLS(); 00293 TLS* tls = tls_.get(); 00294 00295 { 00296 boost::mutex::scoped_lock lock(mutex_); 00297 00298 if (!enabled_) 00299 { 00300 return; 00301 } 00302 00303 if (callbacks_.empty()) 00304 { 00305 if (!timeout.isZero()) 00306 { 00307 condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f)); 00308 } 00309 00310 if (callbacks_.empty() || !enabled_) 00311 { 00312 return; 00313 } 00314 } 00315 00316 bool was_empty = tls->callbacks.empty(); 00317 00318 tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end()); 00319 callbacks_.clear(); 00320 00321 calling_ += tls->callbacks.size(); 00322 00323 if (was_empty) 00324 { 00325 tls->cb_it = tls->callbacks.begin(); 00326 } 00327 } 00328 00329 size_t called = 0; 00330 00331 while (!tls->callbacks.empty()) 00332 { 00333 if (callOneCB(tls) != Empty) 00334 { 00335 ++called; 00336 } 00337 } 00338 00339 { 00340 boost::mutex::scoped_lock lock(mutex_); 00341 calling_ -= called; 00342 } 00343 } 00344 00345 CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls) 00346 { 00347 // Check for a recursive call. If recursive, increment the current iterator. Otherwise 00348 // set the iterator it the beginning of the thread-local callbacks 00349 if (tls->calling_in_this_thread == 0xffffffffffffffffULL) 00350 { 00351 tls->cb_it = tls->callbacks.begin(); 00352 } 00353 00354 if (tls->cb_it == tls->callbacks.end()) 00355 { 00356 return Empty; 00357 } 00358 00359 ROS_ASSERT(!tls->callbacks.empty()); 00360 ROS_ASSERT(tls->cb_it != tls->callbacks.end()); 00361 00362 CallbackInfo info = *tls->cb_it; 00363 CallbackInterfacePtr& cb = info.callback; 00364 00365 IDInfoPtr id_info = getIDInfo(info.removal_id); 00366 if (id_info) 00367 { 00368 boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex); 00369 00370 uint64_t last_calling = tls->calling_in_this_thread; 00371 tls->calling_in_this_thread = id_info->id; 00372 00373 CallbackInterface::CallResult result = CallbackInterface::Invalid; 00374 if (info.marked_for_removal) 00375 { 00376 tls->cb_it = tls->callbacks.erase(tls->cb_it); 00377 } 00378 else 00379 { 00380 tls->cb_it = tls->callbacks.erase(tls->cb_it); 00381 result = cb->call(); 00382 } 00383 00384 tls->calling_in_this_thread = last_calling; 00385 00386 // Push TryAgain callbacks to the back of the shared queue 00387 if (result == CallbackInterface::TryAgain && !info.marked_for_removal) 00388 { 00389 boost::mutex::scoped_lock lock(mutex_); 00390 callbacks_.push_back(info); 00391 00392 return TryAgain; 00393 } 00394 00395 return Called; 00396 } 00397 else 00398 { 00399 tls->cb_it = tls->callbacks.erase(tls->cb_it); 00400 } 00401 00402 return Called; 00403 } 00404 00405 }