Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 import sys
00031 import threading
00032 import time
00033 import uuid
00034
00035 import roslib; roslib.load_manifest('bondpy')
00036 import rospy
00037 from bond.msg import *
00038
00039 import BondSM_sm
00040
00041 def as_ros_duration(d):
00042 if not isinstance(d, roslib.rostime.Duration):
00043 return rospy.Duration.from_sec(d)
00044 return d
00045 def as_float_duration(d):
00046 if isinstance(d, roslib.rostime.Duration):
00047 return d.to_sec()
00048 return d
00049
00050
00051
00052 class Timeout:
00053 def __init__(self, duration, on_timeout=None):
00054 self.duration = duration
00055 self.timer = threading.Timer(0, self._on_timer)
00056 self.deadline = time.time()
00057 self.on_timeout = on_timeout
00058
00059 def reset(self):
00060 self.timer.cancel()
00061 self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
00062 self.timer.start()
00063 self.deadline = time.time() + self.duration.to_sec()
00064 return self
00065
00066 def cancel(self):
00067 self.timer.cancel()
00068
00069 def left(self):
00070 return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
00071
00072 def _on_timer(self):
00073 if self.on_timeout:
00074 self.on_timeout()
00075
00076
00077
00078
00079
00080
00081 class Bond(object):
00082
00083
00084
00085
00086
00087
00088
00089 def __init__(self, topic, id, on_broken=None, on_formed=None):
00090 self.__started = False
00091 self.topic = topic
00092 self.id = id
00093 self.instance_id = str(uuid.uuid4())
00094 self.sister_instance_id = None
00095 self.on_broken = on_broken
00096 self.on_formed = on_formed
00097 self.is_shutdown = False
00098 self.sister_died_first = False
00099
00100
00101
00102 self.pending_callbacks = []
00103
00104 self.sm = BondSM_sm.BondSM_sm(self)
00105
00106 self.lock = threading.RLock()
00107 self.condition = threading.Condition(self.lock)
00108
00109
00110 self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
00111 self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
00112 self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
00113 self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
00114
00115 self.pub = rospy.Publisher(self.topic, Status)
00116
00117
00118 def get_connect_timeout(self):
00119 return self.__connect_timeout
00120 def set_connect_timeout(self, dur):
00121 assert not self.__started
00122 self.__connect_timeout = dur
00123 self.connect_timer = Timeout(as_ros_duration(dur), self._on_connect_timeout)
00124 connect_timeout = property(get_connect_timeout, set_connect_timeout)
00125
00126 def get_heartbeat_timeout(self):
00127 return self.__heartbeat_timeout
00128 def set_heartbeat_timeout(self, dur):
00129 assert not self.__started
00130 self.__heartbeat_timeout = dur
00131 self.heartbeat_timer = Timeout(as_ros_duration(dur), self._on_heartbeat_timeout)
00132 heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
00133
00134 def get_disconnect_timeout(self):
00135 return self.__disconnect_timeout
00136 def set_disconnect_timeout(self, dur):
00137 assert not self.__started
00138 self.__disconnect_timeout = dur
00139 self.disconnect_timer = Timeout(as_ros_duration(dur), self._on_disconnect_timeout)
00140 disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
00141
00142 def get_heartbeat_period(self):
00143 return self.__heartbeat_period
00144 def set_heartbeat_period(self, per):
00145 assert not self.__started
00146 self.__heartbeat_period = as_float_duration(per)
00147 heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
00148
00149
00150
00151 def start(self):
00152 with self.lock:
00153 self.connect_timer.reset()
00154 self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
00155
00156 self.thread = threading.Thread(target=self._publishing_thread)
00157 self.thread.daemon = True
00158 self.thread.start()
00159 self.__started = True
00160
00161
00162 def _on_connect_timeout(self):
00163 with self.lock:
00164 self.sm.ConnectTimeout()
00165 self._flush_pending_callbacks()
00166
00167 def _on_heartbeat_timeout(self):
00168
00169 disable_heartbeat_timeout = rospy.get_param(Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
00170 if disable_heartbeat_timeout:
00171 rospy.logwarn("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)" % \
00172 (self.topic, self.id))
00173 return
00174
00175 with self.lock:
00176 self.sm.HeartbeatTimeout()
00177 self._flush_pending_callbacks()
00178
00179 def _on_disconnect_timeout(self):
00180 with self.lock:
00181 self.sm.DisconnectTimeout()
00182 self._flush_pending_callbacks()
00183
00184 def __del__(self):
00185 self.shutdown()
00186
00187 def shutdown(self):
00188 if not self.is_shutdown:
00189 self.sub.unregister()
00190 with self.lock:
00191 self.is_shutdown = True
00192 if self.sm.getState().getName() != 'SM.Dead':
00193 self.break_bond()
00194 self.pub.unregister()
00195 self.condition.notify_all()
00196
00197 def _on_bond_status(self, msg):
00198
00199 if msg.id == self.id and msg.instance_id != self.instance_id:
00200 with self.lock:
00201 if not self.sister_instance_id:
00202 self.sister_instance_id = msg.instance_id
00203
00204 if msg.instance_id != self.sister_instance_id:
00205 rospy.logerr("More than two locations are trying to use a single bond (topic: %s, id: %s). " + \
00206 "You should only instantiate at most two bond instances for each (topic, id) pair." % \
00207 (self.topic, self.id))
00208 return
00209
00210 if msg.active:
00211 self.sm.SisterAlive()
00212 else:
00213 self.sm.SisterDead()
00214
00215
00216 if self.sister_died_first:
00217 self._publish(False)
00218 self._flush_pending_callbacks()
00219
00220
00221
00222 def _publish(self, active):
00223 msg = Status()
00224 msg.header.stamp = rospy.Time.now()
00225 msg.id = self.id
00226 msg.instance_id = self.instance_id
00227 msg.active = active
00228 msg.heartbeat_timeout = self.heartbeat_timeout
00229 msg.heartbeat_period = self.heartbeat_period
00230 self.pub.publish(msg)
00231
00232 def _publishing_thread(self):
00233 with self.lock:
00234
00235 while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
00236 self._publish(True)
00237 self.condition.wait(self.heartbeat_period)
00238
00239
00240 while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
00241 self._publish(False)
00242 self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
00243
00244 def _flush_pending_callbacks(self):
00245 callbacks = []
00246 with self.lock:
00247 callbacks = self.pending_callbacks
00248 self.pending_callbacks = []
00249 for c in callbacks:
00250 c()
00251
00252
00253 def Connected(self):
00254 self.connect_timer.cancel()
00255 self.condition.notify_all()
00256 if self.on_formed:
00257 self.pending_callbacks.append(self.on_formed)
00258
00259
00260 def Heartbeat(self):
00261 self.heartbeat_timer.reset()
00262
00263
00264 def SisterDied(self):
00265 self.sister_died_first = True
00266
00267
00268 def Death(self):
00269 self.condition.notify_all()
00270 self.heartbeat_timer.cancel()
00271 self.disconnect_timer.cancel()
00272 if self.on_broken:
00273 self.pending_callbacks.append(self.on_broken)
00274
00275
00276 def StartDying(self):
00277 self.heartbeat_timer.cancel()
00278 self.disconnect_timer.reset()
00279
00280
00281 def set_formed_callback(self, on_formed):
00282 with self.lock:
00283 self.on_formed = on_formed
00284
00285
00286 def set_broken_callback(self, on_broken):
00287 with self.lock:
00288 self.on_broken = on_broken
00289
00290
00291
00292
00293
00294 def wait_until_formed(self, timeout = None):
00295 deadline = timeout and Timeout(timeout).reset()
00296 with self.lock:
00297 while self.sm.getState().getName() == 'SM.WaitingForSister':
00298 if rospy.is_shutdown():
00299 break
00300 if deadline and deadline.left() == rospy.Duration(0):
00301 break
00302 wait_duration = 0.1
00303 if deadline:
00304 wait_duration = min(wait_duration, deadline.left().to_sec())
00305 self.condition.wait(wait_duration)
00306 return self.sm.getState().getName() != 'SM.WaitingForSister'
00307
00308
00309
00310
00311
00312 def wait_until_broken(self, timeout = None):
00313 deadline = timeout and Timeout(timeout).reset()
00314 with self.lock:
00315 while self.sm.getState().getName() != 'SM.Dead':
00316 if rospy.is_shutdown():
00317 break
00318 if deadline and deadline.left() == rospy.Duration(0):
00319 break
00320 wait_duration = 0.1
00321 if deadline:
00322 wait_duration = min(wait_duration, deadline.left().to_sec())
00323 self.condition.wait(wait_duration)
00324 return self.sm.getState().getName() == 'SM.Dead'
00325
00326
00327
00328 def is_broken(self):
00329 with self.lock:
00330 return self.sm.getState().getName() == 'SM.Dead'
00331
00332
00333 def break_bond(self):
00334 with self.lock:
00335 self.sm.Die()
00336 self._publish(False)
00337 self._flush_pending_callbacks()
00338
00339
00340 def __repr__(self):
00341 return "[Bond %s, Instance %s (%s)]" % (self.id, self.instance_id, self.sm.getState().getName())