$search
00001 /* 00002 * Copyright (c) 2009, Willow Garage, Inc. 00003 * All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright 00009 * notice, this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of the Willow Garage, Inc. nor the names of its 00014 * contributors may be used to endorse or promote products derived from 00015 * this software without specific prior written permission. 00016 * 00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 * POSSIBILITY OF SUCH DAMAGE. 00028 */ 00029 00030 // Author: Stuart Glaser 00031 00032 #include <bondcpp/bond.h> 00033 #include <boost/thread/thread_time.hpp> 00034 #include <boost/date_time/posix_time/posix_time_types.hpp> 00035 #include <uuid/uuid.h> 00036 00037 namespace bond { 00038 00039 static std::string makeUUID() 00040 { 00041 uuid_t uuid; 00042 uuid_generate_random(uuid); 00043 char uuid_str[40]; 00044 uuid_unparse(uuid, uuid_str); 00045 return std::string(uuid_str); 00046 } 00047 00048 Bond::Bond(const std::string &topic, const std::string &id, 00049 boost::function<void(void)> on_broken, 00050 boost::function<void(void)> on_formed) 00051 : 00052 00053 bondsm_(new BondSM(this)), 00054 sm_(*bondsm_), 00055 topic_(topic), 00056 id_(id), 00057 instance_id_(makeUUID()), 00058 on_broken_(on_broken), 00059 on_formed_(on_formed), 00060 sisterDiedFirst_(false), 00061 started_(false), 00062 00063 connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)), 00064 heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)), 00065 disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this)) 00066 { 00067 setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT); 00068 setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT); 00069 setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT); 00070 setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD); 00071 } 00072 00073 Bond::~Bond() 00074 { 00075 if (!started_) 00076 return; 00077 00078 breakBond(); 00079 if (!waitUntilBroken(ros::Duration(1.0))) 00080 { 00081 ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str()); 00082 } 00083 00084 // Must destroy the subscription before locking mutex_: shutdown() 00085 // will block until the status callback completes, and the status 00086 // callback locks mutex_ (in flushPendingCallbacks). 00087 sub_.shutdown(); 00088 00089 // Stops the timers before locking the mutex. Makes sure none of 00090 // the callbacks are running when we aquire the mutex. 00091 publishingTimer_.stop(); 00092 connect_timer_.cancel(); 00093 heartbeat_timer_.cancel(); 00094 disconnect_timer_.cancel(); 00095 00096 boost::mutex::scoped_lock lock(mutex_); 00097 pub_.shutdown(); 00098 } 00099 00100 00101 void Bond::setConnectTimeout(double dur) 00102 { 00103 if (started_) { 00104 ROS_ERROR("Cannot set timeouts after calling start()"); 00105 return; 00106 } 00107 00108 connect_timeout_ = dur; 00109 connect_timer_.setDuration(ros::WallDuration(connect_timeout_)); 00110 } 00111 00112 void Bond::setDisconnectTimeout(double dur) 00113 { 00114 if (started_) { 00115 ROS_ERROR("Cannot set timeouts after calling start()"); 00116 return; 00117 } 00118 00119 disconnect_timeout_ = dur; 00120 disconnect_timer_.setDuration(ros::WallDuration(disconnect_timeout_)); 00121 } 00122 00123 void Bond::setHeartbeatTimeout(double dur) 00124 { 00125 if (started_) { 00126 ROS_ERROR("Cannot set timeouts after calling start()"); 00127 return; 00128 } 00129 00130 heartbeat_timeout_ = dur; 00131 heartbeat_timer_.setDuration(ros::WallDuration(heartbeat_timeout_)); 00132 } 00133 00134 void Bond::setHeartbeatPeriod(double dur) 00135 { 00136 if (started_) { 00137 ROS_ERROR("Cannot set timeouts after calling start()"); 00138 return; 00139 } 00140 00141 heartbeat_period_ = dur; 00142 } 00143 00144 void Bond::setCallbackQueue(ros::CallbackQueueInterface *queue) 00145 { 00146 if (started_) { 00147 ROS_ERROR("Cannot set callback queue after calling start()"); 00148 return; 00149 } 00150 00151 nh_.setCallbackQueue(queue); 00152 } 00153 00154 00155 void Bond::start() 00156 { 00157 boost::mutex::scoped_lock lock(mutex_); 00158 connect_timer_.reset(); 00159 pub_ = nh_.advertise<bond::Status>(topic_, 5); 00160 sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1)); 00161 00162 publishingTimer_ = nh_.createWallTimer(ros::WallDuration(heartbeat_period_), &Bond::doPublishing, this); 00163 started_ = true; 00164 } 00165 00166 void Bond::setFormedCallback(boost::function<void(void)> on_formed) 00167 { 00168 boost::mutex::scoped_lock lock(mutex_); 00169 on_formed_ = on_formed; 00170 } 00171 00172 void Bond::setBrokenCallback(boost::function<void(void)> on_broken) 00173 { 00174 boost::mutex::scoped_lock lock(mutex_); 00175 on_broken_ = on_broken; 00176 } 00177 00178 bool Bond::waitUntilFormed(ros::Duration timeout) 00179 { 00180 return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec)); 00181 } 00182 bool Bond::waitUntilFormed(ros::WallDuration timeout) 00183 { 00184 boost::mutex::scoped_lock lock(mutex_); 00185 ros::WallTime deadline(ros::WallTime::now() + timeout); 00186 00187 while (sm_.getState().getId() == SM::WaitingForSister.getId()) 00188 { 00189 if (!ros::ok()) 00190 break; 00191 00192 ros::WallDuration wait_time = ros::WallDuration(0.1); 00193 if (timeout >= ros::WallDuration(0.0)) 00194 wait_time = std::min(wait_time, deadline - ros::WallTime::now()); 00195 00196 if (wait_time <= ros::WallDuration(0.0)) 00197 break; // The deadline has expired 00198 00199 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f)); 00200 } 00201 return sm_.getState().getId() != SM::WaitingForSister.getId(); 00202 } 00203 00204 bool Bond::waitUntilBroken(ros::Duration timeout) 00205 { 00206 return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec)); 00207 } 00208 bool Bond::waitUntilBroken(ros::WallDuration timeout) 00209 { 00210 boost::mutex::scoped_lock lock(mutex_); 00211 ros::WallTime deadline(ros::WallTime::now() + timeout); 00212 00213 while (sm_.getState().getId() != SM::Dead.getId()) 00214 { 00215 if (!ros::ok()) 00216 break; 00217 00218 ros::WallDuration wait_time = ros::WallDuration(0.1); 00219 if (timeout >= ros::WallDuration(0.0)) 00220 wait_time = std::min(wait_time, deadline - ros::WallTime::now()); 00221 00222 if (wait_time <= ros::WallDuration(0.0)) 00223 break; // The deadline has expired 00224 00225 condition_.timed_wait(mutex_, boost::posix_time::milliseconds(wait_time.toSec() * 1000.0f)); 00226 } 00227 return sm_.getState().getId() == SM::Dead.getId(); 00228 } 00229 00230 bool Bond::isBroken() 00231 { 00232 boost::mutex::scoped_lock lock(mutex_); 00233 return sm_.getState().getId() == SM::Dead.getId(); 00234 } 00235 00236 void Bond::breakBond() 00237 { 00238 { 00239 boost::mutex::scoped_lock lock(mutex_); 00240 if (sm_.getState().getId() != SM::Dead.getId()) 00241 { 00242 sm_.Die(); 00243 publishStatus(false); 00244 } 00245 } 00246 flushPendingCallbacks(); 00247 } 00248 00249 00250 void Bond::onConnectTimeout() 00251 { 00252 { 00253 boost::mutex::scoped_lock lock(mutex_); 00254 sm_.ConnectTimeout(); 00255 } 00256 flushPendingCallbacks(); 00257 } 00258 void Bond::onHeartbeatTimeout() 00259 { 00260 // Checks that heartbeat timeouts haven't been disabled globally. 00261 bool disable_heartbeat_timeout; 00262 nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false); 00263 if (disable_heartbeat_timeout) { 00264 ROS_WARN("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)", 00265 topic_.c_str(), id_.c_str()); 00266 return; 00267 } 00268 00269 { 00270 boost::mutex::scoped_lock lock(mutex_); 00271 sm_.HeartbeatTimeout(); 00272 } 00273 flushPendingCallbacks(); 00274 } 00275 void Bond::onDisconnectTimeout() 00276 { 00277 { 00278 boost::mutex::scoped_lock lock(mutex_); 00279 sm_.DisconnectTimeout(); 00280 } 00281 flushPendingCallbacks(); 00282 } 00283 00284 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg) 00285 { 00286 // Filters out messages from other bonds and messages from ourself 00287 if (msg->id == id_ && msg->instance_id != instance_id_) 00288 { 00289 { 00290 boost::mutex::scoped_lock lock(mutex_); 00291 00292 if (sister_instance_id_.empty()) 00293 sister_instance_id_ = msg->instance_id; 00294 if (sister_instance_id_ != msg->instance_id) { 00295 ROS_ERROR("More than two locations are trying to use a single bond (topic: %s, id: %s). " 00296 "You should only instantiate at most two bond instances for each (topic, id) pair.", 00297 topic_.c_str(), id_.c_str()); 00298 return; 00299 } 00300 00301 if (msg->active) 00302 { 00303 sm_.SisterAlive(); 00304 } 00305 else 00306 { 00307 sm_.SisterDead(); 00308 00309 // Immediate ack for sister's death notification 00310 if (sisterDiedFirst_) 00311 publishStatus(false); 00312 } 00313 } 00314 flushPendingCallbacks(); 00315 } 00316 } 00317 00318 void Bond::doPublishing(const ros::WallTimerEvent &e) 00319 { 00320 boost::mutex::scoped_lock lock(mutex_); 00321 if (sm_.getState().getId() == SM::WaitingForSister.getId() || 00322 sm_.getState().getId() == SM::Alive.getId()) 00323 { 00324 publishStatus(true); 00325 } 00326 else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId()) 00327 { 00328 publishStatus(false); 00329 } 00330 else 00331 { 00332 publishingTimer_.stop(); 00333 } 00334 00335 } 00336 00337 void Bond::publishStatus(bool active) 00338 { 00339 bond::Status::Ptr msg(new bond::Status); 00340 msg->header.stamp = ros::Time::now(); 00341 msg->id = id_; 00342 msg->instance_id = instance_id_; 00343 msg->active = active; 00344 msg->heartbeat_timeout = heartbeat_timeout_; 00345 msg->heartbeat_period = heartbeat_period_; 00346 pub_.publish(msg); 00347 } 00348 00349 00350 void Bond::flushPendingCallbacks() 00351 { 00352 std::vector<boost::function<void(void)> > callbacks; 00353 { 00354 boost::mutex::scoped_lock lock(mutex_); 00355 callbacks = pending_callbacks_; 00356 pending_callbacks_.clear(); 00357 } 00358 00359 for (size_t i = 0; i < callbacks.size(); ++i) 00360 callbacks[i](); 00361 } 00362 00363 } // namespace 00364 00365 00366 void BondSM::Connected() 00367 { 00368 b->connect_timer_.cancel(); 00369 b->condition_.notify_all(); 00370 if (b->on_formed_) 00371 b->pending_callbacks_.push_back(b->on_formed_); 00372 } 00373 00374 void BondSM::SisterDied() 00375 { 00376 b->sisterDiedFirst_ = true; 00377 } 00378 00379 void BondSM::Death() 00380 { 00381 b->condition_.notify_all(); 00382 b->heartbeat_timer_.cancel(); 00383 b->disconnect_timer_.cancel(); 00384 if (b->on_broken_) 00385 b->pending_callbacks_.push_back(b->on_broken_); 00386 } 00387 00388 void BondSM::Heartbeat() 00389 { 00390 b->heartbeat_timer_.reset(); 00391 } 00392 00393 void BondSM::StartDying() 00394 { 00395 b->heartbeat_timer_.cancel(); 00396 b->disconnect_timer_.reset(); 00397 b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD)); 00398 }