bondpy.py
Go to the documentation of this file.
1 # Copyright (c) 2009, Willow Garage, Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are met:
6 #
7 # * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # * Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 # * Neither the name of the Willow Garage, Inc. nor the names of its
13 # contributors may be used to endorse or promote products derived from
14 # this software without specific prior written permission.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 # POSSIBILITY OF SUCH DAMAGE.
27 
28 
29 
30 import threading
31 import time
32 import uuid
33 
34 import rospy
35 
36 from . import BondSM_sm
37 
38 from bond.msg import Constants, Status
39 
40 
42  if not isinstance(d, rospy.Duration):
43  return rospy.Duration.from_sec(d)
44  return d
45 
46 
48  if isinstance(d, rospy.Duration):
49  return d.to_sec()
50  return d
51 
52 
53 
54 class Timeout:
55  def __init__(self, duration, on_timeout=None):
56  self.duration = duration
57  self.timer = threading.Timer(0, self._on_timer)
58  self.timer.daemon = True
59  self.deadline = time.time()
60  self.on_timeout = on_timeout
61 
62  def reset(self):
63  self.timer.cancel()
64  self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
65  self.timer.daemon = True
66  self.timer.start()
67  self.deadline = time.time() + self.duration.to_sec()
68  return self
69 
70  def cancel(self):
71  self.timer.cancel()
72 
73  def left(self):
74  return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
75 
76  def _on_timer(self):
77  if self.on_timeout:
78  self.on_timeout()
79 
80 
81 
86 class Bond(object):
87 
94  def __init__(self, topic, id, on_broken=None, on_formed=None):
95  self.__started = False
96  self.topic = topic
97  self.id = id
98  self.instance_id = str(uuid.uuid4())
99  self.sister_instance_id = None
100  self.on_broken = on_broken
101  self.on_formed = on_formed
102  self.is_shutdown = False
103  self.sister_died_first = False
104  # Timeout for wait_until_formed
105  self.deadline = None
106 
107  # Callbacks must be called outside of the locks and after the
108  # state machine finishes transitioning.
110 
111  self.sm = BondSM_sm.BondSM_sm(self)
112  # self.sm.setDebugFlag(True)
113  self.lock = threading.RLock()
114  self.condition = threading.Condition(self.lock)
115 
116  # Sets the default timeout values
117  self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
118  self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
119  self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
120  self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
121 
122  self.sub = None
123 
124  # queue_size 1 : avoid having a client receive backed up, potentially
125  # late heartbearts, discussion@https://github.com/ros/bond_core/pull/10
126  self.pub = rospy.Publisher(self.topic, Status, queue_size=1)
127 
129  return self.__connect_timeout
130 
131  def set_connect_timeout(self, dur):
132  assert not self.__started
133  self.__connect_timeout = dur
135  connect_timeout = property(get_connect_timeout, set_connect_timeout)
136 
138  return self.__heartbeat_timeout
139 
140  def set_heartbeat_timeout(self, dur):
141  assert not self.__started
144  heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
145 
147  return self.__disconnect_timeout
148 
149  def set_disconnect_timeout(self, dur):
150  assert not self.__started
153  disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
154 
156  return self.__heartbeat_period
157 
158  def set_heartbeat_period(self, per):
159  assert not self.__started
161  heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
162 
163 
164  def start(self):
165  with self.lock:
166  self.connect_timer.reset()
167  self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
168 
169  self.thread = threading.Thread(target=self._publishing_thread)
170  self.thread.daemon = True
171  self.thread.start()
172  self.__started = True
173 
175  with self.lock:
176  self.sm.ConnectTimeout()
178 
180  # Checks that heartbeat timeouts haven't been disabled globally
181  disable_heartbeat_timeout = rospy.get_param(
182  Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
183  if disable_heartbeat_timeout:
184  rospy.logwarn(
185  "Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)" %
186  (self.topic, self.id))
187  return
188 
189  with self.lock:
190  self.sm.HeartbeatTimeout()
192 
194  with self.lock:
195  self.sm.DisconnectTimeout()
197 
198  def __del__(self):
199  self.shutdown()
200 
201  def shutdown(self):
202  if not self.is_shutdown:
203  if self.sub is not None:
204  self.sub.unregister()
205  with self.lock:
206  self.is_shutdown = True
207  if self.sm.getState().getName() != 'SM.Dead':
208  self.break_bond()
209  self.pub.unregister()
210  self.condition.notify_all()
211  self.connect_timer.cancel()
212  if self.deadline:
213  self.deadline.cancel()
214 
215  def _on_bond_status(self, msg):
216  # Filters out messages from other bonds and messages from ourself
217  if msg.id == self.id and msg.instance_id != self.instance_id:
218  with self.lock:
219  if not self.sister_instance_id:
220  self.sister_instance_id = msg.instance_id
221 
222  if msg.instance_id != self.sister_instance_id:
223  rospy.logerr(
224  "More than two locations are trying to use a single bond (topic: %s, id: %s). " +
225  "You should only instantiate at most two bond instances for each (topic, id) pair." %
226  (self.topic, self.id))
227  return
228 
229  if msg.active:
230  self.sm.SisterAlive()
231  else:
232  self.sm.SisterDead()
233 
234  # Immediate ack for sister's death notification
235  if self.sister_died_first:
236  self._publish(False)
238 
239  def _publish(self, active):
240  msg = Status()
241  msg.header.stamp = rospy.Time.now()
242  msg.id = self.id
243  msg.instance_id = self.instance_id
244  msg.active = active
245  msg.heartbeat_timeout = self.heartbeat_timeout
246  msg.heartbeat_period = self.heartbeat_period
247  self.pub.publish(msg)
248 
250  with self.lock:
251  # Publishing ALIVE
252  while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
253  self._publish(True)
254  self.condition.wait(self.heartbeat_period)
255 
256  # Publishing DEAD
257  while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
258  self._publish(False)
259  self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
260 
262  callbacks = []
263  with self.lock:
264  callbacks = self.pending_callbacks
265  self.pending_callbacks = []
266  for c in callbacks:
267  c()
268 
269 
270  def Connected(self):
271  self.connect_timer.cancel()
272  self.condition.notify_all()
273  if self.on_formed:
274  self.pending_callbacks.append(self.on_formed)
275 
276 
277  def Heartbeat(self):
278  self.heartbeat_timer.reset()
279 
280 
281  def SisterDied(self):
282  self.sister_died_first = True
283 
284 
285  def Death(self):
286  self.condition.notify_all()
287  self.heartbeat_timer.cancel()
288  self.disconnect_timer.cancel()
289  if self.on_broken:
290  self.pending_callbacks.append(self.on_broken)
291 
292 
293  def StartDying(self):
294  self.heartbeat_timer.cancel()
295  self.disconnect_timer.reset()
296 
297 
298  def set_formed_callback(self, on_formed):
299  with self.lock:
300  self.on_formed = on_formed
301 
302 
303  def set_broken_callback(self, on_broken):
304  with self.lock:
305  self.on_broken = on_broken
306 
307 
311  def wait_until_formed(self, timeout=None):
312  if self.deadline:
313  self.deadline.cancel()
314  self.deadline = None
315  if timeout:
316  self.deadline = Timeout(timeout).reset()
317  with self.lock:
318  while self.sm.getState().getName() == 'SM.WaitingForSister':
319  if rospy.is_shutdown():
320  break
321  if self.deadline and self.deadline.left() == rospy.Duration(0):
322  break
323  wait_duration = 0.1
324  if self.deadline:
325  wait_duration = min(wait_duration, self.deadline.left().to_sec())
326  self.condition.wait(wait_duration)
327  return self.sm.getState().getName() != 'SM.WaitingForSister'
328 
329 
333  def wait_until_broken(self, timeout=None):
334  if self.deadline:
335  self.deadline.cancel()
336  self.deadline = None
337  if timeout:
338  self.deadline = Timeout(timeout).reset()
339  with self.lock:
340  while self.sm.getState().getName() != 'SM.Dead':
341  if rospy.is_shutdown():
342  break
343  if self.deadline and self.deadline.left() == rospy.Duration(0):
344  break
345  wait_duration = 0.1
346  if self.deadline:
347  wait_duration = min(wait_duration, self.deadline.left().to_sec())
348  self.condition.wait(wait_duration)
349  return self.sm.getState().getName() == 'SM.Dead'
350 
351 
353  def is_broken(self):
354  with self.lock:
355  return self.sm.getState().getName() == 'SM.Dead'
356 
357 
358  def break_bond(self):
359  with self.lock:
360  self.sm.Die()
361  self._publish(False)
363 
364  def __repr__(self):
365  return "[Bond %s, Instance %s (%s)]" % \
366  (self.id, self.instance_id, self.sm.getState().getName())
bondpy.bondpy.Timeout.reset
def reset(self)
Definition: bondpy.py:62
bondpy.bondpy.Bond._on_connect_timeout
def _on_connect_timeout(self)
Definition: bondpy.py:174
bondpy.bondpy.Bond.heartbeat_period
heartbeat_period
Definition: bondpy.py:161
bondpy.bondpy.Timeout.duration
duration
Definition: bondpy.py:56
bondpy.BondSM_sm.BondSM_sm
Definition: BondSM_sm.py:214
bondpy.bondpy.Bond.__disconnect_timeout
__disconnect_timeout
Definition: bondpy.py:151
bondpy.bondpy.Bond.instance_id
instance_id
Definition: bondpy.py:98
bondpy.bondpy.Bond.set_disconnect_timeout
def set_disconnect_timeout(self, dur)
Definition: bondpy.py:149
bondpy.bondpy.Bond.sm
sm
Definition: bondpy.py:111
bondpy.bondpy.Bond.start
def start(self)
Starts the bond and connects to the sister process.
Definition: bondpy.py:164
bondpy.bondpy.Bond.deadline
deadline
Definition: bondpy.py:105
bondpy.bondpy.Bond.StartDying
def StartDying(self)
INTERNAL.
Definition: bondpy.py:293
bondpy.bondpy.Bond._on_bond_status
def _on_bond_status(self, msg)
Definition: bondpy.py:215
bondpy.bondpy.Timeout.__init__
def __init__(self, duration, on_timeout=None)
Definition: bondpy.py:55
bondpy.bondpy.Timeout.on_timeout
on_timeout
Definition: bondpy.py:60
bondpy.bondpy.Bond.condition
condition
Definition: bondpy.py:114
bondpy.bondpy.Bond.__started
__started
Definition: bondpy.py:95
bondpy.bondpy.Bond.Death
def Death(self)
INTERNAL.
Definition: bondpy.py:285
bondpy.bondpy.Bond.__repr__
def __repr__(self)
Definition: bondpy.py:364
bondpy.bondpy.Bond._publish
def _publish(self, active)
Definition: bondpy.py:239
bondpy.bondpy.Timeout.left
def left(self)
Definition: bondpy.py:73
bondpy.bondpy.Bond.__heartbeat_timeout
__heartbeat_timeout
Definition: bondpy.py:142
bondpy.bondpy.Bond.set_broken_callback
def set_broken_callback(self, on_broken)
Sets the broken callback.
Definition: bondpy.py:303
bondpy.bondpy.Bond.pending_callbacks
pending_callbacks
Definition: bondpy.py:109
bondpy.bondpy.Bond.connect_timer
connect_timer
Definition: bondpy.py:134
bondpy.bondpy.as_ros_duration
def as_ros_duration(d)
Definition: bondpy.py:41
bondpy.bondpy.Bond.lock
lock
Definition: bondpy.py:113
bondpy.bondpy.Bond.set_heartbeat_period
def set_heartbeat_period(self, per)
Definition: bondpy.py:158
bondpy.bondpy.Bond.heartbeat_timer
heartbeat_timer
Definition: bondpy.py:143
bondpy.bondpy.Bond.sister_instance_id
sister_instance_id
Definition: bondpy.py:99
bondpy.bondpy.Bond.set_formed_callback
def set_formed_callback(self, on_formed)
Sets the formed callback.
Definition: bondpy.py:298
bondpy.bondpy.Bond.heartbeat_timeout
heartbeat_timeout
Definition: bondpy.py:144
bondpy.bondpy.Bond.Heartbeat
def Heartbeat(self)
INTERNAL.
Definition: bondpy.py:277
bondpy.bondpy.Bond.wait_until_formed
def wait_until_formed(self, timeout=None)
Blocks until the bond is formed for at most 'duration'.
Definition: bondpy.py:311
bondpy.bondpy.Bond.is_broken
def is_broken(self)
Indicates if the bond is broken.
Definition: bondpy.py:353
bondpy.bondpy.Bond.pub
pub
Definition: bondpy.py:126
bondpy.bondpy.Bond.SisterDied
def SisterDied(self)
INTERNAL.
Definition: bondpy.py:281
bondpy.bondpy.Bond.break_bond
def break_bond(self)
Breaks the bond, notifying the other process.
Definition: bondpy.py:358
bondpy.bondpy.Bond._publishing_thread
def _publishing_thread(self)
Definition: bondpy.py:249
bondpy.bondpy.Bond.disconnect_timer
disconnect_timer
Definition: bondpy.py:152
bondpy.bondpy.Bond.sub
sub
Definition: bondpy.py:122
bondpy.bondpy.Timeout.deadline
deadline
Definition: bondpy.py:59
bondpy.bondpy.Bond.wait_until_broken
def wait_until_broken(self, timeout=None)
Blocks until the bond is broken for at most 'duration'.
Definition: bondpy.py:333
bondpy.bondpy.Bond
Forms a bond to monitor another process.
Definition: bondpy.py:86
bondpy.bondpy.Bond.__init__
def __init__(self, topic, id, on_broken=None, on_formed=None)
Constructs a bond, but does not connect.
Definition: bondpy.py:94
bondpy.bondpy.Bond.topic
topic
Definition: bondpy.py:96
bondpy.bondpy.Bond.disconnect_timeout
disconnect_timeout
Definition: bondpy.py:153
bondpy.bondpy.Bond.sister_died_first
sister_died_first
Definition: bondpy.py:103
bondpy.bondpy.Bond.__del__
def __del__(self)
Definition: bondpy.py:198
bondpy.bondpy.Bond.set_connect_timeout
def set_connect_timeout(self, dur)
Definition: bondpy.py:131
bondpy.bondpy.Timeout.timer
timer
Definition: bondpy.py:57
bondpy.bondpy.Bond.connect_timeout
connect_timeout
Definition: bondpy.py:135
bondpy.bondpy.Bond.__connect_timeout
__connect_timeout
Definition: bondpy.py:133
bondpy.bondpy.Timeout.cancel
def cancel(self)
Definition: bondpy.py:70
bondpy.bondpy.Bond.is_shutdown
is_shutdown
Definition: bondpy.py:102
bondpy.bondpy.Bond._on_heartbeat_timeout
def _on_heartbeat_timeout(self)
Definition: bondpy.py:179
bondpy.bondpy.Bond.get_disconnect_timeout
def get_disconnect_timeout(self)
Definition: bondpy.py:146
bondpy.bondpy.Bond.get_heartbeat_period
def get_heartbeat_period(self)
Definition: bondpy.py:155
bondpy.bondpy.Bond.thread
thread
Definition: bondpy.py:169
bondpy.bondpy.Bond._on_disconnect_timeout
def _on_disconnect_timeout(self)
Definition: bondpy.py:193
bondpy.bondpy.Bond.get_heartbeat_timeout
def get_heartbeat_timeout(self)
Definition: bondpy.py:137
bondpy.bondpy.Bond.id
id
Definition: bondpy.py:97
bondpy.bondpy.Bond.on_formed
on_formed
Definition: bondpy.py:101
bondpy.bondpy.Bond._flush_pending_callbacks
def _flush_pending_callbacks(self)
Definition: bondpy.py:261
bondpy.bondpy.Timeout._on_timer
def _on_timer(self)
Definition: bondpy.py:76
bondpy.bondpy.as_float_duration
def as_float_duration(d)
Definition: bondpy.py:47
bondpy.bondpy.Bond.shutdown
def shutdown(self)
Definition: bondpy.py:201
bondpy.bondpy.Timeout
Internal use only.
Definition: bondpy.py:54
bondpy.bondpy.Bond.set_heartbeat_timeout
def set_heartbeat_timeout(self, dur)
Definition: bondpy.py:140
bondpy.bondpy.Bond.get_connect_timeout
def get_connect_timeout(self)
Definition: bondpy.py:128
bondpy.bondpy.Bond.on_broken
on_broken
Definition: bondpy.py:100
bondpy.bondpy.Bond.__heartbeat_period
__heartbeat_period
Definition: bondpy.py:160
bondpy.bondpy.Bond.Connected
def Connected(self)
INTERNAL.
Definition: bondpy.py:270


bondpy
Author(s): Stuart Glaser
autogenerated on Tue Mar 1 2022 23:53:38