00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <bondcpp/bond.h>
00033 #include <boost/thread/thread_time.hpp>
00034 #include <boost/date_time/posix_time/posix_time_types.hpp>
00035
00036 #ifdef _WIN32
00037 #include <Rpc.h>
00038 #else
00039 #include <uuid/uuid.h>
00040 #endif
00041
00042 #include <algorithm>
00043 #include <string>
00044 #include <vector>
00045
00046 namespace bond {
00047
00048 static std::string makeUUID()
00049 {
00050 #ifdef _WIN32
00051 UUID uuid;
00052 UuidCreate(&uuid);
00053 unsigned char *str;
00054 UuidToStringA(&uuid, &str);
00055 std::string return_string(reinterpret_cast<char *>(str));
00056 RpcStringFreeA(&str);
00057 return return_string;
00058 #else
00059 uuid_t uuid;
00060 uuid_generate_random(uuid);
00061 char uuid_str[40];
00062 uuid_unparse(uuid, uuid_str);
00063 return std::string(uuid_str);
00064 #endif
00065 }
00066
00067 Bond::Bond(const std::string &topic, const std::string &id,
00068 boost::function<void(void)> on_broken,
00069 boost::function<void(void)> on_formed)
00070 :
00071
00072 bondsm_(new BondSM(this)),
00073 sm_(*bondsm_),
00074 topic_(topic),
00075 id_(id),
00076 instance_id_(makeUUID()),
00077 on_broken_(on_broken),
00078 on_formed_(on_formed),
00079 sisterDiedFirst_(false),
00080 started_(false),
00081
00082 connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
00083 heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
00084 disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
00085 {
00086 setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
00087 setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
00088 setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
00089 setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
00090 }
00091
00092 Bond::~Bond()
00093 {
00094 if (!started_) {
00095 return;
00096 }
00097
00098 breakBond();
00099 if (!waitUntilBroken(ros::Duration(1.0))) {
00100 ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
00101 }
00102
00103
00104
00105
00106 sub_.shutdown();
00107
00108
00109
00110 publishingTimer_.stop();
00111 connect_timer_.cancel();
00112 heartbeat_timer_.cancel();
00113 disconnect_timer_.cancel();
00114
00115 boost::mutex::scoped_lock lock(mutex_);
00116 pub_.shutdown();
00117 }
00118
00119
00120 void Bond::setConnectTimeout(double dur)
00121 {
00122 if (started_) {
00123 ROS_ERROR("Cannot set timeouts after calling start()");
00124 return;
00125 }
00126
00127 connect_timeout_ = dur;
00128 connect_timer_.setDuration(ros::WallDuration(connect_timeout_));
00129 }
00130
00131 void Bond::setDisconnectTimeout(double dur)
00132 {
00133 if (started_) {
00134 ROS_ERROR("Cannot set timeouts after calling start()");
00135 return;
00136 }
00137
00138 disconnect_timeout_ = dur;
00139 disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_));
00140 }
00141
00142 void Bond::setHeartbeatTimeout(double dur)
00143 {
00144 if (started_) {
00145 ROS_ERROR("Cannot set timeouts after calling start()");
00146 return;
00147 }
00148
00149 heartbeat_timeout_ = dur;
00150 heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_));
00151 }
00152
00153 void Bond::setHeartbeatPeriod(double dur)
00154 {
00155 if (started_) {
00156 ROS_ERROR("Cannot set timeouts after calling start()");
00157 return;
00158 }
00159
00160 heartbeat_period_ = dur;
00161 }
00162
00163 void Bond::setCallbackQueue(ros::CallbackQueueInterface *queue)
00164 {
00165 if (started_) {
00166 ROS_ERROR("Cannot set callback queue after calling start()");
00167 return;
00168 }
00169
00170 nh_.setCallbackQueue(queue);
00171 }
00172
00173
00174 void Bond::start()
00175 {
00176 boost::mutex::scoped_lock lock(mutex_);
00177 connect_timer_.reset();
00178 pub_ = nh_.advertise<bond::Status>(topic_, 5);
00179 sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
00180
00181 publishingTimer_ = nh_.createWallTimer(
00182 ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this);
00183 started_ = true;
00184 }
00185
00186 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
00187 {
00188 boost::mutex::scoped_lock lock(mutex_);
00189 on_formed_ = on_formed;
00190 }
00191
00192 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
00193 {
00194 boost::mutex::scoped_lock lock(mutex_);
00195 on_broken_ = on_broken;
00196 }
00197
00198 bool Bond::waitUntilFormed(ros::Duration timeout)
00199 {
00200 return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec));
00201 }
00202 bool Bond::waitUntilFormed(ros::WallDuration timeout)
00203 {
00204 boost::mutex::scoped_lock lock(mutex_);
00205 ros::WallTime deadline(ros::WallTime::now() + timeout);
00206
00207 while (sm_.getState().getId() == SM::WaitingForSister.getId()) {
00208 if (!ros::ok()) {
00209 break;
00210 }
00211
00212 ros::WallDuration wait_time = ros::WallDuration(0.1);
00213 if (timeout >= ros::WallDuration(0.0)) {
00214 wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00215 }
00216
00217 if (wait_time <= ros::WallDuration(0.0)) {
00218 break;
00219 }
00220
00221 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00222 }
00223 return sm_.getState().getId() != SM::WaitingForSister.getId();
00224 }
00225
00226 bool Bond::waitUntilBroken(ros::Duration timeout)
00227 {
00228 return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec));
00229 }
00230 bool Bond::waitUntilBroken(ros::WallDuration timeout)
00231 {
00232 boost::mutex::scoped_lock lock(mutex_);
00233 ros::WallTime deadline(ros::WallTime::now() + timeout);
00234
00235 while (sm_.getState().getId() != SM::Dead.getId()) {
00236 if (!ros::ok()) {
00237 break;
00238 }
00239
00240 ros::WallDuration wait_time = ros::WallDuration(0.1);
00241 if (timeout >= ros::WallDuration(0.0)) {
00242 wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00243 }
00244
00245 if (wait_time <= ros::WallDuration(0.0)) {
00246 break;
00247 }
00248
00249 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00250 }
00251 return sm_.getState().getId() == SM::Dead.getId();
00252 }
00253
00254 bool Bond::isBroken()
00255 {
00256 boost::mutex::scoped_lock lock(mutex_);
00257 return sm_.getState().getId() == SM::Dead.getId();
00258 }
00259
00260 void Bond::breakBond()
00261 {
00262 {
00263 boost::mutex::scoped_lock lock(mutex_);
00264 if (sm_.getState().getId() != SM::Dead.getId()) {
00265 sm_.Die();
00266 publishStatus(false);
00267 }
00268 }
00269 flushPendingCallbacks();
00270 }
00271
00272
00273 void Bond::onConnectTimeout()
00274 {
00275 {
00276 boost::mutex::scoped_lock lock(mutex_);
00277 sm_.ConnectTimeout();
00278 }
00279 flushPendingCallbacks();
00280 }
00281 void Bond::onHeartbeatTimeout()
00282 {
00283
00284 bool disable_heartbeat_timeout;
00285 nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
00286 if (disable_heartbeat_timeout) {
00287 ROS_WARN("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)",
00288 topic_.c_str(), id_.c_str());
00289 return;
00290 }
00291
00292 {
00293 boost::mutex::scoped_lock lock(mutex_);
00294 sm_.HeartbeatTimeout();
00295 }
00296 flushPendingCallbacks();
00297 }
00298 void Bond::onDisconnectTimeout()
00299 {
00300 {
00301 boost::mutex::scoped_lock lock(mutex_);
00302 sm_.DisconnectTimeout();
00303 }
00304 flushPendingCallbacks();
00305 }
00306
00307 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
00308 {
00309
00310 if (msg->id == id_ && msg->instance_id != instance_id_) {
00311 {
00312 boost::mutex::scoped_lock lock(mutex_);
00313
00314 if (sister_instance_id_.empty()) {
00315 sister_instance_id_ = msg->instance_id;
00316 }
00317 if (sister_instance_id_ != msg->instance_id) {
00318 ROS_ERROR(
00319 "More than two locations are trying to use a single bond (topic: %s, id: %s). "
00320 "You should only instantiate at most two bond instances for each (topic, id) pair.",
00321 topic_.c_str(), id_.c_str());
00322 return;
00323 }
00324
00325 if (msg->active) {
00326 sm_.SisterAlive();
00327 } else {
00328 sm_.SisterDead();
00329
00330
00331 if (sisterDiedFirst_) {
00332 publishStatus(false);
00333 }
00334 }
00335 }
00336 flushPendingCallbacks();
00337 }
00338 }
00339
00340 void Bond::doPublishing(const ros::WallTimerEvent &)
00341 {
00342 boost::mutex::scoped_lock lock(mutex_);
00343 if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
00344 sm_.getState().getId() == SM::Alive.getId()) {
00345 publishStatus(true);
00346 } else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId()) {
00347 publishStatus(false);
00348 } else {
00349 publishingTimer_.stop();
00350 }
00351 }
00352
00353 void Bond::publishStatus(bool active)
00354 {
00355 bond::Status::Ptr msg(new bond::Status);
00356 msg->header.stamp = ros::Time::now();
00357 msg->id = id_;
00358 msg->instance_id = instance_id_;
00359 msg->active = active;
00360 msg->heartbeat_timeout = heartbeat_timeout_;
00361 msg->heartbeat_period = heartbeat_period_;
00362 pub_.publish(msg);
00363 }
00364
00365
00366 void Bond::flushPendingCallbacks()
00367 {
00368 std::vector<boost::function<void(void)> > callbacks;
00369 {
00370 boost::mutex::scoped_lock lock(mutex_);
00371 callbacks = pending_callbacks_;
00372 pending_callbacks_.clear();
00373 }
00374
00375 for (size_t i = 0; i < callbacks.size(); ++i) {
00376 callbacks[i]();
00377 }
00378 }
00379
00380 }
00381
00382
00383 void BondSM::Connected()
00384 {
00385 b->connect_timer_.cancel();
00386 b->condition_.notify_all();
00387 if (b->on_formed_) {
00388 b->pending_callbacks_.push_back(b->on_formed_);
00389 }
00390 }
00391
00392 void BondSM::SisterDied()
00393 {
00394 b->sisterDiedFirst_ = true;
00395 }
00396
00397 void BondSM::Death()
00398 {
00399 b->condition_.notify_all();
00400 b->heartbeat_timer_.cancel();
00401 b->disconnect_timer_.cancel();
00402 if (b->on_broken_) {
00403 b->pending_callbacks_.push_back(b->on_broken_);
00404 }
00405 }
00406
00407 void BondSM::Heartbeat()
00408 {
00409 b->heartbeat_timer_.reset();
00410 }
00411
00412 void BondSM::StartDying()
00413 {
00414 b->heartbeat_timer_.cancel();
00415 b->disconnect_timer_.reset();
00416 b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
00417 }