bond.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2009, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 // Author: Stuart Glaser
00031 
00032 #include <bondcpp/bond.h>
00033 #include <boost/thread/thread_time.hpp>
00034 #include <boost/date_time/posix_time/posix_time_types.hpp>
00035 
00036 #ifdef _WIN32
00037 #include <Rpc.h>
00038 #else
00039 #include <uuid/uuid.h>
00040 #endif
00041 
00042 #include <algorithm>
00043 #include <string>
00044 #include <vector>
00045 
00046 namespace bond {
00047 
00048 static std::string makeUUID()
00049 {
00050 #ifdef _WIN32
00051   UUID uuid;
00052   UuidCreate(&uuid);
00053   unsigned char *str;
00054   UuidToStringA(&uuid, &str);
00055   std::string return_string(reinterpret_cast<char *>(str));
00056   RpcStringFreeA(&str);
00057   return return_string;
00058 #else
00059   uuid_t uuid;
00060   uuid_generate_random(uuid);
00061   char uuid_str[40];
00062   uuid_unparse(uuid, uuid_str);
00063   return std::string(uuid_str);
00064 #endif
00065 }
00066 
00067 Bond::Bond(const std::string &topic, const std::string &id,
00068            boost::function<void(void)> on_broken,
00069            boost::function<void(void)> on_formed)
00070   :
00071 
00072   bondsm_(new BondSM(this)),
00073   sm_(*bondsm_),
00074   topic_(topic),
00075   id_(id),
00076   instance_id_(makeUUID()),
00077   on_broken_(on_broken),
00078   on_formed_(on_formed),
00079   sisterDiedFirst_(false),
00080   started_(false),
00081 
00082   connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
00083   heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
00084   disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
00085 {
00086   setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
00087   setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
00088   setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
00089   setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
00090 }
00091 
00092 Bond::~Bond()
00093 {
00094   if (!started_) {
00095     return;
00096   }
00097 
00098   breakBond();
00099   if (!waitUntilBroken(ros::Duration(1.0))) {
00100     ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
00101   }
00102 
00103   // Must destroy the subscription before locking mutex_: shutdown()
00104   // will block until the status callback completes, and the status
00105   // callback locks mutex_ (in flushPendingCallbacks).
00106   sub_.shutdown();
00107 
00108   // Stops the timers before locking the mutex.  Makes sure none of
00109   // the callbacks are running when we aquire the mutex.
00110   publishingTimer_.stop();
00111   connect_timer_.cancel();
00112   heartbeat_timer_.cancel();
00113   disconnect_timer_.cancel();
00114 
00115   boost::mutex::scoped_lock lock(mutex_);
00116   pub_.shutdown();
00117 }
00118 
00119 
00120 void Bond::setConnectTimeout(double dur)
00121 {
00122   if (started_) {
00123     ROS_ERROR("Cannot set timeouts after calling start()");
00124     return;
00125   }
00126 
00127   connect_timeout_ = dur;
00128   connect_timer_.setDuration(ros::WallDuration(connect_timeout_));
00129 }
00130 
00131 void Bond::setDisconnectTimeout(double dur)
00132 {
00133   if (started_) {
00134     ROS_ERROR("Cannot set timeouts after calling start()");
00135     return;
00136   }
00137 
00138   disconnect_timeout_ = dur;
00139   disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_));
00140 }
00141 
00142 void Bond::setHeartbeatTimeout(double dur)
00143 {
00144   if (started_) {
00145     ROS_ERROR("Cannot set timeouts after calling start()");
00146     return;
00147   }
00148 
00149   heartbeat_timeout_ = dur;
00150   heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_));
00151 }
00152 
00153 void Bond::setHeartbeatPeriod(double dur)
00154 {
00155   if (started_) {
00156     ROS_ERROR("Cannot set timeouts after calling start()");
00157     return;
00158   }
00159 
00160   heartbeat_period_ = dur;
00161 }
00162 
00163 void Bond::setCallbackQueue(ros::CallbackQueueInterface *queue)
00164 {
00165   if (started_) {
00166     ROS_ERROR("Cannot set callback queue after calling start()");
00167     return;
00168   }
00169 
00170   nh_.setCallbackQueue(queue);
00171 }
00172 
00173 
00174 void Bond::start()
00175 {
00176   boost::mutex::scoped_lock lock(mutex_);
00177   connect_timer_.reset();
00178   pub_ = nh_.advertise<bond::Status>(topic_, 5);
00179   sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
00180 
00181   publishingTimer_ = nh_.createWallTimer(
00182     ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this);
00183   started_ = true;
00184 }
00185 
00186 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
00187 {
00188   boost::mutex::scoped_lock lock(mutex_);
00189   on_formed_ = on_formed;
00190 }
00191 
00192 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
00193 {
00194   boost::mutex::scoped_lock lock(mutex_);
00195   on_broken_ = on_broken;
00196 }
00197 
00198 bool Bond::waitUntilFormed(ros::Duration timeout)
00199 {
00200   return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec));
00201 }
00202 bool Bond::waitUntilFormed(ros::WallDuration timeout)
00203 {
00204   boost::mutex::scoped_lock lock(mutex_);
00205   ros::WallTime deadline(ros::WallTime::now() + timeout);
00206 
00207   while (sm_.getState().getId() == SM::WaitingForSister.getId()) {
00208     if (!ros::ok()) {
00209       break;
00210     }
00211 
00212     ros::WallDuration wait_time = ros::WallDuration(0.1);
00213     if (timeout >= ros::WallDuration(0.0)) {
00214       wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00215     }
00216 
00217     if (wait_time <= ros::WallDuration(0.0)) {
00218       break;  // The deadline has expired
00219     }
00220 
00221     condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00222   }
00223   return sm_.getState().getId() != SM::WaitingForSister.getId();
00224 }
00225 
00226 bool Bond::waitUntilBroken(ros::Duration timeout)
00227 {
00228   return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec));
00229 }
00230 bool Bond::waitUntilBroken(ros::WallDuration timeout)
00231 {
00232   boost::mutex::scoped_lock lock(mutex_);
00233   ros::WallTime deadline(ros::WallTime::now() + timeout);
00234 
00235   while (sm_.getState().getId() != SM::Dead.getId()) {
00236     if (!ros::ok()) {
00237       break;
00238     }
00239 
00240     ros::WallDuration wait_time = ros::WallDuration(0.1);
00241     if (timeout >= ros::WallDuration(0.0)) {
00242       wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00243     }
00244 
00245     if (wait_time <= ros::WallDuration(0.0)) {
00246       break;  // The deadline has expired
00247     }
00248 
00249     condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00250   }
00251   return sm_.getState().getId() == SM::Dead.getId();
00252 }
00253 
00254 bool Bond::isBroken()
00255 {
00256   boost::mutex::scoped_lock lock(mutex_);
00257   return sm_.getState().getId() == SM::Dead.getId();
00258 }
00259 
00260 void Bond::breakBond()
00261 {
00262   {
00263     boost::mutex::scoped_lock lock(mutex_);
00264     if (sm_.getState().getId() != SM::Dead.getId()) {
00265       sm_.Die();
00266       publishStatus(false);
00267     }
00268   }
00269   flushPendingCallbacks();
00270 }
00271 
00272 
00273 void Bond::onConnectTimeout()
00274 {
00275   {
00276     boost::mutex::scoped_lock lock(mutex_);
00277     sm_.ConnectTimeout();
00278   }
00279   flushPendingCallbacks();
00280 }
00281 void Bond::onHeartbeatTimeout()
00282 {
00283   // Checks that heartbeat timeouts haven't been disabled globally.
00284   bool disable_heartbeat_timeout;
00285   nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
00286   if (disable_heartbeat_timeout) {
00287     ROS_WARN("Heartbeat timeout is disabled.  Not breaking bond (topic: %s, id: %s)",
00288              topic_.c_str(), id_.c_str());
00289     return;
00290   }
00291 
00292   {
00293     boost::mutex::scoped_lock lock(mutex_);
00294     sm_.HeartbeatTimeout();
00295   }
00296   flushPendingCallbacks();
00297 }
00298 void Bond::onDisconnectTimeout()
00299 {
00300   {
00301     boost::mutex::scoped_lock lock(mutex_);
00302     sm_.DisconnectTimeout();
00303   }
00304   flushPendingCallbacks();
00305 }
00306 
00307 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
00308 {
00309   // Filters out messages from other bonds and messages from ourself
00310   if (msg->id == id_ && msg->instance_id != instance_id_) {
00311     {
00312       boost::mutex::scoped_lock lock(mutex_);
00313 
00314       if (sister_instance_id_.empty()) {
00315         sister_instance_id_ = msg->instance_id;
00316       }
00317       if (sister_instance_id_ != msg->instance_id) {
00318         ROS_ERROR(
00319           "More than two locations are trying to use a single bond (topic: %s, id: %s).  "
00320           "You should only instantiate at most two bond instances for each (topic, id) pair.",
00321           topic_.c_str(), id_.c_str());
00322         return;
00323       }
00324 
00325       if (msg->active) {
00326         sm_.SisterAlive();
00327       } else {
00328         sm_.SisterDead();
00329 
00330         // Immediate ack for sister's death notification
00331         if (sisterDiedFirst_) {
00332           publishStatus(false);
00333         }
00334       }
00335     }
00336     flushPendingCallbacks();
00337   }
00338 }
00339 
00340 void Bond::doPublishing(const ros::WallTimerEvent &)
00341 {
00342   boost::mutex::scoped_lock lock(mutex_);
00343   if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
00344       sm_.getState().getId() == SM::Alive.getId()) {
00345     publishStatus(true);
00346   } else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId()) {
00347     publishStatus(false);
00348   } else {
00349     publishingTimer_.stop();
00350   }
00351 }
00352 
00353 void Bond::publishStatus(bool active)
00354 {
00355   bond::Status::Ptr msg(new bond::Status);
00356   msg->header.stamp = ros::Time::now();
00357   msg->id = id_;
00358   msg->instance_id = instance_id_;
00359   msg->active = active;
00360   msg->heartbeat_timeout = heartbeat_timeout_;
00361   msg->heartbeat_period = heartbeat_period_;
00362   pub_.publish(msg);
00363 }
00364 
00365 
00366 void Bond::flushPendingCallbacks()
00367 {
00368   std::vector<boost::function<void(void)> > callbacks;
00369   {
00370     boost::mutex::scoped_lock lock(mutex_);
00371     callbacks = pending_callbacks_;
00372     pending_callbacks_.clear();
00373   }
00374 
00375   for (size_t i = 0; i < callbacks.size(); ++i) {
00376     callbacks[i]();
00377   }
00378 }
00379 
00380 }  // namespace bond
00381 
00382 
00383 void BondSM::Connected()
00384 {
00385   b->connect_timer_.cancel();
00386   b->condition_.notify_all();
00387   if (b->on_formed_) {
00388     b->pending_callbacks_.push_back(b->on_formed_);
00389   }
00390 }
00391 
00392 void BondSM::SisterDied()
00393 {
00394   b->sisterDiedFirst_ = true;
00395 }
00396 
00397 void BondSM::Death()
00398 {
00399   b->condition_.notify_all();
00400   b->heartbeat_timer_.cancel();
00401   b->disconnect_timer_.cancel();
00402   if (b->on_broken_) {
00403     b->pending_callbacks_.push_back(b->on_broken_);
00404   }
00405 }
00406 
00407 void BondSM::Heartbeat()
00408 {
00409   b->heartbeat_timer_.reset();
00410 }
00411 
00412 void BondSM::StartDying()
00413 {
00414   b->heartbeat_timer_.cancel();
00415   b->disconnect_timer_.reset();
00416   b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
00417 }


bondcpp
Author(s): Stuart Glaser
autogenerated on Thu Jun 6 2019 20:40:39