callback_queue.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2009, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/callback_queue.h"
00036 #include "ros/assert.h"
00037 #include <boost/scope_exit.hpp>
00038 
00039 namespace ros
00040 {
00041 
00042 CallbackQueue::CallbackQueue(bool enabled)
00043 : calling_(0)
00044 , enabled_(enabled)
00045 {
00046 }
00047 
00048 CallbackQueue::~CallbackQueue()
00049 {
00050   disable();
00051 }
00052 
00053 void CallbackQueue::enable()
00054 {
00055   boost::mutex::scoped_lock lock(mutex_);
00056   enabled_ = true;
00057 
00058   condition_.notify_all();
00059 }
00060 
00061 void CallbackQueue::disable()
00062 {
00063   boost::mutex::scoped_lock lock(mutex_);
00064   enabled_ = false;
00065 
00066   condition_.notify_all();
00067 }
00068 
00069 void CallbackQueue::clear()
00070 {
00071   boost::mutex::scoped_lock lock(mutex_);
00072 
00073   callbacks_.clear();
00074 }
00075 
00076 bool CallbackQueue::isEmpty()
00077 {
00078   boost::mutex::scoped_lock lock(mutex_);
00079 
00080   return callbacks_.empty() && calling_ == 0;
00081 }
00082 
00083 bool CallbackQueue::isEnabled()
00084 {
00085   boost::mutex::scoped_lock lock(mutex_);
00086 
00087   return enabled_;
00088 }
00089 
00090 void CallbackQueue::setupTLS()
00091 {
00092   if (!tls_.get())
00093   {
00094     tls_.reset(new TLS);
00095   }
00096 }
00097 
00098 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
00099 {
00100   CallbackInfo info;
00101   info.callback = callback;
00102   info.removal_id = removal_id;
00103 
00104   {
00105     boost::mutex::scoped_lock lock(mutex_);
00106 
00107     if (!enabled_)
00108     {
00109       return;
00110     }
00111 
00112     callbacks_.push_back(info);
00113   }
00114 
00115   {
00116     boost::mutex::scoped_lock lock(id_info_mutex_);
00117 
00118     M_IDInfo::iterator it = id_info_.find(removal_id);
00119     if (it == id_info_.end())
00120     {
00121       IDInfoPtr id_info(boost::make_shared<IDInfo>());
00122       id_info->id = removal_id;
00123       id_info_.insert(std::make_pair(removal_id, id_info));
00124     }
00125   }
00126 
00127   condition_.notify_one();
00128 }
00129 
00130 CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id)
00131 {
00132   boost::mutex::scoped_lock lock(id_info_mutex_);
00133   M_IDInfo::iterator it = id_info_.find(id);
00134   if (it != id_info_.end())
00135   {
00136     return it->second;
00137   }
00138 
00139   return IDInfoPtr();
00140 }
00141 
00142 void CallbackQueue::removeByID(uint64_t removal_id)
00143 {
00144   setupTLS();
00145 
00146   {
00147     IDInfoPtr id_info;
00148     {
00149       boost::mutex::scoped_lock lock(id_info_mutex_);
00150       M_IDInfo::iterator it = id_info_.find(removal_id);
00151       if (it != id_info_.end())
00152       {
00153         id_info = it->second;
00154       }
00155       else
00156       {
00157         return;
00158       }
00159     }
00160 
00161     // If we're being called from within a callback from our queue, we must unlock the shared lock we already own
00162     // here so that we can take a unique lock.  We'll re-lock it later.
00163     if (tls_->calling_in_this_thread == id_info->id)
00164     {
00165       id_info->calling_rw_mutex.unlock_shared();
00166     }
00167 
00168     {
00169       boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
00170       boost::mutex::scoped_lock lock(mutex_);
00171       D_CallbackInfo::iterator it = callbacks_.begin();
00172       for (; it != callbacks_.end();)
00173       {
00174         CallbackInfo& info = *it;
00175         if (info.removal_id == removal_id)
00176         {
00177           it = callbacks_.erase(it);
00178         }
00179         else
00180         {
00181           ++it;
00182         }
00183       }
00184     }
00185 
00186     if (tls_->calling_in_this_thread == id_info->id)
00187     {
00188       id_info->calling_rw_mutex.lock_shared();
00189     }
00190   }
00191 
00192   // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been
00193   // popped off the queue
00194   {
00195     D_CallbackInfo::iterator it = tls_->callbacks.begin();
00196     D_CallbackInfo::iterator end = tls_->callbacks.end();
00197     for (; it != end; ++it)
00198     {
00199       CallbackInfo& info = *it;
00200       if (info.removal_id == removal_id)
00201       {
00202         info.marked_for_removal = true;
00203       }
00204     }
00205   }
00206 
00207   {
00208     boost::mutex::scoped_lock lock(id_info_mutex_);
00209     id_info_.erase(removal_id);
00210   }
00211 }
00212 
00213 CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)
00214 {
00215   setupTLS();
00216   TLS* tls = tls_.get();
00217 
00218   CallbackInfo cb_info;
00219 
00220   {
00221     boost::mutex::scoped_lock lock(mutex_);
00222 
00223     if (!enabled_)
00224     {
00225       return Disabled;
00226     }
00227 
00228     if (callbacks_.empty())
00229     {
00230       if (!timeout.isZero())
00231       {
00232         condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
00233       }
00234 
00235       if (callbacks_.empty())
00236       {
00237         return Empty;
00238       }
00239 
00240       if (!enabled_)
00241       {
00242         return Disabled;
00243       }
00244     }
00245 
00246     D_CallbackInfo::iterator it = callbacks_.begin();
00247     for (; it != callbacks_.end();)
00248     {
00249       CallbackInfo& info = *it;
00250 
00251       if (info.marked_for_removal)
00252       {
00253         it = callbacks_.erase(it);
00254         continue;
00255       }
00256 
00257       if (info.callback->ready())
00258       {
00259         cb_info = info;
00260         it = callbacks_.erase(it);
00261         break;
00262       }
00263 
00264       ++it;
00265     }
00266 
00267     if (!cb_info.callback)
00268     {
00269       return TryAgain;
00270     }
00271 
00272     ++calling_;
00273   }
00274 
00275   bool was_empty = tls->callbacks.empty();
00276   tls->callbacks.push_back(cb_info);
00277   if (was_empty)
00278   {
00279     tls->cb_it = tls->callbacks.begin();
00280   }
00281 
00282   CallOneResult res = callOneCB(tls);
00283   if (res != Empty)
00284   {
00285     boost::mutex::scoped_lock lock(mutex_);
00286     --calling_;
00287   }
00288   return res;
00289 }
00290 
00291 void CallbackQueue::callAvailable(ros::WallDuration timeout)
00292 {
00293   setupTLS();
00294   TLS* tls = tls_.get();
00295 
00296   {
00297     boost::mutex::scoped_lock lock(mutex_);
00298 
00299     if (!enabled_)
00300     {
00301       return;
00302     }
00303 
00304     if (callbacks_.empty())
00305     {
00306       if (!timeout.isZero())
00307       {
00308         condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
00309       }
00310 
00311       if (callbacks_.empty() || !enabled_)
00312       {
00313         return;
00314       }
00315     }
00316 
00317     bool was_empty = tls->callbacks.empty();
00318 
00319     tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
00320     callbacks_.clear();
00321 
00322     calling_ += tls->callbacks.size();
00323 
00324     if (was_empty)
00325     {
00326       tls->cb_it = tls->callbacks.begin();
00327     }
00328   }
00329 
00330   size_t called = 0;
00331 
00332   while (!tls->callbacks.empty())
00333   {
00334     if (callOneCB(tls) != Empty)
00335     {
00336       ++called;
00337     }
00338   }
00339 
00340   {
00341     boost::mutex::scoped_lock lock(mutex_);
00342     calling_ -= called;
00343   }
00344 }
00345 
00346 CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls)
00347 {
00348   // Check for a recursive call.  If recursive, increment the current iterator.  Otherwise
00349   // set the iterator it the beginning of the thread-local callbacks
00350   if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
00351   {
00352     tls->cb_it = tls->callbacks.begin();
00353   }
00354 
00355   if (tls->cb_it == tls->callbacks.end())
00356   {
00357     return Empty;
00358   }
00359 
00360   ROS_ASSERT(!tls->callbacks.empty());
00361   ROS_ASSERT(tls->cb_it != tls->callbacks.end());
00362 
00363   CallbackInfo info = *tls->cb_it;
00364   CallbackInterfacePtr& cb = info.callback;
00365 
00366   IDInfoPtr id_info = getIDInfo(info.removal_id);
00367   if (id_info)
00368   {
00369     boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
00370 
00371     uint64_t last_calling = tls->calling_in_this_thread;
00372     tls->calling_in_this_thread = id_info->id;
00373 
00374     CallbackInterface::CallResult result = CallbackInterface::Invalid;
00375 
00376     {
00377       // Ensure that thread id gets restored, even if callback throws.
00378       // This is done with RAII rather than try-catch so that the source
00379       // of the original exception is not masked in a crash report.
00380       BOOST_SCOPE_EXIT(&tls, &last_calling)
00381       {
00382         tls->calling_in_this_thread = last_calling;
00383       }
00384       BOOST_SCOPE_EXIT_END
00385 
00386       if (info.marked_for_removal)
00387       {
00388         tls->cb_it = tls->callbacks.erase(tls->cb_it);
00389       }
00390       else
00391       {
00392         tls->cb_it = tls->callbacks.erase(tls->cb_it);
00393         result = cb->call();
00394       }
00395     }
00396 
00397     // Push TryAgain callbacks to the back of the shared queue
00398     if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
00399     {
00400       boost::mutex::scoped_lock lock(mutex_);
00401       callbacks_.push_back(info);
00402 
00403       return TryAgain;
00404     }
00405 
00406     return Called;
00407   }
00408   else
00409   {
00410     tls->cb_it = tls->callbacks.erase(tls->cb_it);
00411   }
00412 
00413   return Called;
00414 }
00415 
00416 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Thu Jun 6 2019 21:10:05