00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ros/callback_queue.h"
00036 #include "ros/assert.h"
00037
00038 namespace ros
00039 {
00040
00041 CallbackQueue::CallbackQueue(bool enabled)
00042 : calling_(0)
00043 , enabled_(enabled)
00044 {
00045 }
00046
00047 CallbackQueue::~CallbackQueue()
00048 {
00049 disable();
00050 }
00051
00052 void CallbackQueue::enable()
00053 {
00054 boost::mutex::scoped_lock lock(mutex_);
00055 enabled_ = true;
00056
00057 condition_.notify_all();
00058 }
00059
00060 void CallbackQueue::disable()
00061 {
00062 boost::mutex::scoped_lock lock(mutex_);
00063 enabled_ = false;
00064
00065 condition_.notify_all();
00066 }
00067
00068 void CallbackQueue::clear()
00069 {
00070 boost::mutex::scoped_lock lock(mutex_);
00071
00072 callbacks_.clear();
00073 }
00074
00075 bool CallbackQueue::isEmpty()
00076 {
00077 boost::mutex::scoped_lock lock(mutex_);
00078
00079 return callbacks_.empty() && calling_ == 0;
00080 }
00081
00082 bool CallbackQueue::isEnabled()
00083 {
00084 boost::mutex::scoped_lock lock(mutex_);
00085
00086 return enabled_;
00087 }
00088
00089 void CallbackQueue::setupTLS()
00090 {
00091 if (!tls_.get())
00092 {
00093 tls_.reset(new TLS);
00094 }
00095 }
00096
00097 void CallbackQueue::addCallback(const CallbackInterfacePtr& callback, uint64_t removal_id)
00098 {
00099 CallbackInfo info;
00100 info.callback = callback;
00101 info.removal_id = removal_id;
00102
00103 {
00104 boost::mutex::scoped_lock lock(mutex_);
00105
00106 if (!enabled_)
00107 {
00108 return;
00109 }
00110
00111 callbacks_.push_back(info);
00112 }
00113
00114 {
00115 boost::mutex::scoped_lock lock(id_info_mutex_);
00116
00117 M_IDInfo::iterator it = id_info_.find(removal_id);
00118 if (it == id_info_.end())
00119 {
00120 IDInfoPtr id_info(new IDInfo);
00121 id_info->id = removal_id;
00122 id_info_.insert(std::make_pair(removal_id, id_info));
00123 }
00124 }
00125
00126 condition_.notify_one();
00127 }
00128
00129 CallbackQueue::IDInfoPtr CallbackQueue::getIDInfo(uint64_t id)
00130 {
00131 boost::mutex::scoped_lock lock(id_info_mutex_);
00132 M_IDInfo::iterator it = id_info_.find(id);
00133 if (it != id_info_.end())
00134 {
00135 return it->second;
00136 }
00137
00138 return IDInfoPtr();
00139 }
00140
00141 void CallbackQueue::removeByID(uint64_t removal_id)
00142 {
00143 setupTLS();
00144
00145 {
00146 IDInfoPtr id_info;
00147 {
00148 boost::mutex::scoped_lock lock(id_info_mutex_);
00149 M_IDInfo::iterator it = id_info_.find(removal_id);
00150 if (it != id_info_.end())
00151 {
00152 id_info = it->second;
00153 }
00154 else
00155 {
00156 return;
00157 }
00158 }
00159
00160
00161
00162 if (tls_->calling_in_this_thread == id_info->id)
00163 {
00164 id_info->calling_rw_mutex.unlock_shared();
00165 }
00166
00167 {
00168 boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
00169 boost::mutex::scoped_lock lock(mutex_);
00170 D_CallbackInfo::iterator it = callbacks_.begin();
00171 for (; it != callbacks_.end();)
00172 {
00173 CallbackInfo& info = *it;
00174 if (info.removal_id == removal_id)
00175 {
00176 it = callbacks_.erase(it);
00177 }
00178 else
00179 {
00180 ++it;
00181 }
00182 }
00183 }
00184
00185 if (tls_->calling_in_this_thread == id_info->id)
00186 {
00187 id_info->calling_rw_mutex.lock_shared();
00188 }
00189 }
00190
00191
00192
00193 {
00194 D_CallbackInfo::iterator it = tls_->callbacks.begin();
00195 D_CallbackInfo::iterator end = tls_->callbacks.end();
00196 for (; it != end; ++it)
00197 {
00198 CallbackInfo& info = *it;
00199 if (info.removal_id == removal_id)
00200 {
00201 info.marked_for_removal = true;
00202 }
00203 }
00204 }
00205
00206 {
00207 boost::mutex::scoped_lock lock(id_info_mutex_);
00208 id_info_.erase(removal_id);
00209 }
00210 }
00211
00212 CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)
00213 {
00214 setupTLS();
00215 TLS* tls = tls_.get();
00216
00217 CallbackInfo cb_info;
00218
00219 {
00220 boost::mutex::scoped_lock lock(mutex_);
00221
00222 if (!enabled_)
00223 {
00224 return Disabled;
00225 }
00226
00227 if (callbacks_.empty())
00228 {
00229 if (!timeout.isZero())
00230 {
00231 condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
00232 }
00233
00234 if (callbacks_.empty())
00235 {
00236 return Empty;
00237 }
00238
00239 if (!enabled_)
00240 {
00241 return Disabled;
00242 }
00243 }
00244
00245 D_CallbackInfo::iterator it = callbacks_.begin();
00246 for (; it != callbacks_.end();)
00247 {
00248 CallbackInfo& info = *it;
00249
00250 if (info.marked_for_removal)
00251 {
00252 it = callbacks_.erase(it);
00253 continue;
00254 }
00255
00256 if (info.callback->ready())
00257 {
00258 cb_info = info;
00259 it = callbacks_.erase(it);
00260 break;
00261 }
00262
00263 ++it;
00264 }
00265
00266 if (!cb_info.callback)
00267 {
00268 return TryAgain;
00269 }
00270
00271 ++calling_;
00272 }
00273
00274 bool was_empty = tls->callbacks.empty();
00275 tls->callbacks.push_back(cb_info);
00276 if (was_empty)
00277 {
00278 tls->cb_it = tls->callbacks.begin();
00279 }
00280
00281 CallOneResult res = callOneCB(tls);
00282 if (res != Empty)
00283 {
00284 boost::mutex::scoped_lock lock(mutex_);
00285 --calling_;
00286 }
00287 return res;
00288 }
00289
00290 void CallbackQueue::callAvailable(ros::WallDuration timeout)
00291 {
00292 setupTLS();
00293 TLS* tls = tls_.get();
00294
00295 {
00296 boost::mutex::scoped_lock lock(mutex_);
00297
00298 if (!enabled_)
00299 {
00300 return;
00301 }
00302
00303 if (callbacks_.empty())
00304 {
00305 if (!timeout.isZero())
00306 {
00307 condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
00308 }
00309
00310 if (callbacks_.empty() || !enabled_)
00311 {
00312 return;
00313 }
00314 }
00315
00316 bool was_empty = tls->callbacks.empty();
00317
00318 tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
00319 callbacks_.clear();
00320
00321 calling_ += tls->callbacks.size();
00322
00323 if (was_empty)
00324 {
00325 tls->cb_it = tls->callbacks.begin();
00326 }
00327 }
00328
00329 size_t called = 0;
00330
00331 while (!tls->callbacks.empty())
00332 {
00333 if (callOneCB(tls) != Empty)
00334 {
00335 ++called;
00336 }
00337 }
00338
00339 {
00340 boost::mutex::scoped_lock lock(mutex_);
00341 calling_ -= called;
00342 }
00343 }
00344
00345 CallbackQueue::CallOneResult CallbackQueue::callOneCB(TLS* tls)
00346 {
00347
00348
00349 if (tls->calling_in_this_thread == 0xffffffffffffffffULL)
00350 {
00351 tls->cb_it = tls->callbacks.begin();
00352 }
00353
00354 if (tls->cb_it == tls->callbacks.end())
00355 {
00356 return Empty;
00357 }
00358
00359 ROS_ASSERT(!tls->callbacks.empty());
00360 ROS_ASSERT(tls->cb_it != tls->callbacks.end());
00361
00362 CallbackInfo info = *tls->cb_it;
00363 CallbackInterfacePtr& cb = info.callback;
00364
00365 IDInfoPtr id_info = getIDInfo(info.removal_id);
00366 if (id_info)
00367 {
00368 boost::shared_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
00369
00370 uint64_t last_calling = tls->calling_in_this_thread;
00371 tls->calling_in_this_thread = id_info->id;
00372
00373 CallbackInterface::CallResult result = CallbackInterface::Invalid;
00374 if (info.marked_for_removal)
00375 {
00376 tls->cb_it = tls->callbacks.erase(tls->cb_it);
00377 }
00378 else
00379 {
00380 tls->cb_it = tls->callbacks.erase(tls->cb_it);
00381 result = cb->call();
00382 }
00383
00384 tls->calling_in_this_thread = last_calling;
00385
00386
00387 if (result == CallbackInterface::TryAgain && !info.marked_for_removal)
00388 {
00389 boost::mutex::scoped_lock lock(mutex_);
00390 callbacks_.push_back(info);
00391
00392 return TryAgain;
00393 }
00394
00395 return Called;
00396 }
00397 else
00398 {
00399 tls->cb_it = tls->callbacks.erase(tls->cb_it);
00400 }
00401
00402 return Called;
00403 }
00404
00405 }