$search
00001 # Copyright (c) 2009, Willow Garage, Inc. 00002 # All rights reserved. 00003 # 00004 # Redistribution and use in source and binary forms, with or without 00005 # modification, are permitted provided that the following conditions are met: 00006 # 00007 # * Redistributions of source code must retain the above copyright 00008 # notice, this list of conditions and the following disclaimer. 00009 # * Redistributions in binary form must reproduce the above copyright 00010 # notice, this list of conditions and the following disclaimer in the 00011 # documentation and/or other materials provided with the distribution. 00012 # * Neither the name of the Willow Garage, Inc. nor the names of its 00013 # contributors may be used to endorse or promote products derived from 00014 # this software without specific prior written permission. 00015 # 00016 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00017 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00018 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00019 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00020 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00021 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00022 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00023 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00024 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00025 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00026 # POSSIBILITY OF SUCH DAMAGE. 00027 00028 ## \author Stuart Glaser 00029 00030 import sys 00031 import threading 00032 import time 00033 import uuid 00034 00035 import roslib; roslib.load_manifest('bondpy') 00036 import rospy 00037 from bond.msg import * 00038 00039 import BondSM_sm 00040 00041 def as_ros_duration(d): 00042 if not isinstance(d, roslib.rostime.Duration): 00043 return rospy.Duration.from_sec(d) 00044 return d 00045 def as_float_duration(d): 00046 if isinstance(d, roslib.rostime.Duration): 00047 return d.to_sec() 00048 return d 00049 00050 00051 ## Internal use only 00052 class Timeout: 00053 def __init__(self, duration, on_timeout=None): 00054 self.duration = duration 00055 self.timer = threading.Timer(0, self._on_timer) 00056 self.deadline = time.time() 00057 self.on_timeout = on_timeout 00058 00059 def reset(self): 00060 self.timer.cancel() 00061 self.timer = threading.Timer(self.duration.to_sec(), self._on_timer) 00062 self.timer.start() 00063 self.deadline = time.time() + self.duration.to_sec() 00064 return self 00065 00066 def cancel(self): 00067 self.timer.cancel() 00068 00069 def left(self): 00070 return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time())) 00071 00072 def _on_timer(self): 00073 if self.on_timeout: 00074 self.on_timeout() 00075 00076 ## \brief Forms a bond to monitor another process. 00077 # 00078 # The Bond class implements a bond, allowing you to monitor another 00079 # process and be notified when it dies. In turn, it will be notified 00080 # when you die. 00081 class Bond(object): 00082 ## \brief Constructs a bond, but does not connect. 00083 # 00084 # \param topic The topic used to exchange the bond status messages. 00085 # \param id The ID of the bond, which should match the ID used on 00086 # the sister's end 00087 # \param on_broken callback that will be called when the bond is broken. 00088 # \param on_formed callback that will be called when the bond is formed. 00089 def __init__(self, topic, id, on_broken=None, on_formed=None): 00090 self.__started = False 00091 self.topic = topic 00092 self.id = id 00093 self.instance_id = str(uuid.uuid4()) 00094 self.sister_instance_id = None 00095 self.on_broken = on_broken 00096 self.on_formed = on_formed 00097 self.is_shutdown = False 00098 self.sister_died_first = False 00099 00100 # Callbacks must be called outside of the locks and after the 00101 # state machine finishes transitioning. 00102 self.pending_callbacks = [] 00103 00104 self.sm = BondSM_sm.BondSM_sm(self) 00105 #self.sm.setDebugFlag(True) 00106 self.lock = threading.RLock() 00107 self.condition = threading.Condition(self.lock) 00108 00109 # Sets the default timeout values 00110 self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT 00111 self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT 00112 self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT 00113 self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD 00114 00115 self.pub = rospy.Publisher(self.topic, Status) 00116 00117 00118 def get_connect_timeout(self): 00119 return self.__connect_timeout 00120 def set_connect_timeout(self, dur): 00121 assert not self.__started 00122 self.__connect_timeout = dur 00123 self.connect_timer = Timeout(as_ros_duration(dur), self._on_connect_timeout) 00124 connect_timeout = property(get_connect_timeout, set_connect_timeout) 00125 00126 def get_heartbeat_timeout(self): 00127 return self.__heartbeat_timeout 00128 def set_heartbeat_timeout(self, dur): 00129 assert not self.__started 00130 self.__heartbeat_timeout = dur 00131 self.heartbeat_timer = Timeout(as_ros_duration(dur), self._on_heartbeat_timeout) 00132 heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout) 00133 00134 def get_disconnect_timeout(self): 00135 return self.__disconnect_timeout 00136 def set_disconnect_timeout(self, dur): 00137 assert not self.__started 00138 self.__disconnect_timeout = dur 00139 self.disconnect_timer = Timeout(as_ros_duration(dur), self._on_disconnect_timeout) 00140 disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout) 00141 00142 def get_heartbeat_period(self): 00143 return self.__heartbeat_period 00144 def set_heartbeat_period(self, per): 00145 assert not self.__started 00146 self.__heartbeat_period = as_float_duration(per) 00147 heartbeat_period = property(get_heartbeat_period, set_heartbeat_period) 00148 00149 00150 ## \brief Starts the bond and connects to the sister process. 00151 def start(self): 00152 with self.lock: 00153 self.connect_timer.reset() 00154 self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status) 00155 00156 self.thread = threading.Thread(target=self._publishing_thread) 00157 self.thread.daemon = True 00158 self.thread.start() 00159 self.__started = True 00160 00161 00162 def _on_connect_timeout(self): 00163 with self.lock: 00164 self.sm.ConnectTimeout() 00165 self._flush_pending_callbacks() 00166 00167 def _on_heartbeat_timeout(self): 00168 # Checks that heartbeat timeouts haven't been disabled globally 00169 disable_heartbeat_timeout = rospy.get_param(Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False) 00170 if disable_heartbeat_timeout: 00171 rospy.logwarn("Heartbeat timeout is disabled. Not breaking bond (topic: %s, id: %s)" % \ 00172 (self.topic, self.id)) 00173 return 00174 00175 with self.lock: 00176 self.sm.HeartbeatTimeout() 00177 self._flush_pending_callbacks() 00178 00179 def _on_disconnect_timeout(self): 00180 with self.lock: 00181 self.sm.DisconnectTimeout() 00182 self._flush_pending_callbacks() 00183 00184 def __del__(self): 00185 self.shutdown() 00186 00187 def shutdown(self): 00188 if not self.is_shutdown: 00189 self.sub.unregister() 00190 with self.lock: 00191 self.is_shutdown = True 00192 if self.sm.getState().getName() != 'SM.Dead': 00193 self.break_bond() 00194 self.pub.unregister() 00195 self.condition.notify_all() 00196 00197 def _on_bond_status(self, msg): 00198 # Filters out messages from other bonds and messages from ourself 00199 if msg.id == self.id and msg.instance_id != self.instance_id: 00200 with self.lock: 00201 if not self.sister_instance_id: 00202 self.sister_instance_id = msg.instance_id 00203 00204 if msg.instance_id != self.sister_instance_id: 00205 rospy.logerr("More than two locations are trying to use a single bond (topic: %s, id: %s). " + \ 00206 "You should only instantiate at most two bond instances for each (topic, id) pair." % \ 00207 (self.topic, self.id)) 00208 return 00209 00210 if msg.active: 00211 self.sm.SisterAlive() 00212 else: 00213 self.sm.SisterDead() 00214 00215 # Immediate ack for sister's death notification 00216 if self.sister_died_first: 00217 self._publish(False) 00218 self._flush_pending_callbacks() 00219 00220 00221 00222 def _publish(self, active): 00223 msg = Status() 00224 msg.header.stamp = rospy.Time.now() 00225 msg.id = self.id 00226 msg.instance_id = self.instance_id 00227 msg.active = active 00228 msg.heartbeat_timeout = self.heartbeat_timeout 00229 msg.heartbeat_period = self.heartbeat_period 00230 self.pub.publish(msg) 00231 00232 def _publishing_thread(self): 00233 with self.lock: 00234 # Publishing ALIVE 00235 while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']: 00236 self._publish(True) 00237 self.condition.wait(self.heartbeat_period) 00238 00239 # Publishing DEAD 00240 while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath': 00241 self._publish(False) 00242 self.condition.wait(Constants.DEAD_PUBLISH_PERIOD) 00243 00244 def _flush_pending_callbacks(self): 00245 callbacks = [] 00246 with self.lock: 00247 callbacks = self.pending_callbacks 00248 self.pending_callbacks = [] 00249 for c in callbacks: 00250 c() 00251 00252 ## \brief INTERNAL 00253 def Connected(self): 00254 self.connect_timer.cancel() 00255 self.condition.notify_all() 00256 if self.on_formed: 00257 self.pending_callbacks.append(self.on_formed) 00258 00259 ## \brief INTERNAL 00260 def Heartbeat(self): 00261 self.heartbeat_timer.reset() 00262 00263 ## \brief INTERNAL 00264 def SisterDied(self): 00265 self.sister_died_first = True 00266 00267 ## \brief INTERNAL 00268 def Death(self): 00269 self.condition.notify_all() 00270 self.heartbeat_timer.cancel() 00271 self.disconnect_timer.cancel() 00272 if self.on_broken: 00273 self.pending_callbacks.append(self.on_broken) 00274 00275 ## \brief INTERNAL 00276 def StartDying(self): 00277 self.heartbeat_timer.cancel() 00278 self.disconnect_timer.reset() 00279 00280 ## \brief Sets the formed callback 00281 def set_formed_callback(self, on_formed): 00282 with self.lock: 00283 self.on_formed = on_formed 00284 00285 ## \brief Sets the broken callback 00286 def set_broken_callback(self, on_broken): 00287 with self.lock: 00288 self.on_broken = on_broken 00289 00290 ## \brief Blocks until the bond is formed for at most 'duration'. 00291 # 00292 # \param timeout Maximum duration to wait. If None then this call will not timeout. 00293 # \return true iff the bond has been formed. 00294 def wait_until_formed(self, timeout = None): 00295 deadline = timeout and Timeout(timeout).reset() 00296 with self.lock: 00297 while self.sm.getState().getName() == 'SM.WaitingForSister': 00298 if rospy.is_shutdown(): 00299 break 00300 if deadline and deadline.left() == rospy.Duration(0): 00301 break 00302 wait_duration = 0.1 00303 if deadline: 00304 wait_duration = min(wait_duration, deadline.left().to_sec()) 00305 self.condition.wait(wait_duration) 00306 return self.sm.getState().getName() != 'SM.WaitingForSister' 00307 00308 ## \brief Blocks until the bond is broken for at most 'duration'. 00309 # 00310 # \param timeout Maximum duration to wait. If None then this call will not timeout. 00311 # \return true iff the bond has been broken, even if it has never been formed. 00312 def wait_until_broken(self, timeout = None): 00313 deadline = timeout and Timeout(timeout).reset() 00314 with self.lock: 00315 while self.sm.getState().getName() != 'SM.Dead': 00316 if rospy.is_shutdown(): 00317 break 00318 if deadline and deadline.left() == rospy.Duration(0): 00319 break 00320 wait_duration = 0.1 00321 if deadline: 00322 wait_duration = min(wait_duration, deadline.left().to_sec()) 00323 self.condition.wait(wait_duration) 00324 return self.sm.getState().getName() == 'SM.Dead' 00325 00326 ## \brief Indicates if the bond is broken 00327 # \return true iff the bond has been broken. 00328 def is_broken(self): 00329 with self.lock: 00330 return self.sm.getState().getName() == 'SM.Dead' 00331 00332 ## \brief Breaks the bond, notifying the other process. 00333 def break_bond(self): 00334 with self.lock: 00335 self.sm.Die() 00336 self._publish(False) 00337 self._flush_pending_callbacks() 00338 00339 00340 def __repr__(self): 00341 return "[Bond %s, Instance %s (%s)]" % (self.id, self.instance_id, self.sm.getState().getName())