00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <bondcpp/bond.h>
00033 #include <boost/thread/thread_time.hpp>
00034 #include <boost/date_time/posix_time/posix_time_types.hpp>
00035 #include <uuid/uuid.h>
00036
00037 namespace bond {
00038
00039 static std::string makeUUID()
00040 {
00041 uuid_t uuid;
00042 uuid_generate_random(uuid);
00043 char uuid_str[40];
00044 uuid_unparse(uuid, uuid_str);
00045 return std::string(uuid_str);
00046 }
00047
00048 Bond::Bond(const std::string &topic, const std::string &id,
00049 boost::function<void(void)> on_broken,
00050 boost::function<void(void)> on_formed)
00051 :
00052
00053 bondsm_(new BondSM(this)),
00054 sm_(*bondsm_),
00055 topic_(topic),
00056 id_(id),
00057 instance_id_(makeUUID()),
00058 on_broken_(on_broken),
00059 on_formed_(on_formed),
00060 sisterDiedFirst_(false),
00061 started_(false),
00062
00063 connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
00064 heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
00065 disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
00066 {
00067 setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
00068 setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
00069 setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
00070 setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
00071 }
00072
00073 Bond::~Bond()
00074 {
00075 if (!started_)
00076 return;
00077
00078 breakBond();
00079 if (!waitUntilBroken(ros::Duration(1.0)))
00080 {
00081 ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
00082 }
00083
00084
00085
00086
00087 sub_.shutdown();
00088
00089
00090
00091 publishingTimer_.stop();
00092 connect_timer_.cancel();
00093 heartbeat_timer_.cancel();
00094 disconnect_timer_.cancel();
00095
00096 boost::mutex::scoped_lock lock(mutex_);
00097 pub_.shutdown();
00098 }
00099
00100
00101 void Bond::setConnectTimeout(double dur)
00102 {
00103 if (started_) {
00104 ROS_ERROR("Cannot set timeouts after calling start()");
00105 return;
00106 }
00107
00108 connect_timeout_ = dur;
00109 connect_timer_.setDuration(ros::WallDuration(connect_timeout_));
00110 }
00111
00112 void Bond::setDisconnectTimeout(double dur)
00113 {
00114 if (started_) {
00115 ROS_ERROR("Cannot set timeouts after calling start()");
00116 return;
00117 }
00118
00119 disconnect_timeout_ = dur;
00120 disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_));
00121 }
00122
00123 void Bond::setHeartbeatTimeout(double dur)
00124 {
00125 if (started_) {
00126 ROS_ERROR("Cannot set timeouts after calling start()");
00127 return;
00128 }
00129
00130 heartbeat_timeout_ = dur;
00131 heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_));
00132 }
00133
00134 void Bond::setHeartbeatPeriod(double dur)
00135 {
00136 if (started_) {
00137 ROS_ERROR("Cannot set timeouts after calling start()");
00138 return;
00139 }
00140
00141 heartbeat_period_ = dur;
00142 }
00143
00144 void Bond::setCallbackQueue(ros::CallbackQueueInterface *queue)
00145 {
00146 if (started_) {
00147 ROS_ERROR("Cannot set callback queue after calling start()");
00148 return;
00149 }
00150
00151 nh_.setCallbackQueue(queue);
00152 }
00153
00154
00155 void Bond::start()
00156 {
00157 boost::mutex::scoped_lock lock(mutex_);
00158 connect_timer_.reset();
00159 pub_ = nh_.advertise<bond::Status>(topic_, 5);
00160 sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
00161
00162 publishingTimer_ = nh_.createWallTimer(ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this);
00163 started_ = true;
00164 }
00165
00166 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
00167 {
00168 boost::mutex::scoped_lock lock(mutex_);
00169 on_formed_ = on_formed;
00170 }
00171
00172 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
00173 {
00174 boost::mutex::scoped_lock lock(mutex_);
00175 on_broken_ = on_broken;
00176 }
00177
00178 bool Bond::waitUntilFormed(ros::Duration timeout)
00179 {
00180 return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec));
00181 }
00182 bool Bond::waitUntilFormed(ros::WallDuration timeout)
00183 {
00184 boost::mutex::scoped_lock lock(mutex_);
00185 ros::WallTime deadline(ros::WallTime::now() + timeout);
00186
00187 while (sm_.getState().getId() == SM::WaitingForSister.getId())
00188 {
00189 if (!ros::ok())
00190 break;
00191
00192 ros::WallDuration wait_time = ros::WallDuration(0.1);
00193 if (timeout >= ros::WallDuration(0.0))
00194 wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00195
00196 if (wait_time <= ros::WallDuration(0.0))
00197 break;
00198
00199 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00200 }
00201 return sm_.getState().getId() != SM::WaitingForSister.getId();
00202 }
00203
00204 bool Bond::waitUntilBroken(ros::Duration timeout)
00205 {
00206 return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec));
00207 }
00208 bool Bond::waitUntilBroken(ros::WallDuration timeout)
00209 {
00210 boost::mutex::scoped_lock lock(mutex_);
00211 ros::WallTime deadline(ros::WallTime::now() + timeout);
00212
00213 while (sm_.getState().getId() != SM::Dead.getId())
00214 {
00215 if (!ros::ok())
00216 break;
00217
00218 ros::WallDuration wait_time = ros::WallDuration(0.1);
00219 if (timeout >= ros::WallDuration(0.0))
00220 wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00221
00222 if (wait_time <= ros::WallDuration(0.0))
00223 break;
00224
00225 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00226 }
00227 return sm_.getState().getId() == SM::Dead.getId();
00228 }
00229
00230 bool Bond::isBroken()
00231 {
00232 boost::mutex::scoped_lock lock(mutex_);
00233 return sm_.getState().getId() == SM::Dead.getId();
00234 }
00235
00236 void Bond::breakBond()
00237 {
00238 {
00239 boost::mutex::scoped_lock lock(mutex_);
00240 if (sm_.getState().getId() != SM::Dead.getId())
00241 {
00242 sm_.Die();
00243 publishStatus(false);
00244 }
00245 }
00246 flushPendingCallbacks();
00247 }
00248
00249
00250 void Bond::onConnectTimeout()
00251 {
00252 {
00253 boost::mutex::scoped_lock lock(mutex_);
00254 sm_.ConnectTimeout();
00255 }
00256 flushPendingCallbacks();
00257 }
00258 void Bond::onHeartbeatTimeout()
00259 {
00260
00261 bool disable_heartbeat_timeout;
00262 nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
00263 if (disable_heartbeat_timeout) {
00264 ROS_WARN("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)",
00265 topic_.c_str(), id_.c_str());
00266 return;
00267 }
00268
00269 {
00270 boost::mutex::scoped_lock lock(mutex_);
00271 sm_.HeartbeatTimeout();
00272 }
00273 flushPendingCallbacks();
00274 }
00275 void Bond::onDisconnectTimeout()
00276 {
00277 {
00278 boost::mutex::scoped_lock lock(mutex_);
00279 sm_.DisconnectTimeout();
00280 }
00281 flushPendingCallbacks();
00282 }
00283
00284 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
00285 {
00286
00287 if (msg->id == id_ && msg->instance_id != instance_id_)
00288 {
00289 {
00290 boost::mutex::scoped_lock lock(mutex_);
00291
00292 if (sister_instance_id_.empty())
00293 sister_instance_id_ = msg->instance_id;
00294 if (sister_instance_id_ != msg->instance_id) {
00295 ROS_ERROR("More than two locations are trying to use a single bond (topic: %s, id: %s). "
00296 "You should only instantiate at most two bond instances for each (topic, id) pair.",
00297 topic_.c_str(), id_.c_str());
00298 return;
00299 }
00300
00301 if (msg->active)
00302 {
00303 sm_.SisterAlive();
00304 }
00305 else
00306 {
00307 sm_.SisterDead();
00308
00309
00310 if (sisterDiedFirst_)
00311 publishStatus(false);
00312 }
00313 }
00314 flushPendingCallbacks();
00315 }
00316 }
00317
00318 void Bond::doPublishing(const ros::WallTimerEvent &e)
00319 {
00320 boost::mutex::scoped_lock lock(mutex_);
00321 if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
00322 sm_.getState().getId() == SM::Alive.getId())
00323 {
00324 publishStatus(true);
00325 }
00326 else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId())
00327 {
00328 publishStatus(false);
00329 }
00330 else
00331 {
00332 publishingTimer_.stop();
00333 }
00334
00335 }
00336
00337 void Bond::publishStatus(bool active)
00338 {
00339 bond::Status::Ptr msg(new bond::Status);
00340 msg->header.stamp = ros::Time::now();
00341 msg->id = id_;
00342 msg->instance_id = instance_id_;
00343 msg->active = active;
00344 msg->heartbeat_timeout = heartbeat_timeout_;
00345 msg->heartbeat_period = heartbeat_period_;
00346 pub_.publish(msg);
00347 }
00348
00349
00350 void Bond::flushPendingCallbacks()
00351 {
00352 std::vector<boost::function<void(void)> > callbacks;
00353 {
00354 boost::mutex::scoped_lock lock(mutex_);
00355 callbacks = pending_callbacks_;
00356 pending_callbacks_.clear();
00357 }
00358
00359 for (size_t i = 0; i < callbacks.size(); ++i)
00360 callbacks[i]();
00361 }
00362
00363 }
00364
00365
00366 void BondSM::Connected()
00367 {
00368 b->connect_timer_.cancel();
00369 b->condition_.notify_all();
00370 if (b->on_formed_)
00371 b->pending_callbacks_.push_back(b->on_formed_);
00372 }
00373
00374 void BondSM::SisterDied()
00375 {
00376 b->sisterDiedFirst_ = true;
00377 }
00378
00379 void BondSM::Death()
00380 {
00381 b->condition_.notify_all();
00382 b->heartbeat_timer_.cancel();
00383 b->disconnect_timer_.cancel();
00384 if (b->on_broken_)
00385 b->pending_callbacks_.push_back(b->on_broken_);
00386 }
00387
00388 void BondSM::Heartbeat()
00389 {
00390 b->heartbeat_timer_.reset();
00391 }
00392
00393 void BondSM::StartDying()
00394 {
00395 b->heartbeat_timer_.cancel();
00396 b->disconnect_timer_.reset();
00397 b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
00398 }