bondpy.py
Go to the documentation of this file.
00001 # Copyright (c) 2009, Willow Garage, Inc.
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions are met:
00006 #
00007 #     * Redistributions of source code must retain the above copyright
00008 #       notice, this list of conditions and the following disclaimer.
00009 #     * Redistributions in binary form must reproduce the above copyright
00010 #       notice, this list of conditions and the following disclaimer in the
00011 #       documentation and/or other materials provided with the distribution.
00012 #     * Neither the name of the Willow Garage, Inc. nor the names of its
00013 #       contributors may be used to endorse or promote products derived from
00014 #       this software without specific prior written permission.
00015 #
00016 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026 # POSSIBILITY OF SUCH DAMAGE.
00027 
00028 ## \author Stuart Glaser
00029 
00030 import sys
00031 import threading
00032 import time
00033 import uuid
00034 
00035 import rospy
00036 from bond.msg import *
00037 
00038 import BondSM_sm
00039 
00040 def as_ros_duration(d):
00041     if not isinstance(d, rospy.Duration):
00042         return rospy.Duration.from_sec(d)
00043     return d
00044 def as_float_duration(d):
00045     if isinstance(d, rospy.Duration):
00046         return d.to_sec()
00047     return d
00048 
00049 
00050 ## Internal use only
00051 class Timeout:
00052     def __init__(self, duration, on_timeout=None):
00053         self.duration = duration
00054         self.timer = threading.Timer(0, self._on_timer)
00055         self.deadline = time.time()
00056         self.on_timeout = on_timeout
00057 
00058     def reset(self):
00059         self.timer.cancel()
00060         self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
00061         self.timer.start()
00062         self.deadline = time.time() + self.duration.to_sec()
00063         return self
00064 
00065     def cancel(self):
00066         self.timer.cancel()
00067 
00068     def left(self):
00069         return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
00070 
00071     def _on_timer(self):
00072         if self.on_timeout:
00073             self.on_timeout()
00074 
00075 ## \brief Forms a bond to monitor another process.
00076 #
00077 # The Bond class implements a bond, allowing you to monitor another
00078 # process and be notified when it dies.  In turn, it will be notified
00079 # when you die.
00080 class Bond(object):
00081     ## \brief Constructs a bond, but does not connect.
00082     #
00083     # \param topic The topic used to exchange the bond status messages.
00084     # \param id The ID of the bond, which should match the ID used on
00085     #           the sister's end
00086     # \param on_broken callback that will be called when the bond is broken.
00087     # \param on_formed callback that will be called when the bond is formed.
00088     def __init__(self, topic, id, on_broken=None, on_formed=None):
00089         self.__started = False
00090         self.topic = topic
00091         self.id = id
00092         self.instance_id = str(uuid.uuid4())
00093         self.sister_instance_id = None
00094         self.on_broken = on_broken
00095         self.on_formed = on_formed
00096         self.is_shutdown = False
00097         self.sister_died_first = False
00098 
00099         # Callbacks must be called outside of the locks and after the
00100         # state machine finishes transitioning.
00101         self.pending_callbacks = []
00102 
00103         self.sm = BondSM_sm.BondSM_sm(self)
00104         #self.sm.setDebugFlag(True)
00105         self.lock = threading.RLock()
00106         self.condition = threading.Condition(self.lock)
00107 
00108         # Sets the default timeout values
00109         self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
00110         self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
00111         self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
00112         self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
00113 
00114         self.pub = rospy.Publisher(self.topic, Status)
00115 
00116 
00117     def get_connect_timeout(self):
00118         return self.__connect_timeout
00119     def set_connect_timeout(self, dur):
00120         assert not self.__started
00121         self.__connect_timeout = dur
00122         self.connect_timer = Timeout(as_ros_duration(dur), self._on_connect_timeout)
00123     connect_timeout = property(get_connect_timeout, set_connect_timeout)
00124 
00125     def get_heartbeat_timeout(self):
00126         return self.__heartbeat_timeout
00127     def set_heartbeat_timeout(self, dur):
00128         assert not self.__started
00129         self.__heartbeat_timeout = dur
00130         self.heartbeat_timer = Timeout(as_ros_duration(dur), self._on_heartbeat_timeout)
00131     heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
00132 
00133     def get_disconnect_timeout(self):
00134         return self.__disconnect_timeout
00135     def set_disconnect_timeout(self, dur):
00136         assert not self.__started
00137         self.__disconnect_timeout = dur
00138         self.disconnect_timer = Timeout(as_ros_duration(dur), self._on_disconnect_timeout)
00139     disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
00140 
00141     def get_heartbeat_period(self):
00142         return self.__heartbeat_period
00143     def set_heartbeat_period(self, per):
00144         assert not self.__started
00145         self.__heartbeat_period = as_float_duration(per)
00146     heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
00147 
00148 
00149     ## \brief Starts the bond and connects to the sister process.
00150     def start(self):
00151         with self.lock:
00152             self.connect_timer.reset()
00153             self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
00154 
00155             self.thread = threading.Thread(target=self._publishing_thread)
00156             self.thread.daemon = True
00157             self.thread.start()
00158             self.__started = True
00159 
00160 
00161     def _on_connect_timeout(self):
00162         with self.lock:
00163             self.sm.ConnectTimeout()
00164         self._flush_pending_callbacks()
00165 
00166     def _on_heartbeat_timeout(self):
00167         # Checks that heartbeat timeouts haven't been disabled globally
00168         disable_heartbeat_timeout = rospy.get_param(Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
00169         if disable_heartbeat_timeout:
00170             rospy.logwarn("Heartbeat timeout is disabled.  Not breaking bond (topic: %s, id: %s)" % \
00171                               (self.topic, self.id))
00172             return
00173 
00174         with self.lock:
00175             self.sm.HeartbeatTimeout()
00176         self._flush_pending_callbacks()
00177 
00178     def _on_disconnect_timeout(self):
00179         with self.lock:
00180             self.sm.DisconnectTimeout()
00181         self._flush_pending_callbacks()
00182 
00183     def __del__(self):
00184         self.shutdown()
00185 
00186     def shutdown(self):
00187         if not self.is_shutdown:
00188             self.sub.unregister()
00189             with self.lock:
00190                 self.is_shutdown = True
00191                 if self.sm.getState().getName() != 'SM.Dead':
00192                     self.break_bond()
00193                 self.pub.unregister()
00194                 self.condition.notify_all()
00195 
00196     def _on_bond_status(self, msg):
00197         # Filters out messages from other bonds and messages from ourself
00198         if msg.id == self.id and msg.instance_id != self.instance_id:
00199             with self.lock:
00200                 if not self.sister_instance_id:
00201                     self.sister_instance_id = msg.instance_id
00202 
00203                 if msg.instance_id != self.sister_instance_id:
00204                     rospy.logerr("More than two locations are trying to use a single bond (topic: %s, id: %s).  " + \
00205                                  "You should only instantiate at most two bond instances for each (topic, id) pair." % \
00206                                      (self.topic, self.id))
00207                     return
00208 
00209                 if msg.active:
00210                     self.sm.SisterAlive()
00211                 else:
00212                     self.sm.SisterDead()
00213 
00214                     # Immediate ack for sister's death notification
00215                     if self.sister_died_first:
00216                         self._publish(False)
00217             self._flush_pending_callbacks()
00218 
00219 
00220 
00221     def _publish(self, active):
00222         msg = Status()
00223         msg.header.stamp = rospy.Time.now()
00224         msg.id = self.id
00225         msg.instance_id = self.instance_id
00226         msg.active = active
00227         msg.heartbeat_timeout = self.heartbeat_timeout
00228         msg.heartbeat_period = self.heartbeat_period
00229         self.pub.publish(msg)
00230 
00231     def _publishing_thread(self):
00232         with self.lock:
00233             # Publishing ALIVE
00234             while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
00235                 self._publish(True)
00236                 self.condition.wait(self.heartbeat_period)
00237 
00238             # Publishing DEAD
00239             while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
00240                 self._publish(False)
00241                 self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
00242 
00243     def _flush_pending_callbacks(self):
00244         callbacks = []
00245         with self.lock:
00246             callbacks = self.pending_callbacks
00247             self.pending_callbacks = []
00248         for c in callbacks:
00249             c()
00250 
00251     ## \brief INTERNAL
00252     def Connected(self):
00253         self.connect_timer.cancel()
00254         self.condition.notify_all()
00255         if self.on_formed:
00256             self.pending_callbacks.append(self.on_formed)
00257 
00258     ## \brief INTERNAL
00259     def Heartbeat(self):
00260         self.heartbeat_timer.reset()
00261 
00262     ## \brief INTERNAL
00263     def SisterDied(self):
00264         self.sister_died_first = True
00265 
00266     ## \brief INTERNAL
00267     def Death(self):
00268         self.condition.notify_all()
00269         self.heartbeat_timer.cancel()
00270         self.disconnect_timer.cancel()
00271         if self.on_broken:
00272             self.pending_callbacks.append(self.on_broken)
00273 
00274     ## \brief INTERNAL
00275     def StartDying(self):
00276         self.heartbeat_timer.cancel()
00277         self.disconnect_timer.reset()
00278 
00279     ## \brief Sets the formed callback
00280     def set_formed_callback(self, on_formed):
00281         with self.lock:
00282             self.on_formed = on_formed
00283 
00284     ## \brief Sets the broken callback
00285     def set_broken_callback(self, on_broken):
00286         with self.lock:
00287             self.on_broken = on_broken
00288 
00289     ## \brief Blocks until the bond is formed for at most 'duration'.
00290     #
00291     # \param timeout Maximum duration to wait.  If None then this call will not timeout.
00292     # \return true iff the bond has been formed.
00293     def wait_until_formed(self, timeout = None):
00294         deadline = timeout and Timeout(timeout).reset()
00295         with self.lock:
00296             while self.sm.getState().getName() == 'SM.WaitingForSister':
00297                 if rospy.is_shutdown():
00298                     break
00299                 if deadline and deadline.left() == rospy.Duration(0):
00300                     break
00301                 wait_duration = 0.1
00302                 if deadline:
00303                     wait_duration = min(wait_duration, deadline.left().to_sec())
00304                 self.condition.wait(wait_duration)
00305             return self.sm.getState().getName() != 'SM.WaitingForSister'
00306 
00307     ## \brief Blocks until the bond is broken for at most 'duration'.
00308     #
00309     # \param timeout Maximum duration to wait.  If None then this call will not timeout.
00310     # \return true iff the bond has been broken, even if it has never been formed.
00311     def wait_until_broken(self, timeout = None):
00312         deadline = timeout and Timeout(timeout).reset()
00313         with self.lock:
00314             while self.sm.getState().getName() != 'SM.Dead':
00315                 if rospy.is_shutdown():
00316                     break
00317                 if deadline and deadline.left() == rospy.Duration(0):
00318                     break
00319                 wait_duration = 0.1
00320                 if deadline:
00321                     wait_duration = min(wait_duration, deadline.left().to_sec())
00322                 self.condition.wait(wait_duration)
00323             return self.sm.getState().getName() == 'SM.Dead'
00324 
00325     ## \brief Indicates if the bond is broken
00326     # \return true iff the bond has been broken.
00327     def is_broken(self):
00328         with self.lock:
00329             return self.sm.getState().getName() == 'SM.Dead'
00330 
00331     ## \brief Breaks the bond, notifying the other process.
00332     def break_bond(self):
00333         with self.lock:
00334             self.sm.Die()
00335             self._publish(False)
00336         self._flush_pending_callbacks()
00337 
00338 
00339     def __repr__(self):
00340         return "[Bond %s, Instance %s (%s)]" % (self.id, self.instance_id, self.sm.getState().getName())


bondpy
Author(s): Stuart Glaser
autogenerated on Sun Oct 5 2014 22:32:32