bondpy.py
Go to the documentation of this file.
1 # Copyright (c) 2009, Willow Garage, Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are met:
6 #
7 # * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # * Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 # * Neither the name of the Willow Garage, Inc. nor the names of its
13 # contributors may be used to endorse or promote products derived from
14 # this software without specific prior written permission.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 # POSSIBILITY OF SUCH DAMAGE.
27 
28 ## \author Stuart Glaser
29 
30 import threading
31 import time
32 import uuid
33 
34 import rospy
35 
36 from . import BondSM_sm
37 
38 from bond.msg import Constants, Status
39 
40 
42  if not isinstance(d, rospy.Duration):
43  return rospy.Duration.from_sec(d)
44  return d
45 
46 
48  if isinstance(d, rospy.Duration):
49  return d.to_sec()
50  return d
51 
52 
53 ## Internal use only
54 class Timeout:
55  def __init__(self, duration, on_timeout=None):
56  self.duration = duration
57  self.timer = threading.Timer(0, self._on_timer)
58  self.timer.daemon = True
59  self.deadline = time.time()
60  self.on_timeout = on_timeout
61 
62  def reset(self):
63  self.timer.cancel()
64  self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
65  self.timer.daemon = True
66  self.timer.start()
67  self.deadline = time.time() + self.duration.to_sec()
68  return self
69 
70  def cancel(self):
71  self.timer.cancel()
72 
73  def left(self):
74  return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
75 
76  def _on_timer(self):
77  if self.on_timeout:
78  self.on_timeout()
79 
80 
81 ## \brief Forms a bond to monitor another process.
82 #
83 # The Bond class implements a bond, allowing you to monitor another
84 # process and be notified when it dies. In turn, it will be notified
85 # when you die.
86 class Bond(object):
87  ## \brief Constructs a bond, but does not connect.
88  #
89  # \param topic The topic used to exchange the bond status messages.
90  # \param id The ID of the bond, which should match the ID used on
91  # the sister's end
92  # \param on_broken callback that will be called when the bond is broken.
93  # \param on_formed callback that will be called when the bond is formed.
94  def __init__(self, topic, id, on_broken=None, on_formed=None):
95  self.__started = False
96  self.topic = topic
97  self.id = id
98  self.instance_id = str(uuid.uuid4())
99  self.sister_instance_id = None
100  self.on_broken = on_broken
101  self.on_formed = on_formed
102  self.is_shutdown = False
103  self.sister_died_first = False
104  # Timeout for wait_until_formed
105  self.deadline = None
106 
107  # Callbacks must be called outside of the locks and after the
108  # state machine finishes transitioning.
110 
111  self.sm = BondSM_sm.BondSM_sm(self)
112  # self.sm.setDebugFlag(True)
113  self.lock = threading.RLock()
114  self.condition = threading.Condition(self.lock)
115 
116  # Sets the default timeout values
117  self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
118  self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
119  self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
120  self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
121 
122  self.sub = None
123 
124  # queue_size 1 : avoid having a client receive backed up, potentially
125  # late heartbearts, discussion@https://github.com/ros/bond_core/pull/10
126  self.pub = rospy.Publisher(self.topic, Status, queue_size=1)
127 
129  return self.__connect_timeout
130 
131  def set_connect_timeout(self, dur):
132  assert not self.__started
133  self.__connect_timeout = dur
135  connect_timeout = property(get_connect_timeout, set_connect_timeout)
136 
138  return self.__heartbeat_timeout
139 
140  def set_heartbeat_timeout(self, dur):
141  assert not self.__started
144  heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
145 
147  return self.__disconnect_timeout
148 
149  def set_disconnect_timeout(self, dur):
150  assert not self.__started
153  disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
154 
156  return self.__heartbeat_period
157 
158  def set_heartbeat_period(self, per):
159  assert not self.__started
161  heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
162 
163  ## \brief Starts the bond and connects to the sister process.
164  def start(self):
165  with self.lock:
166  self.connect_timer.reset()
167  self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
168 
169  self.thread = threading.Thread(target=self._publishing_thread)
170  self.thread.daemon = True
171  self.thread.start()
172  self.__started = True
173 
175  with self.lock:
176  self.sm.ConnectTimeout()
178 
180  # Checks that heartbeat timeouts haven't been disabled globally
181  disable_heartbeat_timeout = rospy.get_param(
182  Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
183  if disable_heartbeat_timeout:
184  rospy.logwarn(
185  "Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)" %
186  (self.topic, self.id))
187  return
188 
189  with self.lock:
190  self.sm.HeartbeatTimeout()
192 
194  with self.lock:
195  self.sm.DisconnectTimeout()
197 
198  def __del__(self):
199  self.shutdown()
200 
201  def shutdown(self):
202  if not self.is_shutdown:
203  if self.sub is not None:
204  self.sub.unregister()
205  with self.lock:
206  self.is_shutdown = True
207  if self.sm.getState().getName() != 'SM.Dead':
208  self.break_bond()
209  self.pub.unregister()
210  self.condition.notify_all()
211  self.connect_timer.cancel()
212  if self.deadline:
213  self.deadline.cancel()
214 
215  def _on_bond_status(self, msg):
216  # Filters out messages from other bonds and messages from ourself
217  if msg.id == self.id and msg.instance_id != self.instance_id:
218  with self.lock:
219  if not self.sister_instance_id:
220  self.sister_instance_id = msg.instance_id
221 
222  if msg.instance_id != self.sister_instance_id:
223  rospy.logerr(
224  "More than two locations are trying to use a single bond (topic: %s, id: %s). " +
225  "You should only instantiate at most two bond instances for each (topic, id) pair." %
226  (self.topic, self.id))
227  return
228 
229  if msg.active:
230  self.sm.SisterAlive()
231  else:
232  self.sm.SisterDead()
233 
234  # Immediate ack for sister's death notification
235  if self.sister_died_first:
236  self._publish(False)
238 
239  def _publish(self, active):
240  msg = Status()
241  msg.header.stamp = rospy.Time.now()
242  msg.id = self.id
243  msg.instance_id = self.instance_id
244  msg.active = active
245  msg.heartbeat_timeout = self.heartbeat_timeout
246  msg.heartbeat_period = self.heartbeat_period
247  self.pub.publish(msg)
248 
250  with self.lock:
251  # Publishing ALIVE
252  while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
253  self._publish(True)
254  self.condition.wait(self.heartbeat_period)
255 
256  # Publishing DEAD
257  while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
258  self._publish(False)
259  self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
260 
262  callbacks = []
263  with self.lock:
264  callbacks = self.pending_callbacks
265  self.pending_callbacks = []
266  for c in callbacks:
267  c()
268 
269  ## \brief INTERNAL
270  def Connected(self):
271  self.connect_timer.cancel()
272  self.condition.notify_all()
273  if self.on_formed:
274  self.pending_callbacks.append(self.on_formed)
275 
276  ## \brief INTERNAL
277  def Heartbeat(self):
278  self.heartbeat_timer.reset()
279 
280  ## \brief INTERNAL
281  def SisterDied(self):
282  self.sister_died_first = True
283 
284  ## \brief INTERNAL
285  def Death(self):
286  self.condition.notify_all()
287  self.heartbeat_timer.cancel()
288  self.disconnect_timer.cancel()
289  if self.on_broken:
290  self.pending_callbacks.append(self.on_broken)
291 
292  ## \brief INTERNAL
293  def StartDying(self):
294  self.heartbeat_timer.cancel()
295  self.disconnect_timer.reset()
296 
297  ## \brief Sets the formed callback
298  def set_formed_callback(self, on_formed):
299  with self.lock:
300  self.on_formed = on_formed
301 
302  ## \brief Sets the broken callback
303  def set_broken_callback(self, on_broken):
304  with self.lock:
305  self.on_broken = on_broken
306 
307  ## \brief Blocks until the bond is formed for at most 'duration'.
308  #
309  # \param timeout Maximum duration to wait. If None then this call will not timeout.
310  # \return true iff the bond has been formed.
311  def wait_until_formed(self, timeout=None):
312  if self.deadline:
313  self.deadline.cancel()
314  self.deadline = None
315  if timeout:
316  self.deadline = Timeout(timeout).reset()
317  with self.lock:
318  while self.sm.getState().getName() == 'SM.WaitingForSister':
319  if rospy.is_shutdown():
320  break
321  if self.deadline and self.deadline.left() == rospy.Duration(0):
322  break
323  wait_duration = 0.1
324  if self.deadline:
325  wait_duration = min(wait_duration, self.deadline.left().to_sec())
326  self.condition.wait(wait_duration)
327  return self.sm.getState().getName() != 'SM.WaitingForSister'
328 
329  ## \brief Blocks until the bond is broken for at most 'duration'.
330  #
331  # \param timeout Maximum duration to wait. If None then this call will not timeout.
332  # \return true iff the bond has been broken, even if it has never been formed.
333  def wait_until_broken(self, timeout=None):
334  if self.deadline:
335  self.deadline.cancel()
336  self.deadline = None
337  if timeout:
338  self.deadline = Timeout(timeout).reset()
339  with self.lock:
340  while self.sm.getState().getName() != 'SM.Dead':
341  if rospy.is_shutdown():
342  break
343  if self.deadline and self.deadline.left() == rospy.Duration(0):
344  break
345  wait_duration = 0.1
346  if self.deadline:
347  wait_duration = min(wait_duration, self.deadline.left().to_sec())
348  self.condition.wait(wait_duration)
349  return self.sm.getState().getName() == 'SM.Dead'
350 
351  ## \brief Indicates if the bond is broken
352  # \return true iff the bond has been broken.
353  def is_broken(self):
354  with self.lock:
355  return self.sm.getState().getName() == 'SM.Dead'
356 
357  ## \brief Breaks the bond, notifying the other process.
358  def break_bond(self):
359  with self.lock:
360  self.sm.Die()
361  self._publish(False)
363 
364  def __repr__(self):
365  return "[Bond %s, Instance %s (%s)]" % \
366  (self.id, self.instance_id, self.sm.getState().getName())
def wait_until_formed(self, timeout=None)
Blocks until the bond is formed for at most 'duration'.
Definition: bondpy.py:311
def shutdown(self)
Definition: bondpy.py:201
def Connected(self)
INTERNAL.
Definition: bondpy.py:270
def _on_bond_status(self, msg)
Definition: bondpy.py:215
def get_disconnect_timeout(self)
Definition: bondpy.py:146
def set_disconnect_timeout(self, dur)
Definition: bondpy.py:149
def __repr__(self)
Definition: bondpy.py:364
def cancel(self)
Definition: bondpy.py:70
def StartDying(self)
INTERNAL.
Definition: bondpy.py:293
def get_connect_timeout(self)
Definition: bondpy.py:128
def get_heartbeat_period(self)
Definition: bondpy.py:155
def _on_heartbeat_timeout(self)
Definition: bondpy.py:179
def as_float_duration(d)
Definition: bondpy.py:47
def get_heartbeat_timeout(self)
Definition: bondpy.py:137
def wait_until_broken(self, timeout=None)
Blocks until the bond is broken for at most 'duration'.
Definition: bondpy.py:333
def as_ros_duration(d)
Definition: bondpy.py:41
def set_heartbeat_timeout(self, dur)
Definition: bondpy.py:140
def is_broken(self)
Indicates if the bond is broken.
Definition: bondpy.py:353
def __init__(self, duration, on_timeout=None)
Definition: bondpy.py:55
def reset(self)
Definition: bondpy.py:62
def _publishing_thread(self)
Definition: bondpy.py:249
def Death(self)
INTERNAL.
Definition: bondpy.py:285
def __init__(self, topic, id, on_broken=None, on_formed=None)
Constructs a bond, but does not connect.
Definition: bondpy.py:94
def _on_disconnect_timeout(self)
Definition: bondpy.py:193
def set_broken_callback(self, on_broken)
Sets the broken callback.
Definition: bondpy.py:303
def left(self)
Definition: bondpy.py:73
def set_connect_timeout(self, dur)
Definition: bondpy.py:131
Internal use only.
Definition: bondpy.py:54
def _flush_pending_callbacks(self)
Definition: bondpy.py:261
def _on_timer(self)
Definition: bondpy.py:76
Forms a bond to monitor another process.
Definition: bondpy.py:86
def start(self)
Starts the bond and connects to the sister process.
Definition: bondpy.py:164
def _publish(self, active)
Definition: bondpy.py:239
def break_bond(self)
Breaks the bond, notifying the other process.
Definition: bondpy.py:358
def set_heartbeat_period(self, per)
Definition: bondpy.py:158
def set_formed_callback(self, on_formed)
Sets the formed callback.
Definition: bondpy.py:298
def Heartbeat(self)
INTERNAL.
Definition: bondpy.py:277
def SisterDied(self)
INTERNAL.
Definition: bondpy.py:281
def _on_connect_timeout(self)
Definition: bondpy.py:174
def __del__(self)
Definition: bondpy.py:198


bondpy
Author(s): Stuart Glaser
autogenerated on Wed Sep 2 2020 03:40:45