00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <bondcpp/bond.h>
00033 #include <boost/thread/thread_time.hpp>
00034 #include <boost/date_time/posix_time/posix_time_types.hpp>
00035
00036 #ifdef _WIN32
00037 #include <Rpc.h>
00038 #else
00039 #include <uuid/uuid.h>
00040 #endif
00041
00042 namespace bond {
00043
00044 static std::string makeUUID()
00045 {
00046 #ifdef _WIN32
00047 UUID uuid;
00048 UuidCreate(&uuid);
00049 unsigned char *str;
00050 UuidToStringA(&uuid, &str);
00051 std::string return_string(reinterpret_cast<char *>(str));
00052 RpcStringFreeA(&str);
00053 return return_string;
00054 #else
00055 uuid_t uuid;
00056 uuid_generate_random(uuid);
00057 char uuid_str[40];
00058 uuid_unparse(uuid, uuid_str);
00059 return std::string(uuid_str);
00060 #endif
00061 }
00062
00063 Bond::Bond(const std::string &topic, const std::string &id,
00064 boost::function<void(void)> on_broken,
00065 boost::function<void(void)> on_formed)
00066 :
00067
00068 bondsm_(new BondSM(this)),
00069 sm_(*bondsm_),
00070 topic_(topic),
00071 id_(id),
00072 instance_id_(makeUUID()),
00073 on_broken_(on_broken),
00074 on_formed_(on_formed),
00075 sisterDiedFirst_(false),
00076 started_(false),
00077
00078 connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
00079 heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
00080 disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
00081 {
00082 setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
00083 setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
00084 setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
00085 setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
00086 }
00087
00088 Bond::~Bond()
00089 {
00090 if (!started_)
00091 return;
00092
00093 breakBond();
00094 if (!waitUntilBroken(ros::Duration(1.0)))
00095 {
00096 ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
00097 }
00098
00099
00100
00101
00102 sub_.shutdown();
00103
00104
00105
00106 publishingTimer_.stop();
00107 connect_timer_.cancel();
00108 heartbeat_timer_.cancel();
00109 disconnect_timer_.cancel();
00110
00111 boost::mutex::scoped_lock lock(mutex_);
00112 pub_.shutdown();
00113 }
00114
00115
00116 void Bond::setConnectTimeout(double dur)
00117 {
00118 if (started_) {
00119 ROS_ERROR("Cannot set timeouts after calling start()");
00120 return;
00121 }
00122
00123 connect_timeout_ = dur;
00124 connect_timer_.setDuration(ros::WallDuration(connect_timeout_));
00125 }
00126
00127 void Bond::setDisconnectTimeout(double dur)
00128 {
00129 if (started_) {
00130 ROS_ERROR("Cannot set timeouts after calling start()");
00131 return;
00132 }
00133
00134 disconnect_timeout_ = dur;
00135 disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_));
00136 }
00137
00138 void Bond::setHeartbeatTimeout(double dur)
00139 {
00140 if (started_) {
00141 ROS_ERROR("Cannot set timeouts after calling start()");
00142 return;
00143 }
00144
00145 heartbeat_timeout_ = dur;
00146 heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_));
00147 }
00148
00149 void Bond::setHeartbeatPeriod(double dur)
00150 {
00151 if (started_) {
00152 ROS_ERROR("Cannot set timeouts after calling start()");
00153 return;
00154 }
00155
00156 heartbeat_period_ = dur;
00157 }
00158
00159 void Bond::setCallbackQueue(ros::CallbackQueueInterface *queue)
00160 {
00161 if (started_) {
00162 ROS_ERROR("Cannot set callback queue after calling start()");
00163 return;
00164 }
00165
00166 nh_.setCallbackQueue(queue);
00167 }
00168
00169
00170 void Bond::start()
00171 {
00172 boost::mutex::scoped_lock lock(mutex_);
00173 connect_timer_.reset();
00174 pub_ = nh_.advertise<bond::Status>(topic_, 5);
00175 sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
00176
00177 publishingTimer_ = nh_.createWallTimer(ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this);
00178 started_ = true;
00179 }
00180
00181 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
00182 {
00183 boost::mutex::scoped_lock lock(mutex_);
00184 on_formed_ = on_formed;
00185 }
00186
00187 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
00188 {
00189 boost::mutex::scoped_lock lock(mutex_);
00190 on_broken_ = on_broken;
00191 }
00192
00193 bool Bond::waitUntilFormed(ros::Duration timeout)
00194 {
00195 return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec));
00196 }
00197 bool Bond::waitUntilFormed(ros::WallDuration timeout)
00198 {
00199 boost::mutex::scoped_lock lock(mutex_);
00200 ros::WallTime deadline(ros::WallTime::now() + timeout);
00201
00202 while (sm_.getState().getId() == SM::WaitingForSister.getId())
00203 {
00204 if (!ros::ok())
00205 break;
00206
00207 ros::WallDuration wait_time = ros::WallDuration(0.1);
00208 if (timeout >= ros::WallDuration(0.0))
00209 wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00210
00211 if (wait_time <= ros::WallDuration(0.0))
00212 break;
00213
00214 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00215 }
00216 return sm_.getState().getId() != SM::WaitingForSister.getId();
00217 }
00218
00219 bool Bond::waitUntilBroken(ros::Duration timeout)
00220 {
00221 return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec));
00222 }
00223 bool Bond::waitUntilBroken(ros::WallDuration timeout)
00224 {
00225 boost::mutex::scoped_lock lock(mutex_);
00226 ros::WallTime deadline(ros::WallTime::now() + timeout);
00227
00228 while (sm_.getState().getId() != SM::Dead.getId())
00229 {
00230 if (!ros::ok())
00231 break;
00232
00233 ros::WallDuration wait_time = ros::WallDuration(0.1);
00234 if (timeout >= ros::WallDuration(0.0))
00235 wait_time = std::min(wait_time, deadline - ros::WallTime::now());
00236
00237 if (wait_time <= ros::WallDuration(0.0))
00238 break;
00239
00240 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f));
00241 }
00242 return sm_.getState().getId() == SM::Dead.getId();
00243 }
00244
00245 bool Bond::isBroken()
00246 {
00247 boost::mutex::scoped_lock lock(mutex_);
00248 return sm_.getState().getId() == SM::Dead.getId();
00249 }
00250
00251 void Bond::breakBond()
00252 {
00253 {
00254 boost::mutex::scoped_lock lock(mutex_);
00255 if (sm_.getState().getId() != SM::Dead.getId())
00256 {
00257 sm_.Die();
00258 publishStatus(false);
00259 }
00260 }
00261 flushPendingCallbacks();
00262 }
00263
00264
00265 void Bond::onConnectTimeout()
00266 {
00267 {
00268 boost::mutex::scoped_lock lock(mutex_);
00269 sm_.ConnectTimeout();
00270 }
00271 flushPendingCallbacks();
00272 }
00273 void Bond::onHeartbeatTimeout()
00274 {
00275
00276 bool disable_heartbeat_timeout;
00277 nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
00278 if (disable_heartbeat_timeout) {
00279 ROS_WARN("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)",
00280 topic_.c_str(), id_.c_str());
00281 return;
00282 }
00283
00284 {
00285 boost::mutex::scoped_lock lock(mutex_);
00286 sm_.HeartbeatTimeout();
00287 }
00288 flushPendingCallbacks();
00289 }
00290 void Bond::onDisconnectTimeout()
00291 {
00292 {
00293 boost::mutex::scoped_lock lock(mutex_);
00294 sm_.DisconnectTimeout();
00295 }
00296 flushPendingCallbacks();
00297 }
00298
00299 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
00300 {
00301
00302 if (msg->id == id_ && msg->instance_id != instance_id_)
00303 {
00304 {
00305 boost::mutex::scoped_lock lock(mutex_);
00306
00307 if (sister_instance_id_.empty())
00308 sister_instance_id_ = msg->instance_id;
00309 if (sister_instance_id_ != msg->instance_id) {
00310 ROS_ERROR("More than two locations are trying to use a single bond (topic: %s, id: %s). "
00311 "You should only instantiate at most two bond instances for each (topic, id) pair.",
00312 topic_.c_str(), id_.c_str());
00313 return;
00314 }
00315
00316 if (msg->active)
00317 {
00318 sm_.SisterAlive();
00319 }
00320 else
00321 {
00322 sm_.SisterDead();
00323
00324
00325 if (sisterDiedFirst_)
00326 publishStatus(false);
00327 }
00328 }
00329 flushPendingCallbacks();
00330 }
00331 }
00332
00333 void Bond::doPublishing(const ros::WallTimerEvent &e)
00334 {
00335 boost::mutex::scoped_lock lock(mutex_);
00336 if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
00337 sm_.getState().getId() == SM::Alive.getId())
00338 {
00339 publishStatus(true);
00340 }
00341 else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId())
00342 {
00343 publishStatus(false);
00344 }
00345 else
00346 {
00347 publishingTimer_.stop();
00348 }
00349
00350 }
00351
00352 void Bond::publishStatus(bool active)
00353 {
00354 bond::Status::Ptr msg(new bond::Status);
00355 msg->header.stamp = ros::Time::now();
00356 msg->id = id_;
00357 msg->instance_id = instance_id_;
00358 msg->active = active;
00359 msg->heartbeat_timeout = heartbeat_timeout_;
00360 msg->heartbeat_period = heartbeat_period_;
00361 pub_.publish(msg);
00362 }
00363
00364
00365 void Bond::flushPendingCallbacks()
00366 {
00367 std::vector<boost::function<void(void)> > callbacks;
00368 {
00369 boost::mutex::scoped_lock lock(mutex_);
00370 callbacks = pending_callbacks_;
00371 pending_callbacks_.clear();
00372 }
00373
00374 for (size_t i = 0; i < callbacks.size(); ++i)
00375 callbacks[i]();
00376 }
00377
00378 }
00379
00380
00381 void BondSM::Connected()
00382 {
00383 b->connect_timer_.cancel();
00384 b->condition_.notify_all();
00385 if (b->on_formed_)
00386 b->pending_callbacks_.push_back(b->on_formed_);
00387 }
00388
00389 void BondSM::SisterDied()
00390 {
00391 b->sisterDiedFirst_ = true;
00392 }
00393
00394 void BondSM::Death()
00395 {
00396 b->condition_.notify_all();
00397 b->heartbeat_timer_.cancel();
00398 b->disconnect_timer_.cancel();
00399 if (b->on_broken_)
00400 b->pending_callbacks_.push_back(b->on_broken_);
00401 }
00402
00403 void BondSM::Heartbeat()
00404 {
00405 b->heartbeat_timer_.reset();
00406 }
00407
00408 void BondSM::StartDying()
00409 {
00410 b->heartbeat_timer_.cancel();
00411 b->disconnect_timer_.reset();
00412 b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
00413 }