bond.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2009, Willow Garage, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * * Neither the name of the Willow Garage, Inc. nor the names of its
14  * contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 // Author: Stuart Glaser
31 
32 #include <bondcpp/bond.h>
33 #include <boost/thread/thread_time.hpp>
34 #include <boost/date_time/posix_time/posix_time_types.hpp>
35 
36 #ifdef _WIN32
37 #include <Rpc.h>
38 #else
39 #include <uuid/uuid.h>
40 #endif
41 
42 #include <algorithm>
43 #include <string>
44 #include <vector>
45 
46 namespace bond {
47 
48 static std::string makeUUID()
49 {
50 #ifdef _WIN32
51  UUID uuid;
52  UuidCreate(&uuid);
53  unsigned char *str;
54  UuidToStringA(&uuid, &str);
55  std::string return_string(reinterpret_cast<char *>(str));
56  RpcStringFreeA(&str);
57  return return_string;
58 #else
59  uuid_t uuid;
60  uuid_generate_random(uuid);
61  char uuid_str[40];
62  uuid_unparse(uuid, uuid_str);
63  return std::string(uuid_str);
64 #endif
65 }
66 
67 Bond::Bond(const std::string &topic, const std::string &id,
68  boost::function<void(void)> on_broken,
69  boost::function<void(void)> on_formed)
70  :
71 
72  bondsm_(new BondSM(this)),
73  sm_(*bondsm_),
74  topic_(topic),
75  id_(id),
76  instance_id_(makeUUID()),
77  on_broken_(on_broken),
78  on_formed_(on_formed),
79  sisterDiedFirst_(false),
80  started_(false),
81 
82  connect_timer_(ros::WallDuration(), boost::bind(&Bond::onConnectTimeout, this)),
83  heartbeat_timer_(ros::WallDuration(), boost::bind(&Bond::onHeartbeatTimeout, this)),
84  disconnect_timer_(ros::WallDuration(), boost::bind(&Bond::onDisconnectTimeout, this))
85 {
86  setConnectTimeout(bond::Constants::DEFAULT_CONNECT_TIMEOUT);
87  setDisconnectTimeout(bond::Constants::DEFAULT_DISCONNECT_TIMEOUT);
88  setHeartbeatTimeout(bond::Constants::DEFAULT_HEARTBEAT_TIMEOUT);
89  setHeartbeatPeriod(bond::Constants::DEFAULT_HEARTBEAT_PERIOD);
90 }
91 
93 {
94  if (!started_) {
95  return;
96  }
97 
98  breakBond();
99  if (!waitUntilBroken(ros::Duration(1.0))) {
100  ROS_DEBUG("Bond failed to break on destruction %s (%s)", id_.c_str(), instance_id_.c_str());
101  }
102 
103  // Must destroy the subscription before locking mutex_: shutdown()
104  // will block until the status callback completes, and the status
105  // callback locks mutex_ (in flushPendingCallbacks).
106  sub_.shutdown();
107 
108  // Stops the timers before locking the mutex. Makes sure none of
109  // the callbacks are running when we aquire the mutex.
114 
115  boost::mutex::scoped_lock lock(mutex_);
116  pub_.shutdown();
117 }
118 
119 
120 void Bond::setConnectTimeout(double dur)
121 {
122  if (started_) {
123  ROS_ERROR("Cannot set timeouts after calling start()");
124  return;
125  }
126 
127  connect_timeout_ = dur;
129 }
130 
132 {
133  if (started_) {
134  ROS_ERROR("Cannot set timeouts after calling start()");
135  return;
136  }
137 
138  disconnect_timeout_ = dur;
140 }
141 
143 {
144  if (started_) {
145  ROS_ERROR("Cannot set timeouts after calling start()");
146  return;
147  }
148 
149  heartbeat_timeout_ = dur;
151 }
152 
153 void Bond::setHeartbeatPeriod(double dur)
154 {
155  if (started_) {
156  ROS_ERROR("Cannot set timeouts after calling start()");
157  return;
158  }
159 
160  heartbeat_period_ = dur;
161 }
162 
164 {
165  if (started_) {
166  ROS_ERROR("Cannot set callback queue after calling start()");
167  return;
168  }
169 
170  nh_.setCallbackQueue(queue);
171 }
172 
173 
175 {
176  boost::mutex::scoped_lock lock(mutex_);
178  pub_ = nh_.advertise<bond::Status>(topic_, 5);
179  sub_ = nh_.subscribe<bond::Status>(topic_, 30, boost::bind(&Bond::bondStatusCB, this, _1));
180 
183  started_ = true;
184 }
185 
186 void Bond::setFormedCallback(boost::function<void(void)> on_formed)
187 {
188  boost::mutex::scoped_lock lock(mutex_);
189  on_formed_ = on_formed;
190 }
191 
192 void Bond::setBrokenCallback(boost::function<void(void)> on_broken)
193 {
194  boost::mutex::scoped_lock lock(mutex_);
195  on_broken_ = on_broken;
196 }
197 
199 {
200  return waitUntilFormed(ros::WallDuration(timeout.sec, timeout.nsec));
201 }
203 {
204  boost::mutex::scoped_lock lock(mutex_);
205  ros::SteadyTime deadline(ros::SteadyTime::now() + timeout);
206 
207  while (sm_.getState().getId() == SM::WaitingForSister.getId()) {
208  if (!ros::ok()) {
209  break;
210  }
211 
212  ros::WallDuration wait_time = ros::WallDuration(0.1);
213  if (timeout >= ros::WallDuration(0.0)) {
214  wait_time = std::min(wait_time, deadline - ros::SteadyTime::now());
215  }
216 
217  if (wait_time <= ros::WallDuration(0.0)) {
218  break; // The deadline has expired
219  }
220 
221  condition_.timed_wait(mutex_, boost::posix_time::milliseconds(
222  static_cast<int64_t>(wait_time.toSec() * 1000.0f)));
223  }
224  return sm_.getState().getId() != SM::WaitingForSister.getId();
225 }
226 
228 {
229  return waitUntilBroken(ros::WallDuration(timeout.sec, timeout.nsec));
230 }
232 {
233  boost::mutex::scoped_lock lock(mutex_);
234  ros::SteadyTime deadline(ros::SteadyTime::now() + timeout);
235 
236  while (sm_.getState().getId() != SM::Dead.getId()) {
237  if (!ros::ok()) {
238  break;
239  }
240 
241  ros::WallDuration wait_time = ros::WallDuration(0.1);
242  if (timeout >= ros::WallDuration(0.0)) {
243  wait_time = std::min(wait_time, deadline - ros::SteadyTime::now());
244  }
245 
246  if (wait_time <= ros::WallDuration(0.0)) {
247  break; // The deadline has expired
248  }
249 
250  condition_.timed_wait(mutex_, boost::posix_time::milliseconds(
251  static_cast<int64_t>(wait_time.toSec() * 1000.0f)));
252  }
253  return sm_.getState().getId() == SM::Dead.getId();
254 }
255 
257 {
258  boost::mutex::scoped_lock lock(mutex_);
259  return sm_.getState().getId() == SM::Dead.getId();
260 }
261 
263 {
264  {
265  boost::mutex::scoped_lock lock(mutex_);
266  if (sm_.getState().getId() != SM::Dead.getId()) {
267  sm_.Die();
268  publishStatus(false);
269  }
270  }
272 }
273 
274 
276 {
277  {
278  boost::mutex::scoped_lock lock(mutex_);
280  }
282 }
284 {
285  // Checks that heartbeat timeouts haven't been disabled globally.
286  bool disable_heartbeat_timeout;
287  nh_.param(bond::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, disable_heartbeat_timeout, false);
288  if (disable_heartbeat_timeout) {
289  ROS_WARN("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)",
290  topic_.c_str(), id_.c_str());
291  return;
292  }
293 
294  {
295  boost::mutex::scoped_lock lock(mutex_);
297  }
299 }
301 {
302  {
303  boost::mutex::scoped_lock lock(mutex_);
305  }
307 }
308 
309 void Bond::bondStatusCB(const bond::Status::ConstPtr &msg)
310 {
311  // Filters out messages from other bonds and messages from ourself
312  if (msg->id == id_ && msg->instance_id != instance_id_) {
313  {
314  boost::mutex::scoped_lock lock(mutex_);
315 
316  if (sister_instance_id_.empty()) {
317  sister_instance_id_ = msg->instance_id;
318  }
319  if (sister_instance_id_ != msg->instance_id) {
320  ROS_ERROR(
321  "More than two locations are trying to use a single bond (topic: %s, id: %s). "
322  "You should only instantiate at most two bond instances for each (topic, id) pair.",
323  topic_.c_str(), id_.c_str());
324  return;
325  }
326 
327  if (msg->active) {
328  sm_.SisterAlive();
329  } else {
330  sm_.SisterDead();
331 
332  // Immediate ack for sister's death notification
333  if (sisterDiedFirst_) {
334  publishStatus(false);
335  }
336  }
337  }
339  }
340 }
341 
343 {
344  boost::mutex::scoped_lock lock(mutex_);
346  sm_.getState().getId() == SM::Alive.getId()) {
347  publishStatus(true);
348  } else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId()) {
349  publishStatus(false);
350  } else {
352  }
353 }
354 
355 void Bond::publishStatus(bool active)
356 {
357  bond::Status::Ptr msg(new bond::Status);
358  msg->header.stamp = ros::Time::now();
359  msg->id = id_;
360  msg->instance_id = instance_id_;
361  msg->active = active;
362  msg->heartbeat_timeout = heartbeat_timeout_;
363  msg->heartbeat_period = heartbeat_period_;
364  pub_.publish(msg);
365 }
366 
367 
369 {
370  std::vector<boost::function<void(void)> > callbacks;
371  {
372  boost::mutex::scoped_lock lock(mutex_);
373  callbacks = pending_callbacks_;
374  pending_callbacks_.clear();
375  }
376 
377  for (size_t i = 0; i < callbacks.size(); ++i) {
378  callbacks[i]();
379  }
380 }
381 
382 } // namespace bond
383 
384 
386 {
387  b->connect_timer_.cancel();
388  b->condition_.notify_all();
389  if (b->on_formed_) {
390  b->pending_callbacks_.push_back(b->on_formed_);
391  }
392 }
393 
395 {
396  b->sisterDiedFirst_ = true;
397 }
398 
400 {
401  b->condition_.notify_all();
402  b->heartbeat_timer_.cancel();
403  b->disconnect_timer_.cancel();
404  if (b->on_broken_) {
405  b->pending_callbacks_.push_back(b->on_broken_);
406  }
407 }
408 
410 {
411  b->heartbeat_timer_.reset();
412 }
413 
415 {
416  b->heartbeat_timer_.cancel();
417  b->disconnect_timer_.reset();
418  b->publishingTimer_.setPeriod(ros::WallDuration(bond::Constants::DEAD_PUBLISH_PERIOD));
419 }
std::string id_
Definition: bond.h:159
double heartbeat_timeout_
Definition: bond.h:171
void DisconnectTimeout()
Definition: BondSM_sm.h:171
void setHeartbeatPeriod(double dur)
Definition: bond.cpp:153
void breakBond()
Breaks the bond, notifying the other process.
Definition: bond.cpp:262
void ConnectTimeout()
Definition: BondSM_sm.h:161
ros::Subscriber sub_
Definition: bond.h:179
void publish(const boost::shared_ptr< M > &message) const
Bond(const std::string &topic, const std::string &id, boost::function< void(void)> on_broken=boost::function< void(void)>(), boost::function< void(void)> on_formed=boost::function< void(void)>())
Constructs a bond, but does not connect.
Definition: bond.cpp:67
void setBrokenCallback(boost::function< void(void)> on_broken)
Sets the broken callback.
Definition: bond.cpp:192
void setCallbackQueue(ros::CallbackQueueInterface *queue)
Definition: bond.cpp:163
Definition: bond.h:200
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
void setHeartbeatTimeout(double dur)
Definition: bond.cpp:142
BondSMContext sm_
Definition: bond.h:156
int getId() const
void SisterAlive()
Definition: BondSM_sm.h:181
ros::NodeHandle nh_
Definition: bond.h:154
void bondStatusCB(const bond::Status::ConstPtr &msg)
Definition: bond.cpp:309
~Bond()
Destructs the object, breaking the bond if it is still formed.
Definition: bond.cpp:92
void onDisconnectTimeout()
Definition: bond.cpp:300
bool sisterDiedFirst_
Definition: bond.h:164
Timeout connect_timer_
Definition: bond.h:175
#define ROS_WARN(...)
ros::SteadyTimer publishingTimer_
Definition: bond.h:181
std::vector< boost::function< void(void)> > pending_callbacks_
Definition: bond.h:192
static std::string makeUUID()
Definition: bond.cpp:48
void setConnectTimeout(double dur)
Definition: bond.cpp:120
bool started_
Definition: bond.h:165
void flushPendingCallbacks()
Definition: bond.cpp:368
boost::mutex mutex_
Definition: bond.h:167
boost::function< void(void)> on_broken_
Definition: bond.h:162
void Die()
Definition: BondSM_sm.h:166
void publishStatus(bool active)
Definition: bond.cpp:355
bool waitUntilBroken(ros::Duration timeout=ros::Duration(-1))
Blocks until the bond is broken for at most &#39;duration&#39;.
Definition: bond.cpp:227
bool waitUntilFormed(ros::Duration timeout=ros::Duration(-1))
Blocks until the bond is formed for at most &#39;duration&#39;.
Definition: bond.cpp:198
void start()
Starts the bond and connects to the sister process.
Definition: bond.cpp:174
Timeout heartbeat_timer_
Definition: bond.h:176
std::string instance_id_
Definition: bond.h:160
static SM_WaitingForSister WaitingForSister
Definition: BondSM_sm.h:52
void setDisconnectTimeout(double dur)
Definition: bond.cpp:131
void SisterDead()
Definition: BondSM_sm.h:186
SteadyTimer createSteadyTimer(WallDuration period, void(T::*callback)(const SteadyTimerEvent &), T *obj, bool oneshot=false, bool autostart=true) const
static SteadyTime now()
static SM_Alive Alive
Definition: BondSM_sm.h:53
boost::condition condition_
Definition: bond.h:168
void Connected()
Definition: bond.cpp:385
bool param(const std::string &param_name, T &param_val, const T &default_val) const
void StartDying()
Definition: bond.cpp:414
void setCallbackQueue(CallbackQueueInterface *queue)
std::string sister_instance_id_
Definition: bond.h:161
static SM_AwaitSisterDeath AwaitSisterDeath
Definition: BondSM_sm.h:54
boost::function< void(void)> on_formed_
Definition: bond.h:163
void HeartbeatTimeout()
Definition: BondSM_sm.h:176
ROSCPP_DECL bool ok()
double connect_timeout_
Definition: bond.h:170
Forms a bond to monitor another process.
Definition: bond.h:68
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
void SisterDied()
Definition: bond.cpp:394
void cancel()
Definition: timeout.cpp:71
double heartbeat_period_
Definition: bond.h:173
void Death()
Definition: bond.cpp:399
Timeout disconnect_timer_
Definition: bond.h:177
bool isBroken()
Indicates if the bond is broken.
Definition: bond.cpp:256
double disconnect_timeout_
Definition: bond.h:172
static SM_Dead Dead
Definition: BondSM_sm.h:55
BondSMState & getState() const
Definition: BondSM_sm.h:151
std::string topic_
Definition: bond.h:158
Definition: bond.h:60
static Time now()
void onHeartbeatTimeout()
Definition: bond.cpp:283
void doPublishing(const ros::SteadyTimerEvent &e)
Definition: bond.cpp:342
void reset()
Definition: timeout.cpp:64
void setDuration(const ros::Duration &d)
Definition: timeout.cpp:53
ros::Publisher pub_
Definition: bond.h:180
void setFormedCallback(boost::function< void(void)> on_formed)
Sets the formed callback.
Definition: bond.cpp:186
#define ROS_ERROR(...)
void Heartbeat()
Definition: bond.cpp:409
#define ROS_DEBUG(...)
void onConnectTimeout()
Definition: bond.cpp:275


bondcpp
Author(s): Stuart Glaser
autogenerated on Wed Sep 2 2020 03:40:44