bondpy.py
Go to the documentation of this file.
1 # Copyright (c) 2009, Willow Garage, Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are met:
6 #
7 # * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # * Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 # * Neither the name of the Willow Garage, Inc. nor the names of its
13 # contributors may be used to endorse or promote products derived from
14 # this software without specific prior written permission.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 # POSSIBILITY OF SUCH DAMAGE.
27 
28 ## \author Stuart Glaser
29 
30 import threading
31 import time
32 import uuid
33 
34 import rospy
35 
36 import BondSM_sm
37 
38 from bond.msg import Constants, Status
39 
40 
42  if not isinstance(d, rospy.Duration):
43  return rospy.Duration.from_sec(d)
44  return d
45 
46 
48  if isinstance(d, rospy.Duration):
49  return d.to_sec()
50  return d
51 
52 
53 ## Internal use only
54 class Timeout:
55  def __init__(self, duration, on_timeout=None):
56  self.duration = duration
57  self.timer = threading.Timer(0, self._on_timer)
58  self.timer.daemon = True
59  self.deadline = time.time()
60  self.on_timeout = on_timeout
61 
62  def reset(self):
63  self.timer.cancel()
64  self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
65  self.timer.daemon = True
66  self.timer.start()
67  self.deadline = time.time() + self.duration.to_sec()
68  return self
69 
70  def cancel(self):
71  self.timer.cancel()
72 
73  def left(self):
74  return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
75 
76  def _on_timer(self):
77  if self.on_timeout:
78  self.on_timeout()
79 
80 
81 ## \brief Forms a bond to monitor another process.
82 #
83 # The Bond class implements a bond, allowing you to monitor another
84 # process and be notified when it dies. In turn, it will be notified
85 # when you die.
86 class Bond(object):
87  ## \brief Constructs a bond, but does not connect.
88  #
89  # \param topic The topic used to exchange the bond status messages.
90  # \param id The ID of the bond, which should match the ID used on
91  # the sister's end
92  # \param on_broken callback that will be called when the bond is broken.
93  # \param on_formed callback that will be called when the bond is formed.
94  def __init__(self, topic, id, on_broken=None, on_formed=None):
95  self.__started = False
96  self.topic = topic
97  self.id = id
98  self.instance_id = str(uuid.uuid4())
99  self.sister_instance_id = None
100  self.on_broken = on_broken
101  self.on_formed = on_formed
102  self.is_shutdown = False
103  self.sister_died_first = False
104  # Timeout for wait_until_formed
105  self.deadline = None
106 
107  # Callbacks must be called outside of the locks and after the
108  # state machine finishes transitioning.
110 
111  self.sm = BondSM_sm.BondSM_sm(self)
112  # self.sm.setDebugFlag(True)
113  self.lock = threading.RLock()
114  self.condition = threading.Condition(self.lock)
115 
116  # Sets the default timeout values
117  self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
118  self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
119  self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
120  self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
121 
122  # queue_size 1 : avoid having a client receive backed up, potentially
123  # late heartbearts, discussion@https://github.com/ros/bond_core/pull/10
124  self.pub = rospy.Publisher(self.topic, Status, queue_size=1)
125 
127  return self.__connect_timeout
128 
129  def set_connect_timeout(self, dur):
130  assert not self.__started
131  self.__connect_timeout = dur
133  connect_timeout = property(get_connect_timeout, set_connect_timeout)
134 
136  return self.__heartbeat_timeout
137 
138  def set_heartbeat_timeout(self, dur):
139  assert not self.__started
142  heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
143 
145  return self.__disconnect_timeout
146 
147  def set_disconnect_timeout(self, dur):
148  assert not self.__started
151  disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
152 
154  return self.__heartbeat_period
155 
156  def set_heartbeat_period(self, per):
157  assert not self.__started
159  heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
160 
161  ## \brief Starts the bond and connects to the sister process.
162  def start(self):
163  with self.lock:
164  self.connect_timer.reset()
165  self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
166 
167  self.thread = threading.Thread(target=self._publishing_thread)
168  self.thread.daemon = True
169  self.thread.start()
170  self.__started = True
171 
173  with self.lock:
174  self.sm.ConnectTimeout()
176 
178  # Checks that heartbeat timeouts haven't been disabled globally
179  disable_heartbeat_timeout = rospy.get_param(
180  Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
181  if disable_heartbeat_timeout:
182  rospy.logwarn(
183  "Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)" %
184  (self.topic, self.id))
185  return
186 
187  with self.lock:
188  self.sm.HeartbeatTimeout()
190 
192  with self.lock:
193  self.sm.DisconnectTimeout()
195 
196  def __del__(self):
197  self.shutdown()
198 
199  def shutdown(self):
200  if not self.is_shutdown:
201  self.sub.unregister()
202  with self.lock:
203  self.is_shutdown = True
204  if self.sm.getState().getName() != 'SM.Dead':
205  self.break_bond()
206  self.pub.unregister()
207  self.condition.notify_all()
208  self.connect_timer.cancel()
209  if self.deadline:
210  self.deadline.cancel()
211 
212  def _on_bond_status(self, msg):
213  # Filters out messages from other bonds and messages from ourself
214  if msg.id == self.id and msg.instance_id != self.instance_id:
215  with self.lock:
216  if not self.sister_instance_id:
217  self.sister_instance_id = msg.instance_id
218 
219  if msg.instance_id != self.sister_instance_id:
220  rospy.logerr(
221  "More than two locations are trying to use a single bond (topic: %s, id: %s). " +
222  "You should only instantiate at most two bond instances for each (topic, id) pair." %
223  (self.topic, self.id))
224  return
225 
226  if msg.active:
227  self.sm.SisterAlive()
228  else:
229  self.sm.SisterDead()
230 
231  # Immediate ack for sister's death notification
232  if self.sister_died_first:
233  self._publish(False)
235 
236  def _publish(self, active):
237  msg = Status()
238  msg.header.stamp = rospy.Time.now()
239  msg.id = self.id
240  msg.instance_id = self.instance_id
241  msg.active = active
242  msg.heartbeat_timeout = self.heartbeat_timeout
243  msg.heartbeat_period = self.heartbeat_period
244  self.pub.publish(msg)
245 
247  with self.lock:
248  # Publishing ALIVE
249  while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
250  self._publish(True)
251  self.condition.wait(self.heartbeat_period)
252 
253  # Publishing DEAD
254  while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
255  self._publish(False)
256  self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
257 
259  callbacks = []
260  with self.lock:
261  callbacks = self.pending_callbacks
262  self.pending_callbacks = []
263  for c in callbacks:
264  c()
265 
266  ## \brief INTERNAL
267  def Connected(self):
268  self.connect_timer.cancel()
269  self.condition.notify_all()
270  if self.on_formed:
271  self.pending_callbacks.append(self.on_formed)
272 
273  ## \brief INTERNAL
274  def Heartbeat(self):
275  self.heartbeat_timer.reset()
276 
277  ## \brief INTERNAL
278  def SisterDied(self):
279  self.sister_died_first = True
280 
281  ## \brief INTERNAL
282  def Death(self):
283  self.condition.notify_all()
284  self.heartbeat_timer.cancel()
285  self.disconnect_timer.cancel()
286  if self.on_broken:
287  self.pending_callbacks.append(self.on_broken)
288 
289  ## \brief INTERNAL
290  def StartDying(self):
291  self.heartbeat_timer.cancel()
292  self.disconnect_timer.reset()
293 
294  ## \brief Sets the formed callback
295  def set_formed_callback(self, on_formed):
296  with self.lock:
297  self.on_formed = on_formed
298 
299  ## \brief Sets the broken callback
300  def set_broken_callback(self, on_broken):
301  with self.lock:
302  self.on_broken = on_broken
303 
304  ## \brief Blocks until the bond is formed for at most 'duration'.
305  #
306  # \param timeout Maximum duration to wait. If None then this call will not timeout.
307  # \return true iff the bond has been formed.
308  def wait_until_formed(self, timeout=None):
309  if self.deadline:
310  self.deadline.cancel()
311  self.deadline = None
312  if timeout:
313  self.deadline = Timeout(timeout).reset()
314  with self.lock:
315  while self.sm.getState().getName() == 'SM.WaitingForSister':
316  if rospy.is_shutdown():
317  break
318  if self.deadline and self.deadline.left() == rospy.Duration(0):
319  break
320  wait_duration = 0.1
321  if self.deadline:
322  wait_duration = min(wait_duration, self.deadline.left().to_sec())
323  self.condition.wait(wait_duration)
324  return self.sm.getState().getName() != 'SM.WaitingForSister'
325 
326  ## \brief Blocks until the bond is broken for at most 'duration'.
327  #
328  # \param timeout Maximum duration to wait. If None then this call will not timeout.
329  # \return true iff the bond has been broken, even if it has never been formed.
330  def wait_until_broken(self, timeout=None):
331  if self.deadline:
332  self.deadline.cancel()
333  self.deadline = None
334  if timeout:
335  self.deadline = Timeout(timeout).reset()
336  with self.lock:
337  while self.sm.getState().getName() != 'SM.Dead':
338  if rospy.is_shutdown():
339  break
340  if self.deadline and self.deadline.left() == rospy.Duration(0):
341  break
342  wait_duration = 0.1
343  if self.deadline:
344  wait_duration = min(wait_duration, self.deadline.left().to_sec())
345  self.condition.wait(wait_duration)
346  return self.sm.getState().getName() == 'SM.Dead'
347 
348  ## \brief Indicates if the bond is broken
349  # \return true iff the bond has been broken.
350  def is_broken(self):
351  with self.lock:
352  return self.sm.getState().getName() == 'SM.Dead'
353 
354  ## \brief Breaks the bond, notifying the other process.
355  def break_bond(self):
356  with self.lock:
357  self.sm.Die()
358  self._publish(False)
360 
361  def __repr__(self):
362  return "[Bond %s, Instance %s (%s)]" % \
363  (self.id, self.instance_id, self.sm.getState().getName())
def wait_until_formed(self, timeout=None)
Blocks until the bond is formed for at most 'duration'.
Definition: bondpy.py:308
def shutdown(self)
Definition: bondpy.py:199
def Connected(self)
INTERNAL.
Definition: bondpy.py:267
def _on_bond_status(self, msg)
Definition: bondpy.py:212
def get_disconnect_timeout(self)
Definition: bondpy.py:144
def set_disconnect_timeout(self, dur)
Definition: bondpy.py:147
def __repr__(self)
Definition: bondpy.py:361
def cancel(self)
Definition: bondpy.py:70
def StartDying(self)
INTERNAL.
Definition: bondpy.py:290
def get_connect_timeout(self)
Definition: bondpy.py:126
def get_heartbeat_period(self)
Definition: bondpy.py:153
def _on_heartbeat_timeout(self)
Definition: bondpy.py:177
def as_float_duration(d)
Definition: bondpy.py:47
def get_heartbeat_timeout(self)
Definition: bondpy.py:135
def wait_until_broken(self, timeout=None)
Blocks until the bond is broken for at most 'duration'.
Definition: bondpy.py:330
def as_ros_duration(d)
Definition: bondpy.py:41
def set_heartbeat_timeout(self, dur)
Definition: bondpy.py:138
def is_broken(self)
Indicates if the bond is broken.
Definition: bondpy.py:350
def __init__(self, duration, on_timeout=None)
Definition: bondpy.py:55
def reset(self)
Definition: bondpy.py:62
def _publishing_thread(self)
Definition: bondpy.py:246
def Death(self)
INTERNAL.
Definition: bondpy.py:282
def __init__(self, topic, id, on_broken=None, on_formed=None)
Constructs a bond, but does not connect.
Definition: bondpy.py:94
def _on_disconnect_timeout(self)
Definition: bondpy.py:191
def set_broken_callback(self, on_broken)
Sets the broken callback.
Definition: bondpy.py:300
def left(self)
Definition: bondpy.py:73
def set_connect_timeout(self, dur)
Definition: bondpy.py:129
Internal use only.
Definition: bondpy.py:54
def _flush_pending_callbacks(self)
Definition: bondpy.py:258
def _on_timer(self)
Definition: bondpy.py:76
Forms a bond to monitor another process.
Definition: bondpy.py:86
def start(self)
Starts the bond and connects to the sister process.
Definition: bondpy.py:162
def _publish(self, active)
Definition: bondpy.py:236
def break_bond(self)
Breaks the bond, notifying the other process.
Definition: bondpy.py:355
def set_heartbeat_period(self, per)
Definition: bondpy.py:156
def set_formed_callback(self, on_formed)
Sets the formed callback.
Definition: bondpy.py:295
def Heartbeat(self)
INTERNAL.
Definition: bondpy.py:274
def SisterDied(self)
INTERNAL.
Definition: bondpy.py:278
def _on_connect_timeout(self)
Definition: bondpy.py:172
def __del__(self)
Definition: bondpy.py:196


bondpy
Author(s): Stuart Glaser
autogenerated on Wed Mar 20 2019 07:55:37