bond.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2009, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 // Author: Stuart Glaser
00031 
00032 #include <bondcpp/bond.h>
00033 #include <boost/thread/thread_time.hpp>
00034 #include <boost/date_time/posix_time/posix_time_types.hpp>
00035 #include <uuid/uuid.h>
00036 
00037 namespace bond {
00038 
00039 static std::string makeUUID()
00040 {
00041   uuid_t uuid;
00042   uuid_generate_random(uuid);
00043   char uuid_str[40];
00044   uuid_unparse(uuid, uuid_str);
00045   return std::string(uuid_str);
00046 }
00047 
00048 Bond::Bond(const std::string &topic, const std::string &id,
00049            boost::function<void(void)> on_broken,
00050            boost::function<void(void)> on_formed)
00051   :
00052 
00053   bondsm_(new BondSM(this)),
00054   sm_(*bondsm_),
00055   topic_(topic),
00056   id_(id),
00057   instance_id_(makeUUID()),
00058   on_broken_(on_broken),
00059   on_formed_(on_formed),
00060   sisterDiedFirst_(false),
00061   started_(false),
00062 
00063   connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
00064   heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
00065   disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
00066 {
00067   setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
00068   setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
00069   setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
00070   setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
00071 }
00072 
00073 Bond::~Bond()
00074 {
00075   if (!started_)
00076     return;
00077   
00078   breakBond();
00079   if (!waitUntilBroken(ros::Duration(1.0)))
00080   {
00081     ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
00082   }
00083 
00084   // Must destroy the subscription before locking mutex_: shutdown()
00085   // will block until the status callback completes, and the status
00086   // callback locks mutex_ (in flushPendingCallbacks).
00087   sub_.shutdown();
00088 
00089   // Stops the timers before locking the mutex.  Makes sure none of
00090   // the callbacks are running when we aquire the mutex.
00091   publishingTimer_.stop();
00092   connect_timer_.cancel();
00093   heartbeat_timer_.cancel();
00094   disconnect_timer_.cancel();
00095   
00096   boost::mutex::scoped_lock lock(mutex_);
00097   pub_.shutdown();
00098 }
00099 
00100 
00101 void Bond::setConnectTimeout(double dur)
00102 {
00103   if (started_) {
00104     ROS_ERROR("Cannot set timeouts after calling start()");
00105     return;
00106   }
00107 
00108   connect_timeout_ = dur;
00109   connect_timer_.setDuration(ros::WallDuration(connect_timeout_));
00110 }
00111 
00112 void Bond::setDisconnectTimeout(double dur)
00113 {
00114   if (started_) {
00115     ROS_ERROR("Cannot set timeouts after calling start()");
00116     return;
00117   }
00118 
00119   disconnect_timeout_ = dur;
00120   disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_));
00121 }
00122 
00123 void Bond::setHeartbeatTimeout(double dur)
00124 {
00125   if (started_) {
00126     ROS_ERROR("Cannot set timeouts after calling start()");
00127     return;
00128   }
00129 
00130   heartbeat_timeout_ = dur;
00131   heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_));
00132 }
00133 
00134 void Bond::setHeartbeatPeriod(double dur)
00135 {
00136   if (started_) {
00137     ROS_ERROR("Cannot set timeouts after calling start()");
00138     return;
00139   }
00140 
00141   heartbeat_period_ = dur;
00142 }
00143 
00144 void Bond::setCallbackQueue(ros::CallbackQueueInterface *queue)
00145 {
00146   if (started_) {
00147     ROS_ERROR("Cannot set callback queue after calling start()");
00148     return;
00149   }
00150 
00151   nh_.setCallbackQueue(queue);
00152 }
00153 
00154 
00155 void Bond::start()
00156 {
00157   boost::mutex::scoped_lock lock(mutex_);
00158   connect_timer_.reset();
00159   pub_ = nh_.advertise<bond::Status>(topic_, 5);
00160   sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
00161 
00162   publishingTimer_ = nh_.createWallTimer(ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this);
00163   started_ = true;
00164 }
00165 
00166 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
00167 {
00168   boost::mutex::scoped_lock lock(mutex_);
00169   on_formed_ = on_formed;
00170 }
00171 
00172 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
00173 {
00174   boost::mutex::scoped_lock lock(mutex_);
00175   on_broken_ = on_broken;
00176 }
00177 
00178 bool Bond::waitUntilFormed(ros::Duration timeout)
00179 {
00180   return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec));
00181 }
00182 bool Bond::waitUntilFormed(ros::WallDuration timeout)
00183 {
00184   boost::mutex::scoped_lock lock(mutex_);
00185   ros::WallTime deadline(ros::WallTime::now() + timeout);
00186 
00187   while (sm_.getState().getId() == SM::WaitingForSister.getId())
00188   {
00189     if (!ros::ok())
00190       break;
00191 
00192     ros::WallDuration wait_time = ros::WallDuration(0.1);
00193     if (timeout >= ros::WallDuration(0.0))
00194       wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00195 
00196     if (wait_time <= ros::WallDuration(0.0))
00197       break;  // The deadline has expired
00198 
00199     condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00200   }
00201   return sm_.getState().getId() != SM::WaitingForSister.getId();
00202 }
00203 
00204 bool Bond::waitUntilBroken(ros::Duration timeout)
00205 {
00206   return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec));
00207 }
00208 bool Bond::waitUntilBroken(ros::WallDuration timeout)
00209 {
00210   boost::mutex::scoped_lock lock(mutex_);
00211   ros::WallTime deadline(ros::WallTime::now() + timeout);
00212   
00213   while (sm_.getState().getId() != SM::Dead.getId())
00214   {
00215     if (!ros::ok())
00216       break;
00217 
00218     ros::WallDuration wait_time = ros::WallDuration(0.1);
00219     if (timeout >= ros::WallDuration(0.0))
00220       wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00221 
00222     if (wait_time <= ros::WallDuration(0.0))
00223       break; // The deadline has expired
00224 
00225     condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00226   }
00227   return sm_.getState().getId() == SM::Dead.getId();
00228 }
00229 
00230 bool Bond::isBroken()
00231 {
00232   boost::mutex::scoped_lock lock(mutex_);
00233   return sm_.getState().getId() == SM::Dead.getId();
00234 }
00235 
00236 void Bond::breakBond()
00237 {
00238   {
00239     boost::mutex::scoped_lock lock(mutex_);
00240     if (sm_.getState().getId() != SM::Dead.getId())
00241     {
00242       sm_.Die();
00243       publishStatus(false);
00244     }
00245   }
00246   flushPendingCallbacks();
00247 }
00248 
00249 
00250 void Bond::onConnectTimeout()
00251 {
00252   {
00253     boost::mutex::scoped_lock lock(mutex_);
00254     sm_.ConnectTimeout();
00255   }
00256   flushPendingCallbacks();
00257 }
00258 void Bond::onHeartbeatTimeout()
00259 {
00260   // Checks that heartbeat timeouts haven't been disabled globally.
00261   bool disable_heartbeat_timeout;
00262   nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
00263   if (disable_heartbeat_timeout) {
00264     ROS_WARN("Heartbeat timeout is disabled.  Not breaking bond (topic: %s, id: %s)",
00265              topic_.c_str(), id_.c_str());
00266     return;
00267   }
00268   
00269   {
00270     boost::mutex::scoped_lock lock(mutex_);
00271     sm_.HeartbeatTimeout();
00272   }
00273   flushPendingCallbacks();
00274 }
00275 void Bond::onDisconnectTimeout()
00276 {
00277   {
00278     boost::mutex::scoped_lock lock(mutex_);
00279     sm_.DisconnectTimeout();
00280   }
00281   flushPendingCallbacks();
00282 }
00283 
00284 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
00285 {
00286   // Filters out messages from other bonds and messages from ourself
00287   if (msg->id == id_ && msg->instance_id != instance_id_)
00288   {
00289     {
00290       boost::mutex::scoped_lock lock(mutex_);
00291 
00292       if (sister_instance_id_.empty())
00293         sister_instance_id_ = msg->instance_id;
00294       if (sister_instance_id_ != msg->instance_id) {
00295         ROS_ERROR("More than two locations are trying to use a single bond (topic: %s, id: %s).  "
00296                   "You should only instantiate at most two bond instances for each (topic, id) pair.",
00297                   topic_.c_str(), id_.c_str());
00298         return;
00299       }
00300 
00301       if (msg->active)
00302       {
00303         sm_.SisterAlive();
00304       }
00305       else
00306       {
00307         sm_.SisterDead();
00308 
00309         // Immediate ack for sister's death notification
00310         if (sisterDiedFirst_)
00311           publishStatus(false);
00312       }
00313     }
00314     flushPendingCallbacks();
00315   }
00316 }
00317 
00318 void Bond::doPublishing(const ros::WallTimerEvent &e)
00319 {
00320   boost::mutex::scoped_lock lock(mutex_);
00321   if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
00322       sm_.getState().getId() == SM::Alive.getId())
00323   {
00324     publishStatus(true);
00325   }
00326   else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId())
00327   {
00328     publishStatus(false);
00329   }
00330   else
00331   {
00332     publishingTimer_.stop();
00333   }
00334 
00335 }
00336 
00337 void Bond::publishStatus(bool active)
00338 {
00339   bond::Status::Ptr msg(new bond::Status);
00340   msg->header.stamp = ros::Time::now();
00341   msg->id = id_;
00342   msg->instance_id = instance_id_;
00343   msg->active = active;
00344   msg->heartbeat_timeout = heartbeat_timeout_;
00345   msg->heartbeat_period = heartbeat_period_;
00346   pub_.publish(msg);
00347 }
00348 
00349 
00350 void Bond::flushPendingCallbacks()
00351 {
00352   std::vector<boost::function<void(void)> > callbacks;
00353   {
00354     boost::mutex::scoped_lock lock(mutex_);
00355     callbacks = pending_callbacks_;
00356     pending_callbacks_.clear();
00357   }
00358 
00359   for (size_t i = 0; i < callbacks.size(); ++i)
00360     callbacks[i]();
00361 }
00362 
00363 } // namespace
00364 
00365 
00366 void BondSM::Connected()
00367 {
00368   b->connect_timer_.cancel();
00369   b->condition_.notify_all();
00370   if (b->on_formed_)
00371     b->pending_callbacks_.push_back(b->on_formed_);
00372 }
00373 
00374 void BondSM::SisterDied()
00375 {
00376   b->sisterDiedFirst_ = true;
00377 }
00378 
00379 void BondSM::Death()
00380 {
00381   b->condition_.notify_all();
00382   b->heartbeat_timer_.cancel();
00383   b->disconnect_timer_.cancel();
00384   if (b->on_broken_)
00385     b->pending_callbacks_.push_back(b->on_broken_);
00386 }
00387 
00388 void BondSM::Heartbeat()
00389 {
00390   b->heartbeat_timer_.reset();
00391 }
00392 
00393 void BondSM::StartDying()
00394 {
00395   b->heartbeat_timer_.cancel();
00396   b->disconnect_timer_.reset();
00397   b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
00398 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Defines


bondcpp
Author(s): Stuart Glaser
autogenerated on Thu Aug 15 2013 10:10:38