Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 import sys
00031 import threading
00032 import time
00033 import uuid
00034
00035 import rospy
00036 from bond.msg import *
00037
00038 import BondSM_sm
00039
00040 def as_ros_duration(d):
00041 if not isinstance(d, rospy.Duration):
00042 return rospy.Duration.from_sec(d)
00043 return d
00044 def as_float_duration(d):
00045 if isinstance(d, rospy.Duration):
00046 return d.to_sec()
00047 return d
00048
00049
00050
00051 class Timeout:
00052 def __init__(self, duration, on_timeout=None):
00053 self.duration = duration
00054 self.timer = threading.Timer(0, self._on_timer)
00055 self.deadline = time.time()
00056 self.on_timeout = on_timeout
00057
00058 def reset(self):
00059 self.timer.cancel()
00060 self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
00061 self.timer.start()
00062 self.deadline = time.time() + self.duration.to_sec()
00063 return self
00064
00065 def cancel(self):
00066 self.timer.cancel()
00067
00068 def left(self):
00069 return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
00070
00071 def _on_timer(self):
00072 if self.on_timeout:
00073 self.on_timeout()
00074
00075
00076
00077
00078
00079
00080 class Bond(object):
00081
00082
00083
00084
00085
00086
00087
00088 def __init__(self, topic, id, on_broken=None, on_formed=None):
00089 self.__started = False
00090 self.topic = topic
00091 self.id = id
00092 self.instance_id = str(uuid.uuid4())
00093 self.sister_instance_id = None
00094 self.on_broken = on_broken
00095 self.on_formed = on_formed
00096 self.is_shutdown = False
00097 self.sister_died_first = False
00098
00099
00100
00101 self.pending_callbacks = []
00102
00103 self.sm = BondSM_sm.BondSM_sm(self)
00104
00105 self.lock = threading.RLock()
00106 self.condition = threading.Condition(self.lock)
00107
00108
00109 self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
00110 self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
00111 self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
00112 self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
00113
00114 self.pub = rospy.Publisher(self.topic, Status)
00115
00116
00117 def get_connect_timeout(self):
00118 return self.__connect_timeout
00119 def set_connect_timeout(self, dur):
00120 assert not self.__started
00121 self.__connect_timeout = dur
00122 self.connect_timer = Timeout(as_ros_duration(dur), self._on_connect_timeout)
00123 connect_timeout = property(get_connect_timeout, set_connect_timeout)
00124
00125 def get_heartbeat_timeout(self):
00126 return self.__heartbeat_timeout
00127 def set_heartbeat_timeout(self, dur):
00128 assert not self.__started
00129 self.__heartbeat_timeout = dur
00130 self.heartbeat_timer = Timeout(as_ros_duration(dur), self._on_heartbeat_timeout)
00131 heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
00132
00133 def get_disconnect_timeout(self):
00134 return self.__disconnect_timeout
00135 def set_disconnect_timeout(self, dur):
00136 assert not self.__started
00137 self.__disconnect_timeout = dur
00138 self.disconnect_timer = Timeout(as_ros_duration(dur), self._on_disconnect_timeout)
00139 disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
00140
00141 def get_heartbeat_period(self):
00142 return self.__heartbeat_period
00143 def set_heartbeat_period(self, per):
00144 assert not self.__started
00145 self.__heartbeat_period = as_float_duration(per)
00146 heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
00147
00148
00149
00150 def start(self):
00151 with self.lock:
00152 self.connect_timer.reset()
00153 self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
00154
00155 self.thread = threading.Thread(target=self._publishing_thread)
00156 self.thread.daemon = True
00157 self.thread.start()
00158 self.__started = True
00159
00160
00161 def _on_connect_timeout(self):
00162 with self.lock:
00163 self.sm.ConnectTimeout()
00164 self._flush_pending_callbacks()
00165
00166 def _on_heartbeat_timeout(self):
00167
00168 disable_heartbeat_timeout = rospy.get_param(Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
00169 if disable_heartbeat_timeout:
00170 rospy.logwarn("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)" % \
00171 (self.topic, self.id))
00172 return
00173
00174 with self.lock:
00175 self.sm.HeartbeatTimeout()
00176 self._flush_pending_callbacks()
00177
00178 def _on_disconnect_timeout(self):
00179 with self.lock:
00180 self.sm.DisconnectTimeout()
00181 self._flush_pending_callbacks()
00182
00183 def __del__(self):
00184 self.shutdown()
00185
00186 def shutdown(self):
00187 if not self.is_shutdown:
00188 self.sub.unregister()
00189 with self.lock:
00190 self.is_shutdown = True
00191 if self.sm.getState().getName() != 'SM.Dead':
00192 self.break_bond()
00193 self.pub.unregister()
00194 self.condition.notify_all()
00195
00196 def _on_bond_status(self, msg):
00197
00198 if msg.id == self.id and msg.instance_id != self.instance_id:
00199 with self.lock:
00200 if not self.sister_instance_id:
00201 self.sister_instance_id = msg.instance_id
00202
00203 if msg.instance_id != self.sister_instance_id:
00204 rospy.logerr("More than two locations are trying to use a single bond (topic: %s, id: %s). " + \
00205 "You should only instantiate at most two bond instances for each (topic, id) pair." % \
00206 (self.topic, self.id))
00207 return
00208
00209 if msg.active:
00210 self.sm.SisterAlive()
00211 else:
00212 self.sm.SisterDead()
00213
00214
00215 if self.sister_died_first:
00216 self._publish(False)
00217 self._flush_pending_callbacks()
00218
00219
00220
00221 def _publish(self, active):
00222 msg = Status()
00223 msg.header.stamp = rospy.Time.now()
00224 msg.id = self.id
00225 msg.instance_id = self.instance_id
00226 msg.active = active
00227 msg.heartbeat_timeout = self.heartbeat_timeout
00228 msg.heartbeat_period = self.heartbeat_period
00229 self.pub.publish(msg)
00230
00231 def _publishing_thread(self):
00232 with self.lock:
00233
00234 while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
00235 self._publish(True)
00236 self.condition.wait(self.heartbeat_period)
00237
00238
00239 while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
00240 self._publish(False)
00241 self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
00242
00243 def _flush_pending_callbacks(self):
00244 callbacks = []
00245 with self.lock:
00246 callbacks = self.pending_callbacks
00247 self.pending_callbacks = []
00248 for c in callbacks:
00249 c()
00250
00251
00252 def Connected(self):
00253 self.connect_timer.cancel()
00254 self.condition.notify_all()
00255 if self.on_formed:
00256 self.pending_callbacks.append(self.on_formed)
00257
00258
00259 def Heartbeat(self):
00260 self.heartbeat_timer.reset()
00261
00262
00263 def SisterDied(self):
00264 self.sister_died_first = True
00265
00266
00267 def Death(self):
00268 self.condition.notify_all()
00269 self.heartbeat_timer.cancel()
00270 self.disconnect_timer.cancel()
00271 if self.on_broken:
00272 self.pending_callbacks.append(self.on_broken)
00273
00274
00275 def StartDying(self):
00276 self.heartbeat_timer.cancel()
00277 self.disconnect_timer.reset()
00278
00279
00280 def set_formed_callback(self, on_formed):
00281 with self.lock:
00282 self.on_formed = on_formed
00283
00284
00285 def set_broken_callback(self, on_broken):
00286 with self.lock:
00287 self.on_broken = on_broken
00288
00289
00290
00291
00292
00293 def wait_until_formed(self, timeout = None):
00294 deadline = timeout and Timeout(timeout).reset()
00295 with self.lock:
00296 while self.sm.getState().getName() == 'SM.WaitingForSister':
00297 if rospy.is_shutdown():
00298 break
00299 if deadline and deadline.left() == rospy.Duration(0):
00300 break
00301 wait_duration = 0.1
00302 if deadline:
00303 wait_duration = min(wait_duration, deadline.left().to_sec())
00304 self.condition.wait(wait_duration)
00305 return self.sm.getState().getName() != 'SM.WaitingForSister'
00306
00307
00308
00309
00310
00311 def wait_until_broken(self, timeout = None):
00312 deadline = timeout and Timeout(timeout).reset()
00313 with self.lock:
00314 while self.sm.getState().getName() != 'SM.Dead':
00315 if rospy.is_shutdown():
00316 break
00317 if deadline and deadline.left() == rospy.Duration(0):
00318 break
00319 wait_duration = 0.1
00320 if deadline:
00321 wait_duration = min(wait_duration, deadline.left().to_sec())
00322 self.condition.wait(wait_duration)
00323 return self.sm.getState().getName() == 'SM.Dead'
00324
00325
00326
00327 def is_broken(self):
00328 with self.lock:
00329 return self.sm.getState().getName() == 'SM.Dead'
00330
00331
00332 def break_bond(self):
00333 with self.lock:
00334 self.sm.Die()
00335 self._publish(False)
00336 self._flush_pending_callbacks()
00337
00338
00339 def __repr__(self):
00340 return "[Bond %s, Instance %s (%s)]" % (self.id, self.instance_id, self.sm.getState().getName())