00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <bondcpp/bond.h>
00033 #include <boost/thread/thread_time.hpp>
00034 #include <boost/date_time/posix_time/posix_time_types.hpp>
00035 #include <uuid/uuid.h>
00036
00037 namespace bond {
00038
00039 static std::string makeUUID()
00040 {
00041 uuid_t uuid;
00042 uuid_generate_random(uuid);
00043 char uuid_str[40];
00044 uuid_unparse(uuid, uuid_str);
00045 return std::string(uuid_str);
00046 }
00047
00048 Bond::Bond(const std::string &topic, const std::string &id,
00049 boost::function<void(void)> on_broken,
00050 boost::function<void(void)> on_formed)
00051 :
00052
00053 bondsm_(new BondSM(this)),
00054 sm_(*bondsm_),
00055 topic_(topic),
00056 id_(id),
00057 instance_id_(makeUUID()),
00058 on_broken_(on_broken),
00059 on_formed_(on_formed),
00060 sisterDiedFirst_(false),
00061 started_(false),
00062
00063 connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
00064 heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
00065 disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
00066 {
00067 setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
00068 setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
00069 setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
00070 setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
00071 pub_ = nh_.advertise<bond::Status>(topic_, 5);
00072 }
00073
00074 Bond::~Bond()
00075 {
00076 breakBond();
00077 if (!waitUntilBroken(ros::Duration(1.0)))
00078 {
00079 ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
00080 }
00081
00082
00083
00084
00085 sub_.shutdown();
00086
00087
00088
00089 publishingTimer_.stop();
00090 connect_timer_.cancel();
00091 heartbeat_timer_.cancel();
00092 disconnect_timer_.cancel();
00093
00094 boost::mutex::scoped_lock lock(mutex_);
00095 pub_.shutdown();
00096 }
00097
00098
00099 void Bond::setConnectTimeout(double dur)
00100 {
00101 if (started_) {
00102 ROS_ERROR("Cannot set timeouts after calling start()");
00103 return;
00104 }
00105
00106 connect_timeout_ = dur;
00107 connect_timer_.setDuration(ros::WallDuration(connect_timeout_));
00108 }
00109
00110 void Bond::setDisconnectTimeout(double dur)
00111 {
00112 if (started_) {
00113 ROS_ERROR("Cannot set timeouts after calling start()");
00114 return;
00115 }
00116
00117 disconnect_timeout_ = dur;
00118 disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_));
00119 }
00120
00121 void Bond::setHeartbeatTimeout(double dur)
00122 {
00123 if (started_) {
00124 ROS_ERROR("Cannot set timeouts after calling start()");
00125 return;
00126 }
00127
00128 heartbeat_timeout_ = dur;
00129 heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_));
00130 }
00131
00132 void Bond::setHeartbeatPeriod(double dur)
00133 {
00134 if (started_) {
00135 ROS_ERROR("Cannot set timeouts after calling start()");
00136 return;
00137 }
00138
00139 heartbeat_period_ = dur;
00140 }
00141
00142
00143 void Bond::start()
00144 {
00145 boost::mutex::scoped_lock lock(mutex_);
00146 connect_timer_.reset();
00147 sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
00148
00149 publishingTimer_ = nh_.createWallTimer(ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this);
00150 started_ = true;
00151 }
00152
00153 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
00154 {
00155 boost::mutex::scoped_lock lock(mutex_);
00156 on_formed_ = on_formed;
00157 }
00158
00159 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
00160 {
00161 boost::mutex::scoped_lock lock(mutex_);
00162 on_broken_ = on_broken;
00163 }
00164
00165 bool Bond::waitUntilFormed(ros::Duration timeout)
00166 {
00167 boost::mutex::scoped_lock lock(mutex_);
00168 ros::Time deadline(ros::Time::now() + timeout);
00169
00170 while (sm_.getState().getId() == SM::WaitingForSister.getId())
00171 {
00172 if (!ros::ok())
00173 break;
00174
00175 ros::Duration wait_time = ros::Duration(0.1);
00176 if (timeout >= ros::Duration(0.0))
00177 wait_time = std::min(wait_time, deadline - ros::Time::now());
00178
00179 if (wait_time <= ros::Duration(0.0))
00180 break;
00181
00182 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00183 }
00184 return sm_.getState().getId() != SM::WaitingForSister.getId();
00185 }
00186
00187 bool Bond::waitUntilBroken(ros::Duration timeout)
00188 {
00189 boost::mutex::scoped_lock lock(mutex_);
00190 ros::Time deadline(ros::Time::now() + timeout);
00191
00192 while (sm_.getState().getId() != SM::Dead.getId())
00193 {
00194 if (!ros::ok())
00195 break;
00196
00197 ros::Duration wait_time = ros::Duration(0.1);
00198 if (timeout >= ros::Duration(0.0))
00199 wait_time = std::min(wait_time, deadline - ros::Time::now());
00200
00201 if (wait_time <= ros::Duration(0.0))
00202 break;
00203
00204 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00205 }
00206 return sm_.getState().getId() == SM::Dead.getId();
00207 }
00208
00209 bool Bond::isBroken()
00210 {
00211 boost::mutex::scoped_lock lock(mutex_);
00212 return sm_.getState().getId() == SM::Dead.getId();
00213 }
00214
00215 void Bond::breakBond()
00216 {
00217 {
00218 boost::mutex::scoped_lock lock(mutex_);
00219 if (sm_.getState().getId() != SM::Dead.getId())
00220 {
00221 sm_.Die();
00222 publishStatus(false);
00223 }
00224 }
00225 flushPendingCallbacks();
00226 }
00227
00228
00229 void Bond::onConnectTimeout()
00230 {
00231 {
00232 boost::mutex::scoped_lock lock(mutex_);
00233 sm_.ConnectTimeout();
00234 }
00235 flushPendingCallbacks();
00236 }
00237 void Bond::onHeartbeatTimeout()
00238 {
00239
00240 bool disable_heartbeat_timeout;
00241 nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
00242 if (disable_heartbeat_timeout) {
00243 ROS_WARN("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)",
00244 topic_.c_str(), id_.c_str());
00245 return;
00246 }
00247
00248 {
00249 boost::mutex::scoped_lock lock(mutex_);
00250 sm_.HeartbeatTimeout();
00251 }
00252 flushPendingCallbacks();
00253 }
00254 void Bond::onDisconnectTimeout()
00255 {
00256 {
00257 boost::mutex::scoped_lock lock(mutex_);
00258 sm_.DisconnectTimeout();
00259 }
00260 flushPendingCallbacks();
00261 }
00262
00263 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
00264 {
00265
00266 if (msg->id == id_ && msg->instance_id != instance_id_)
00267 {
00268 {
00269 boost::mutex::scoped_lock lock(mutex_);
00270
00271 if (sister_instance_id_.empty())
00272 sister_instance_id_ = msg->instance_id;
00273 if (sister_instance_id_ != msg->instance_id) {
00274 ROS_ERROR("More than two locations are trying to use a single bond (topic: %s, id: %s). "
00275 "You should only instantiate at most two bond instances for each (topic, id) pair.",
00276 topic_.c_str(), id_.c_str());
00277 return;
00278 }
00279
00280 if (msg->active)
00281 {
00282 sm_.SisterAlive();
00283 }
00284 else
00285 {
00286 sm_.SisterDead();
00287
00288
00289 if (sisterDiedFirst_)
00290 publishStatus(false);
00291 }
00292 }
00293 flushPendingCallbacks();
00294 }
00295 }
00296
00297 void Bond::doPublishing(const ros::WallTimerEvent &e)
00298 {
00299 boost::mutex::scoped_lock lock(mutex_);
00300 if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
00301 sm_.getState().getId() == SM::Alive.getId())
00302 {
00303 publishStatus(true);
00304 }
00305 else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId())
00306 {
00307 publishStatus(false);
00308 }
00309 else
00310 {
00311 publishingTimer_.stop();
00312 }
00313
00314 }
00315
00316 void Bond::publishStatus(bool active)
00317 {
00318 bond::Status::Ptr msg(new bond::Status);
00319 msg->header.stamp = ros::Time::now();
00320 msg->id = id_;
00321 msg->instance_id = instance_id_;
00322 msg->active = active;
00323 msg->heartbeat_timeout = heartbeat_timeout_;
00324 msg->heartbeat_period = heartbeat_period_;
00325 pub_.publish(msg);
00326 }
00327
00328
00329 void Bond::flushPendingCallbacks()
00330 {
00331 std::vector<boost::function<void(void)> > callbacks;
00332 {
00333 boost::mutex::scoped_lock lock(mutex_);
00334 callbacks = pending_callbacks_;
00335 pending_callbacks_.clear();
00336 }
00337
00338 for (size_t i = 0; i < callbacks.size(); ++i)
00339 callbacks[i]();
00340 }
00341
00342 }
00343
00344
00345 void BondSM::Connected()
00346 {
00347 b->connect_timer_.cancel();
00348 b->condition_.notify_all();
00349 if (b->on_formed_)
00350 b->pending_callbacks_.push_back(b->on_formed_);
00351 }
00352
00353 void BondSM::SisterDied()
00354 {
00355 b->sisterDiedFirst_ = true;
00356 }
00357
00358 void BondSM::Death()
00359 {
00360 b->condition_.notify_all();
00361 b->heartbeat_timer_.cancel();
00362 b->disconnect_timer_.cancel();
00363 if (b->on_broken_)
00364 b->pending_callbacks_.push_back(b->on_broken_);
00365 }
00366
00367 void BondSM::Heartbeat()
00368 {
00369 b->heartbeat_timer_.reset();
00370 }
00371
00372 void BondSM::StartDying()
00373 {
00374 b->heartbeat_timer_.cancel();
00375 b->disconnect_timer_.reset();
00376 b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
00377 }