callback_queue.cpp
Go to the documentation of this file.
00001 /*
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2009, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Willow Garage, Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  */
00034 
00035 #include "ros/callback_queue.h"
00036 #include "ros/assert.h"
00037 
00038 namespace ros
00039 {
00040 
00041 CallbackQueue::CallbackQueue(bool enabled)
00042 : calling_(0)
00043 , enabled_(enabled)
00044 {
00045 }
00046 
00047 CallbackQueue::~CallbackQueue()
00048 {
00049   disable();
00050 }
00051 
00052 void CallbackQueue::enable()
00053 {
00054   boost::mutex::scoped_lock lock(mutex_);
00055   enabled_ = true;
00056 
00057   condition_.notify_all();
00058 }
00059 
00060 void CallbackQueue::disable()
00061 {
00062   boost::mutex::scoped_lock lock(mutex_);
00063   enabled_ = false;
00064 
00065   condition_.notify_all();
00066 }
00067 
00068 void CallbackQueue::clear()
00069 {
00070   boost::mutex::scoped_lock lock(mutex_);
00071 
00072   callbacks_.clear();
00073 }
00074 
00075 bool CallbackQueue::isEmpty()
00076 {
00077   boost::mutex::scoped_lock lock(mutex_);
00078 
00079   return callbacks_.empty() && calling_ == 0;
00080 }
00081 
00082 bool CallbackQueue::isEnabled()
00083 {
00084   boost::mutex::scoped_lock lock(mutex_);
00085 
00086   return enabled_;
00087 }
00088 
00089 void CallbackQueue::setupTLS()
00090 {
00091   if (!tls_.get())
00092   {
00093     tls_.reset(new TLS);
00094   }
00095 }
00096 
00097 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
00098 {
00099   CallbackInfo info;
00100   info.callback = callback;
00101   info.removal_id = removal_id;
00102 
00103   {
00104     boost::mutex::scoped_lock lock(mutex_);
00105 
00106     if (!enabled_)
00107     {
00108       return;
00109     }
00110 
00111     callbacks_.push_back(info);
00112   }
00113 
00114   {
00115     boost::mutex::scoped_lock lock(id_info_mutex_);
00116 
00117     M_IDInfo::iterator it = id_info_.find(removal_id);
00118     if (it == id_info_.end())
00119     {
00120       IDInfoPtr id_info(new IDInfo);
00121       id_info->id = removal_id;
00122       id_info_.insert(std::make_pair(removal_id, id_info));
00123     }
00124   }
00125 
00126   condition_.notify_one();
00127 }
00128 
00129 CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id)
00130 {
00131   boost::mutex::scoped_lock lock(id_info_mutex_);
00132   M_IDInfo::iterator it = id_info_.find(id);
00133   if (it != id_info_.end())
00134   {
00135     return it->second;
00136   }
00137 
00138   return IDInfoPtr();
00139 }
00140 
00141 void CallbackQueue::removeByID(uint64_t removal_id)
00142 {
00143   setupTLS();
00144 
00145   {
00146     IDInfoPtr id_info;
00147     {
00148       boost::mutex::scoped_lock lock(id_info_mutex_);
00149       M_IDInfo::iterator it = id_info_.find(removal_id);
00150       if (it != id_info_.end())
00151       {
00152         id_info = it->second;
00153       }
00154       else
00155       {
00156         return;
00157       }
00158     }
00159 
00160     // If we're being called from within a callback from our queue, we must unlock the shared lock we already own
00161     // here so that we can take a unique lock.  We'll re-lock it later.
00162     if (tls_->calling_in_this_thread == id_info->id)
00163     {
00164       id_info->calling_rw_mutex.unlock_shared();
00165     }
00166 
00167     {
00168       boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
00169       boost::mutex::scoped_lock lock(mutex_);
00170       D_CallbackInfo::iterator it = callbacks_.begin();
00171       for (; it != callbacks_.end();)
00172       {
00173         CallbackInfo& info = *it;
00174         if (info.removal_id == removal_id)
00175         {
00176           it = callbacks_.erase(it);
00177         }
00178         else
00179         {
00180           ++it;
00181         }
00182       }
00183     }
00184 
00185     if (tls_->calling_in_this_thread == id_info->id)
00186     {
00187       id_info->calling_rw_mutex.lock_shared();
00188     }
00189   }
00190 
00191   // If we're being called from within a callback, we need to remove the callbacks that match the id that have already been
00192   // popped off the queue
00193   {
00194     D_CallbackInfo::iterator it = tls_->callbacks.begin();
00195     D_CallbackInfo::iterator end = tls_->callbacks.end();
00196     for (; it != end; ++it)
00197     {
00198       CallbackInfo& info = *it;
00199       if (info.removal_id == removal_id)
00200       {
00201         info.marked_for_removal = true;
00202       }
00203     }
00204   }
00205 
00206   {
00207     boost::mutex::scoped_lock lock(id_info_mutex_);
00208     id_info_.erase(removal_id);
00209   }
00210 }
00211 
00212 CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)
00213 {
00214   setupTLS();
00215   TLS* tls = tls_.get();
00216 
00217   CallbackInfo cb_info;
00218 
00219   {
00220     boost::mutex::scoped_lock lock(mutex_);
00221 
00222     if (!enabled_)
00223     {
00224       return Disabled;
00225     }
00226 
00227     if (callbacks_.empty())
00228     {
00229       if (!timeout.isZero())
00230       {
00231         condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
00232       }
00233 
00234       if (callbacks_.empty())
00235       {
00236         return Empty;
00237       }
00238 
00239       if (!enabled_)
00240       {
00241         return Disabled;
00242       }
00243     }
00244 
00245     D_CallbackInfo::iterator it = callbacks_.begin();
00246     for (; it != callbacks_.end();)
00247     {
00248       CallbackInfo& info = *it;
00249 
00250       if (info.marked_for_removal)
00251       {
00252         it = callbacks_.erase(it);
00253         continue;
00254       }
00255 
00256       if (info.callback->ready())
00257       {
00258         cb_info = info;
00259         it = callbacks_.erase(it);
00260         break;
00261       }
00262 
00263       ++it;
00264     }
00265 
00266     if (!cb_info.callback)
00267     {
00268       return TryAgain;
00269     }
00270 
00271     ++calling_;
00272   }
00273 
00274   bool was_empty = tls->callbacks.empty();
00275   tls->callbacks.push_back(cb_info);
00276   if (was_empty)
00277   {
00278     tls->cb_it = tls->callbacks.begin();
00279   }
00280 
00281   CallOneResult res = callOneCB(tls);
00282   if (res != Empty)
00283   {
00284     boost::mutex::scoped_lock lock(mutex_);
00285     --calling_;
00286   }
00287   return res;
00288 }
00289 
00290 void CallbackQueue::callAvailable(ros::WallDuration timeout)
00291 {
00292   setupTLS();
00293   TLS* tls = tls_.get();
00294 
00295   {
00296     boost::mutex::scoped_lock lock(mutex_);
00297 
00298     if (!enabled_)
00299     {
00300       return;
00301     }
00302 
00303     if (callbacks_.empty())
00304     {
00305       if (!timeout.isZero())
00306       {
00307         condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
00308       }
00309 
00310       if (callbacks_.empty() || !enabled_)
00311       {
00312         return;
00313       }
00314     }
00315 
00316     bool was_empty = tls->callbacks.empty();
00317 
00318     tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
00319     callbacks_.clear();
00320 
00321     calling_ += tls->callbacks.size();
00322 
00323     if (was_empty)
00324     {
00325       tls->cb_it = tls->callbacks.begin();
00326     }
00327   }
00328 
00329   size_t called = 0;
00330 
00331   while (!tls->callbacks.empty())
00332   {
00333     if (callOneCB(tls) != Empty)
00334     {
00335       ++called;
00336     }
00337   }
00338 
00339   {
00340     boost::mutex::scoped_lock lock(mutex_);
00341     calling_ -= called;
00342   }
00343 }
00344 
00345 CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls)
00346 {
00347   // Check for a recursive call.  If recursive, increment the current iterator.  Otherwise
00348   // set the iterator it the beginning of the thread-local callbacks
00349   if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
00350   {
00351     tls->cb_it = tls->callbacks.begin();
00352   }
00353 
00354   if (tls->cb_it == tls->callbacks.end())
00355   {
00356     return Empty;
00357   }
00358 
00359   ROS_ASSERT(!tls->callbacks.empty());
00360   ROS_ASSERT(tls->cb_it != tls->callbacks.end());
00361 
00362   CallbackInfo info = *tls->cb_it;
00363   CallbackInterfacePtr& cb = info.callback;
00364 
00365   IDInfoPtr id_info = getIDInfo(info.removal_id);
00366   if (id_info)
00367   {
00368     boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
00369 
00370     uint64_t last_calling = tls->calling_in_this_thread;
00371     tls->calling_in_this_thread = id_info->id;
00372 
00373     CallbackInterface::CallResult result = CallbackInterface::Invalid;
00374     try {
00375       if (info.marked_for_removal)
00376       {
00377         tls->cb_it = tls->callbacks.erase(tls->cb_it);
00378       }
00379       else
00380       {
00381         tls->cb_it = tls->callbacks.erase(tls->cb_it);
00382         result = cb->call();
00383       }
00384     }
00385     catch (std::exception&)
00386     {
00387       // ensure that thread id gets restored, even in case of an exception
00388       tls->calling_in_this_thread = last_calling;
00389       throw;
00390     }
00391 
00392     tls->calling_in_this_thread = last_calling;
00393 
00394     // Push TryAgain callbacks to the back of the shared queue
00395     if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
00396     {
00397       boost::mutex::scoped_lock lock(mutex_);
00398       callbacks_.push_back(info);
00399 
00400       return TryAgain;
00401     }
00402 
00403     return Called;
00404   }
00405   else
00406   {
00407     tls->cb_it = tls->callbacks.erase(tls->cb_it);
00408   }
00409 
00410   return Called;
00411 }
00412 
00413 }


roscpp
Author(s): Morgan Quigley, Josh Faust, Brian Gerkey, Troy Straszheim
autogenerated on Mon Oct 6 2014 11:46:44