bond.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2009, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 // Author: Stuart Glaser
00031 
00032 #include <bondcpp/bond.h>
00033 #include <boost/thread/thread_time.hpp>
00034 #include <boost/date_time/posix_time/posix_time_types.hpp>
00035 
00036 #ifdef _WIN32
00037 #include <Rpc.h>
00038 #else
00039 #include <uuid/uuid.h>
00040 #endif
00041 
00042 namespace bond {
00043 
00044 static std::string makeUUID()
00045 {
00046 #ifdef _WIN32
00047   UUID uuid;
00048   UuidCreate(&uuid);
00049   unsigned char *str;
00050   UuidToStringA(&uuid, &str);
00051   std::string return_string(reinterpret_cast<char *>(str));
00052   RpcStringFreeA(&str);
00053   return return_string;
00054 #else
00055   uuid_t uuid;
00056   uuid_generate_random(uuid);
00057   char uuid_str[40];
00058   uuid_unparse(uuid, uuid_str);
00059   return std::string(uuid_str);
00060 #endif
00061 }
00062 
00063 Bond::Bond(const std::string &topic, const std::string &id,
00064            boost::function<void(void)> on_broken,
00065            boost::function<void(void)> on_formed)
00066   :
00067 
00068   bondsm_(new BondSM(this)),
00069   sm_(*bondsm_),
00070   topic_(topic),
00071   id_(id),
00072   instance_id_(makeUUID()),
00073   on_broken_(on_broken),
00074   on_formed_(on_formed),
00075   sisterDiedFirst_(false),
00076   started_(false),
00077 
00078   connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
00079   heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
00080   disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
00081 {
00082   setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
00083   setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
00084   setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
00085   setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
00086 }
00087 
00088 Bond::~Bond()
00089 {
00090   if (!started_)
00091     return;
00092   
00093   breakBond();
00094   if (!waitUntilBroken(ros::Duration(1.0)))
00095   {
00096     ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
00097   }
00098 
00099   // Must destroy the subscription before locking mutex_: shutdown()
00100   // will block until the status callback completes, and the status
00101   // callback locks mutex_ (in flushPendingCallbacks).
00102   sub_.shutdown();
00103 
00104   // Stops the timers before locking the mutex.  Makes sure none of
00105   // the callbacks are running when we aquire the mutex.
00106   publishingTimer_.stop();
00107   connect_timer_.cancel();
00108   heartbeat_timer_.cancel();
00109   disconnect_timer_.cancel();
00110   
00111   boost::mutex::scoped_lock lock(mutex_);
00112   pub_.shutdown();
00113 }
00114 
00115 
00116 void Bond::setConnectTimeout(double dur)
00117 {
00118   if (started_) {
00119     ROS_ERROR("Cannot set timeouts after calling start()");
00120     return;
00121   }
00122 
00123   connect_timeout_ = dur;
00124   connect_timer_.setDuration(ros::WallDuration(connect_timeout_));
00125 }
00126 
00127 void Bond::setDisconnectTimeout(double dur)
00128 {
00129   if (started_) {
00130     ROS_ERROR("Cannot set timeouts after calling start()");
00131     return;
00132   }
00133 
00134   disconnect_timeout_ = dur;
00135   disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_));
00136 }
00137 
00138 void Bond::setHeartbeatTimeout(double dur)
00139 {
00140   if (started_) {
00141     ROS_ERROR("Cannot set timeouts after calling start()");
00142     return;
00143   }
00144 
00145   heartbeat_timeout_ = dur;
00146   heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_));
00147 }
00148 
00149 void Bond::setHeartbeatPeriod(double dur)
00150 {
00151   if (started_) {
00152     ROS_ERROR("Cannot set timeouts after calling start()");
00153     return;
00154   }
00155 
00156   heartbeat_period_ = dur;
00157 }
00158 
00159 void Bond::setCallbackQueue(ros::CallbackQueueInterface *queue)
00160 {
00161   if (started_) {
00162     ROS_ERROR("Cannot set callback queue after calling start()");
00163     return;
00164   }
00165 
00166   nh_.setCallbackQueue(queue);
00167 }
00168 
00169 
00170 void Bond::start()
00171 {
00172   boost::mutex::scoped_lock lock(mutex_);
00173   connect_timer_.reset();
00174   pub_ = nh_.advertise<bond::Status>(topic_, 5);
00175   sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
00176 
00177   publishingTimer_ = nh_.createWallTimer(ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this);
00178   started_ = true;
00179 }
00180 
00181 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
00182 {
00183   boost::mutex::scoped_lock lock(mutex_);
00184   on_formed_ = on_formed;
00185 }
00186 
00187 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
00188 {
00189   boost::mutex::scoped_lock lock(mutex_);
00190   on_broken_ = on_broken;
00191 }
00192 
00193 bool Bond::waitUntilFormed(ros::Duration timeout)
00194 {
00195   return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec));
00196 }
00197 bool Bond::waitUntilFormed(ros::WallDuration timeout)
00198 {
00199   boost::mutex::scoped_lock lock(mutex_);
00200   ros::WallTime deadline(ros::WallTime::now() + timeout);
00201 
00202   while (sm_.getState().getId() == SM::WaitingForSister.getId())
00203   {
00204     if (!ros::ok())
00205       break;
00206 
00207     ros::WallDuration wait_time = ros::WallDuration(0.1);
00208     if (timeout >= ros::WallDuration(0.0))
00209       wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00210 
00211     if (wait_time <= ros::WallDuration(0.0))
00212       break;  // The deadline has expired
00213 
00214     condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00215   }
00216   return sm_.getState().getId() != SM::WaitingForSister.getId();
00217 }
00218 
00219 bool Bond::waitUntilBroken(ros::Duration timeout)
00220 {
00221   return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec));
00222 }
00223 bool Bond::waitUntilBroken(ros::WallDuration timeout)
00224 {
00225   boost::mutex::scoped_lock lock(mutex_);
00226   ros::WallTime deadline(ros::WallTime::now() + timeout);
00227   
00228   while (sm_.getState().getId() != SM::Dead.getId())
00229   {
00230     if (!ros::ok())
00231       break;
00232 
00233     ros::WallDuration wait_time = ros::WallDuration(0.1);
00234     if (timeout >= ros::WallDuration(0.0))
00235       wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00236 
00237     if (wait_time <= ros::WallDuration(0.0))
00238       break; // The deadline has expired
00239 
00240     condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00241   }
00242   return sm_.getState().getId() == SM::Dead.getId();
00243 }
00244 
00245 bool Bond::isBroken()
00246 {
00247   boost::mutex::scoped_lock lock(mutex_);
00248   return sm_.getState().getId() == SM::Dead.getId();
00249 }
00250 
00251 void Bond::breakBond()
00252 {
00253   {
00254     boost::mutex::scoped_lock lock(mutex_);
00255     if (sm_.getState().getId() != SM::Dead.getId())
00256     {
00257       sm_.Die();
00258       publishStatus(false);
00259     }
00260   }
00261   flushPendingCallbacks();
00262 }
00263 
00264 
00265 void Bond::onConnectTimeout()
00266 {
00267   {
00268     boost::mutex::scoped_lock lock(mutex_);
00269     sm_.ConnectTimeout();
00270   }
00271   flushPendingCallbacks();
00272 }
00273 void Bond::onHeartbeatTimeout()
00274 {
00275   // Checks that heartbeat timeouts haven't been disabled globally.
00276   bool disable_heartbeat_timeout;
00277   nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
00278   if (disable_heartbeat_timeout) {
00279     ROS_WARN("Heartbeat timeout is disabled.  Not breaking bond (topic: %s, id: %s)",
00280              topic_.c_str(), id_.c_str());
00281     return;
00282   }
00283   
00284   {
00285     boost::mutex::scoped_lock lock(mutex_);
00286     sm_.HeartbeatTimeout();
00287   }
00288   flushPendingCallbacks();
00289 }
00290 void Bond::onDisconnectTimeout()
00291 {
00292   {
00293     boost::mutex::scoped_lock lock(mutex_);
00294     sm_.DisconnectTimeout();
00295   }
00296   flushPendingCallbacks();
00297 }
00298 
00299 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
00300 {
00301   // Filters out messages from other bonds and messages from ourself
00302   if (msg->id == id_ && msg->instance_id != instance_id_)
00303   {
00304     {
00305       boost::mutex::scoped_lock lock(mutex_);
00306 
00307       if (sister_instance_id_.empty())
00308         sister_instance_id_ = msg->instance_id;
00309       if (sister_instance_id_ != msg->instance_id) {
00310         ROS_ERROR("More than two locations are trying to use a single bond (topic: %s, id: %s).  "
00311                   "You should only instantiate at most two bond instances for each (topic, id) pair.",
00312                   topic_.c_str(), id_.c_str());
00313         return;
00314       }
00315 
00316       if (msg->active)
00317       {
00318         sm_.SisterAlive();
00319       }
00320       else
00321       {
00322         sm_.SisterDead();
00323 
00324         // Immediate ack for sister's death notification
00325         if (sisterDiedFirst_)
00326           publishStatus(false);
00327       }
00328     }
00329     flushPendingCallbacks();
00330   }
00331 }
00332 
00333 void Bond::doPublishing(const ros::WallTimerEvent &e)
00334 {
00335   boost::mutex::scoped_lock lock(mutex_);
00336   if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
00337       sm_.getState().getId() == SM::Alive.getId())
00338   {
00339     publishStatus(true);
00340   }
00341   else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId())
00342   {
00343     publishStatus(false);
00344   }
00345   else
00346   {
00347     publishingTimer_.stop();
00348   }
00349 
00350 }
00351 
00352 void Bond::publishStatus(bool active)
00353 {
00354   bond::Status::Ptr msg(new bond::Status);
00355   msg->header.stamp = ros::Time::now();
00356   msg->id = id_;
00357   msg->instance_id = instance_id_;
00358   msg->active = active;
00359   msg->heartbeat_timeout = heartbeat_timeout_;
00360   msg->heartbeat_period = heartbeat_period_;
00361   pub_.publish(msg);
00362 }
00363 
00364 
00365 void Bond::flushPendingCallbacks()
00366 {
00367   std::vector<boost::function<void(void)> > callbacks;
00368   {
00369     boost::mutex::scoped_lock lock(mutex_);
00370     callbacks = pending_callbacks_;
00371     pending_callbacks_.clear();
00372   }
00373 
00374   for (size_t i = 0; i < callbacks.size(); ++i)
00375     callbacks[i]();
00376 }
00377 
00378 } // namespace
00379 
00380 
00381 void BondSM::Connected()
00382 {
00383   b->connect_timer_.cancel();
00384   b->condition_.notify_all();
00385   if (b->on_formed_)
00386     b->pending_callbacks_.push_back(b->on_formed_);
00387 }
00388 
00389 void BondSM::SisterDied()
00390 {
00391   b->sisterDiedFirst_ = true;
00392 }
00393 
00394 void BondSM::Death()
00395 {
00396   b->condition_.notify_all();
00397   b->heartbeat_timer_.cancel();
00398   b->disconnect_timer_.cancel();
00399   if (b->on_broken_)
00400     b->pending_callbacks_.push_back(b->on_broken_);
00401 }
00402 
00403 void BondSM::Heartbeat()
00404 {
00405   b->heartbeat_timer_.reset();
00406 }
00407 
00408 void BondSM::StartDying()
00409 {
00410   b->heartbeat_timer_.cancel();
00411   b->disconnect_timer_.reset();
00412   b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
00413 }


bondcpp
Author(s): Stuart Glaser
autogenerated on Sun Oct 5 2014 22:32:29