00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 import threading
00031 import time
00032 import uuid
00033
00034 import rospy
00035
00036 import BondSM_sm
00037
00038 from bond.msg import Constants, Status
00039
00040
00041 def as_ros_duration(d):
00042 if not isinstance(d, rospy.Duration):
00043 return rospy.Duration.from_sec(d)
00044 return d
00045
00046
00047 def as_float_duration(d):
00048 if isinstance(d, rospy.Duration):
00049 return d.to_sec()
00050 return d
00051
00052
00053
00054 class Timeout:
00055 def __init__(self, duration, on_timeout=None):
00056 self.duration = duration
00057 self.timer = threading.Timer(0, self._on_timer)
00058 self.timer.daemon = True
00059 self.deadline = time.time()
00060 self.on_timeout = on_timeout
00061
00062 def reset(self):
00063 self.timer.cancel()
00064 self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
00065 self.timer.daemon = True
00066 self.timer.start()
00067 self.deadline = time.time() + self.duration.to_sec()
00068 return self
00069
00070 def cancel(self):
00071 self.timer.cancel()
00072
00073 def left(self):
00074 return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
00075
00076 def _on_timer(self):
00077 if self.on_timeout:
00078 self.on_timeout()
00079
00080
00081
00082
00083
00084
00085
00086 class Bond(object):
00087
00088
00089
00090
00091
00092
00093
00094 def __init__(self, topic, id, on_broken=None, on_formed=None):
00095 self.__started = False
00096 self.topic = topic
00097 self.id = id
00098 self.instance_id = str(uuid.uuid4())
00099 self.sister_instance_id = None
00100 self.on_broken = on_broken
00101 self.on_formed = on_formed
00102 self.is_shutdown = False
00103 self.sister_died_first = False
00104
00105 self.deadline = None
00106
00107
00108
00109 self.pending_callbacks = []
00110
00111 self.sm = BondSM_sm.BondSM_sm(self)
00112
00113 self.lock = threading.RLock()
00114 self.condition = threading.Condition(self.lock)
00115
00116
00117 self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
00118 self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
00119 self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
00120 self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
00121
00122
00123
00124 self.pub = rospy.Publisher(self.topic, Status, queue_size=1)
00125
00126 def get_connect_timeout(self):
00127 return self.__connect_timeout
00128
00129 def set_connect_timeout(self, dur):
00130 assert not self.__started
00131 self.__connect_timeout = dur
00132 self.connect_timer = Timeout(as_ros_duration(dur), self._on_connect_timeout)
00133 connect_timeout = property(get_connect_timeout, set_connect_timeout)
00134
00135 def get_heartbeat_timeout(self):
00136 return self.__heartbeat_timeout
00137
00138 def set_heartbeat_timeout(self, dur):
00139 assert not self.__started
00140 self.__heartbeat_timeout = dur
00141 self.heartbeat_timer = Timeout(as_ros_duration(dur), self._on_heartbeat_timeout)
00142 heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
00143
00144 def get_disconnect_timeout(self):
00145 return self.__disconnect_timeout
00146
00147 def set_disconnect_timeout(self, dur):
00148 assert not self.__started
00149 self.__disconnect_timeout = dur
00150 self.disconnect_timer = Timeout(as_ros_duration(dur), self._on_disconnect_timeout)
00151 disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
00152
00153 def get_heartbeat_period(self):
00154 return self.__heartbeat_period
00155
00156 def set_heartbeat_period(self, per):
00157 assert not self.__started
00158 self.__heartbeat_period = as_float_duration(per)
00159 heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
00160
00161
00162 def start(self):
00163 with self.lock:
00164 self.connect_timer.reset()
00165 self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
00166
00167 self.thread = threading.Thread(target=self._publishing_thread)
00168 self.thread.daemon = True
00169 self.thread.start()
00170 self.__started = True
00171
00172 def _on_connect_timeout(self):
00173 with self.lock:
00174 self.sm.ConnectTimeout()
00175 self._flush_pending_callbacks()
00176
00177 def _on_heartbeat_timeout(self):
00178
00179 disable_heartbeat_timeout = rospy.get_param(
00180 Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
00181 if disable_heartbeat_timeout:
00182 rospy.logwarn(
00183 "Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)" %
00184 (self.topic, self.id))
00185 return
00186
00187 with self.lock:
00188 self.sm.HeartbeatTimeout()
00189 self._flush_pending_callbacks()
00190
00191 def _on_disconnect_timeout(self):
00192 with self.lock:
00193 self.sm.DisconnectTimeout()
00194 self._flush_pending_callbacks()
00195
00196 def __del__(self):
00197 self.shutdown()
00198
00199 def shutdown(self):
00200 if not self.is_shutdown:
00201 self.sub.unregister()
00202 with self.lock:
00203 self.is_shutdown = True
00204 if self.sm.getState().getName() != 'SM.Dead':
00205 self.break_bond()
00206 self.pub.unregister()
00207 self.condition.notify_all()
00208 self.connect_timer.cancel()
00209 if self.deadline:
00210 self.deadline.cancel()
00211
00212 def _on_bond_status(self, msg):
00213
00214 if msg.id == self.id and msg.instance_id != self.instance_id:
00215 with self.lock:
00216 if not self.sister_instance_id:
00217 self.sister_instance_id = msg.instance_id
00218
00219 if msg.instance_id != self.sister_instance_id:
00220 rospy.logerr(
00221 "More than two locations are trying to use a single bond (topic: %s, id: %s). " +
00222 "You should only instantiate at most two bond instances for each (topic, id) pair." %
00223 (self.topic, self.id))
00224 return
00225
00226 if msg.active:
00227 self.sm.SisterAlive()
00228 else:
00229 self.sm.SisterDead()
00230
00231
00232 if self.sister_died_first:
00233 self._publish(False)
00234 self._flush_pending_callbacks()
00235
00236 def _publish(self, active):
00237 msg = Status()
00238 msg.header.stamp = rospy.Time.now()
00239 msg.id = self.id
00240 msg.instance_id = self.instance_id
00241 msg.active = active
00242 msg.heartbeat_timeout = self.heartbeat_timeout
00243 msg.heartbeat_period = self.heartbeat_period
00244 self.pub.publish(msg)
00245
00246 def _publishing_thread(self):
00247 with self.lock:
00248
00249 while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
00250 self._publish(True)
00251 self.condition.wait(self.heartbeat_period)
00252
00253
00254 while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
00255 self._publish(False)
00256 self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
00257
00258 def _flush_pending_callbacks(self):
00259 callbacks = []
00260 with self.lock:
00261 callbacks = self.pending_callbacks
00262 self.pending_callbacks = []
00263 for c in callbacks:
00264 c()
00265
00266
00267 def Connected(self):
00268 self.connect_timer.cancel()
00269 self.condition.notify_all()
00270 if self.on_formed:
00271 self.pending_callbacks.append(self.on_formed)
00272
00273
00274 def Heartbeat(self):
00275 self.heartbeat_timer.reset()
00276
00277
00278 def SisterDied(self):
00279 self.sister_died_first = True
00280
00281
00282 def Death(self):
00283 self.condition.notify_all()
00284 self.heartbeat_timer.cancel()
00285 self.disconnect_timer.cancel()
00286 if self.on_broken:
00287 self.pending_callbacks.append(self.on_broken)
00288
00289
00290 def StartDying(self):
00291 self.heartbeat_timer.cancel()
00292 self.disconnect_timer.reset()
00293
00294
00295 def set_formed_callback(self, on_formed):
00296 with self.lock:
00297 self.on_formed = on_formed
00298
00299
00300 def set_broken_callback(self, on_broken):
00301 with self.lock:
00302 self.on_broken = on_broken
00303
00304
00305
00306
00307
00308 def wait_until_formed(self, timeout=None):
00309 if self.deadline:
00310 self.deadline.cancel()
00311 self.deadline = None
00312 if timeout:
00313 self.deadline = Timeout(timeout).reset()
00314 with self.lock:
00315 while self.sm.getState().getName() == 'SM.WaitingForSister':
00316 if rospy.is_shutdown():
00317 break
00318 if self.deadline and self.deadline.left() == rospy.Duration(0):
00319 break
00320 wait_duration = 0.1
00321 if self.deadline:
00322 wait_duration = min(wait_duration, self.deadline.left().to_sec())
00323 self.condition.wait(wait_duration)
00324 return self.sm.getState().getName() != 'SM.WaitingForSister'
00325
00326
00327
00328
00329
00330 def wait_until_broken(self, timeout=None):
00331 if self.deadline:
00332 self.deadline.cancel()
00333 self.deadline = None
00334 if timeout:
00335 self.deadline = Timeout(timeout).reset()
00336 with self.lock:
00337 while self.sm.getState().getName() != 'SM.Dead':
00338 if rospy.is_shutdown():
00339 break
00340 if self.deadline and self.deadline.left() == rospy.Duration(0):
00341 break
00342 wait_duration = 0.1
00343 if self.deadline:
00344 wait_duration = min(wait_duration, self.deadline.left().to_sec())
00345 self.condition.wait(wait_duration)
00346 return self.sm.getState().getName() == 'SM.Dead'
00347
00348
00349
00350 def is_broken(self):
00351 with self.lock:
00352 return self.sm.getState().getName() == 'SM.Dead'
00353
00354
00355 def break_bond(self):
00356 with self.lock:
00357 self.sm.Die()
00358 self._publish(False)
00359 self._flush_pending_callbacks()
00360
00361 def __repr__(self):
00362 return "[Bond %s, Instance %s (%s)]" % \
00363 (self.id, self.instance_id, self.sm.getState().getName())