bondpy.py
Go to the documentation of this file.
00001 # Copyright (c) 2009, Willow Garage, Inc.
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions are met:
00006 #
00007 #     * Redistributions of source code must retain the above copyright
00008 #       notice, this list of conditions and the following disclaimer.
00009 #     * Redistributions in binary form must reproduce the above copyright
00010 #       notice, this list of conditions and the following disclaimer in the
00011 #       documentation and/or other materials provided with the distribution.
00012 #     * Neither the name of the Willow Garage, Inc. nor the names of its
00013 #       contributors may be used to endorse or promote products derived from
00014 #       this software without specific prior written permission.
00015 #
00016 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026 # POSSIBILITY OF SUCH DAMAGE.
00027 
00028 ## \author Stuart Glaser
00029 
00030 import sys
00031 import threading
00032 import time
00033 import uuid
00034 
00035 import roslib; roslib.load_manifest('bondpy')
00036 import rospy
00037 from bond.msg import *
00038 
00039 import BondSM_sm
00040 
00041 def as_ros_duration(d):
00042     if not isinstance(d, roslib.rostime.Duration):
00043         return rospy.Duration.from_sec(d)
00044     return d
00045 def as_float_duration(d):
00046     if isinstance(d, roslib.rostime.Duration):
00047         return d.to_sec()
00048     return d
00049 
00050 
00051 ## Internal use only
00052 class Timeout:
00053     def __init__(self, duration, on_timeout=None):
00054         self.duration = duration
00055         self.timer = threading.Timer(0, self._on_timer)
00056         self.deadline = time.time()
00057         self.on_timeout = on_timeout
00058 
00059     def reset(self):
00060         self.timer.cancel()
00061         self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
00062         self.timer.start()
00063         self.deadline = time.time() + self.duration.to_sec()
00064         return self
00065 
00066     def cancel(self):
00067         self.timer.cancel()
00068 
00069     def left(self):
00070         return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
00071 
00072     def _on_timer(self):
00073         if self.on_timeout:
00074             self.on_timeout()
00075 
00076 ## \brief Forms a bond to monitor another process.
00077 #
00078 # The Bond class implements a bond, allowing you to monitor another
00079 # process and be notified when it dies.  In turn, it will be notified
00080 # when you die.
00081 class Bond(object):
00082     ## \brief Constructs a bond, but does not connect.
00083     #
00084     # \param topic The topic used to exchange the bond status messages.
00085     # \param id The ID of the bond, which should match the ID used on
00086     #           the sister's end
00087     # \param on_broken callback that will be called when the bond is broken.
00088     # \param on_formed callback that will be called when the bond is formed.
00089     def __init__(self, topic, id, on_broken=None, on_formed=None):
00090         self.__started = False
00091         self.topic = topic
00092         self.id = id
00093         self.instance_id = str(uuid.uuid4())
00094         self.sister_instance_id = None
00095         self.on_broken = on_broken
00096         self.on_formed = on_formed
00097         self.is_shutdown = False
00098         self.sister_died_first = False
00099 
00100         # Callbacks must be called outside of the locks and after the
00101         # state machine finishes transitioning.
00102         self.pending_callbacks = []
00103 
00104         self.sm = BondSM_sm.BondSM_sm(self)
00105         #self.sm.setDebugFlag(True)
00106         self.lock = threading.RLock()
00107         self.condition = threading.Condition(self.lock)
00108 
00109         # Sets the default timeout values
00110         self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
00111         self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
00112         self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
00113         self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
00114 
00115         self.pub = rospy.Publisher(self.topic, Status)
00116 
00117 
00118     def get_connect_timeout(self):
00119         return self.__connect_timeout
00120     def set_connect_timeout(self, dur):
00121         assert not self.__started
00122         self.__connect_timeout = dur
00123         self.connect_timer = Timeout(as_ros_duration(dur), self._on_connect_timeout)
00124     connect_timeout = property(get_connect_timeout, set_connect_timeout)
00125 
00126     def get_heartbeat_timeout(self):
00127         return self.__heartbeat_timeout
00128     def set_heartbeat_timeout(self, dur):
00129         assert not self.__started
00130         self.__heartbeat_timeout = dur
00131         self.heartbeat_timer = Timeout(as_ros_duration(dur), self._on_heartbeat_timeout)
00132     heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
00133 
00134     def get_disconnect_timeout(self):
00135         return self.__disconnect_timeout
00136     def set_disconnect_timeout(self, dur):
00137         assert not self.__started
00138         self.__disconnect_timeout = dur
00139         self.disconnect_timer = Timeout(as_ros_duration(dur), self._on_disconnect_timeout)
00140     disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
00141 
00142     def get_heartbeat_period(self):
00143         return self.__heartbeat_period
00144     def set_heartbeat_period(self, per):
00145         assert not self.__started
00146         self.__heartbeat_period = as_float_duration(per)
00147     heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
00148 
00149 
00150     ## \brief Starts the bond and connects to the sister process.
00151     def start(self):
00152         with self.lock:
00153             self.connect_timer.reset()
00154             self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
00155 
00156             self.thread = threading.Thread(target=self._publishing_thread)
00157             self.thread.daemon = True
00158             self.thread.start()
00159             self.__started = True
00160 
00161 
00162     def _on_connect_timeout(self):
00163         with self.lock:
00164             self.sm.ConnectTimeout()
00165         self._flush_pending_callbacks()
00166 
00167     def _on_heartbeat_timeout(self):
00168         # Checks that heartbeat timeouts haven't been disabled globally
00169         disable_heartbeat_timeout = rospy.get_param(Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
00170         if disable_heartbeat_timeout:
00171             rospy.logwarn("Heartbeat timeout is disabled.  Not breaking bond (topic: %s, id: %s)" % \
00172                               (self.topic, self.id))
00173             return
00174 
00175         with self.lock:
00176             self.sm.HeartbeatTimeout()
00177         self._flush_pending_callbacks()
00178 
00179     def _on_disconnect_timeout(self):
00180         with self.lock:
00181             self.sm.DisconnectTimeout()
00182         self._flush_pending_callbacks()
00183 
00184     def __del__(self):
00185         self.shutdown()
00186 
00187     def shutdown(self):
00188         if not self.is_shutdown:
00189             self.sub.unregister()
00190             with self.lock:
00191                 self.is_shutdown = True
00192                 if self.sm.getState().getName() != 'SM.Dead':
00193                     self.break_bond()
00194                 self.pub.unregister()
00195                 self.condition.notify_all()
00196 
00197     def _on_bond_status(self, msg):
00198         # Filters out messages from other bonds and messages from ourself
00199         if msg.id == self.id and msg.instance_id != self.instance_id:
00200             with self.lock:
00201                 if not self.sister_instance_id:
00202                     self.sister_instance_id = msg.instance_id
00203 
00204                 if msg.instance_id != self.sister_instance_id:
00205                     rospy.logerr("More than two locations are trying to use a single bond (topic: %s, id: %s).  " + \
00206                                  "You should only instantiate at most two bond instances for each (topic, id) pair." % \
00207                                      (self.topic, self.id))
00208                     return
00209 
00210                 if msg.active:
00211                     self.sm.SisterAlive()
00212                 else:
00213                     self.sm.SisterDead()
00214 
00215                     # Immediate ack for sister's death notification
00216                     if self.sister_died_first:
00217                         self._publish(False)
00218             self._flush_pending_callbacks()
00219 
00220 
00221 
00222     def _publish(self, active):
00223         msg = Status()
00224         msg.header.stamp = rospy.Time.now()
00225         msg.id = self.id
00226         msg.instance_id = self.instance_id
00227         msg.active = active
00228         msg.heartbeat_timeout = self.heartbeat_timeout
00229         msg.heartbeat_period = self.heartbeat_period
00230         self.pub.publish(msg)
00231 
00232     def _publishing_thread(self):
00233         with self.lock:
00234             # Publishing ALIVE
00235             while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
00236                 self._publish(True)
00237                 self.condition.wait(self.heartbeat_period)
00238 
00239             # Publishing DEAD
00240             while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
00241                 self._publish(False)
00242                 self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
00243 
00244     def _flush_pending_callbacks(self):
00245         callbacks = []
00246         with self.lock:
00247             callbacks = self.pending_callbacks
00248             self.pending_callbacks = []
00249         for c in callbacks:
00250             c()
00251 
00252     ## \brief INTERNAL
00253     def Connected(self):
00254         self.connect_timer.cancel()
00255         self.condition.notify_all()
00256         if self.on_formed:
00257             self.pending_callbacks.append(self.on_formed)
00258 
00259     ## \brief INTERNAL
00260     def Heartbeat(self):
00261         self.heartbeat_timer.reset()
00262 
00263     ## \brief INTERNAL
00264     def SisterDied(self):
00265         self.sister_died_first = True
00266 
00267     ## \brief INTERNAL
00268     def Death(self):
00269         self.condition.notify_all()
00270         self.heartbeat_timer.cancel()
00271         self.disconnect_timer.cancel()
00272         if self.on_broken:
00273             self.pending_callbacks.append(self.on_broken)
00274 
00275     ## \brief INTERNAL
00276     def StartDying(self):
00277         self.heartbeat_timer.cancel()
00278         self.disconnect_timer.reset()
00279 
00280     ## \brief Sets the formed callback
00281     def set_formed_callback(self, on_formed):
00282         with self.lock:
00283             self.on_formed = on_formed
00284 
00285     ## \brief Sets the broken callback
00286     def set_broken_callback(self, on_broken):
00287         with self.lock:
00288             self.on_broken = on_broken
00289 
00290     ## \brief Blocks until the bond is formed for at most 'duration'.
00291     #
00292     # \param timeout Maximum duration to wait.  If None then this call will not timeout.
00293     # \return true iff the bond has been formed.
00294     def wait_until_formed(self, timeout = None):
00295         deadline = timeout and Timeout(timeout).reset()
00296         with self.lock:
00297             while self.sm.getState().getName() == 'SM.WaitingForSister':
00298                 if rospy.is_shutdown():
00299                     break
00300                 if deadline and deadline.left() == rospy.Duration(0):
00301                     break
00302                 wait_duration = 0.1
00303                 if deadline:
00304                     wait_duration = min(wait_duration, deadline.left().to_sec())
00305                 self.condition.wait(wait_duration)
00306             return self.sm.getState().getName() != 'SM.WaitingForSister'
00307 
00308     ## \brief Blocks until the bond is broken for at most 'duration'.
00309     #
00310     # \param timeout Maximum duration to wait.  If None then this call will not timeout.
00311     # \return true iff the bond has been broken, even if it has never been formed.
00312     def wait_until_broken(self, timeout = None):
00313         deadline = timeout and Timeout(timeout).reset()
00314         with self.lock:
00315             while self.sm.getState().getName() != 'SM.Dead':
00316                 if rospy.is_shutdown():
00317                     break
00318                 if deadline and deadline.left() == rospy.Duration(0):
00319                     break
00320                 wait_duration = 0.1
00321                 if deadline:
00322                     wait_duration = min(wait_duration, deadline.left().to_sec())
00323                 self.condition.wait(wait_duration)
00324             return self.sm.getState().getName() == 'SM.Dead'
00325 
00326     ## \brief Indicates if the bond is broken
00327     # \return true iff the bond has been broken.
00328     def is_broken(self):
00329         with self.lock:
00330             return self.sm.getState().getName() == 'SM.Dead'
00331 
00332     ## \brief Breaks the bond, notifying the other process.
00333     def break_bond(self):
00334         with self.lock:
00335             self.sm.Die()
00336             self._publish(False)
00337         self._flush_pending_callbacks()
00338 
00339 
00340     def __repr__(self):
00341         return "[Bond %s, Instance %s (%s)]" % (self.id, self.instance_id, self.sm.getState().getName())
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties Friends


bondpy
Author(s): Stuart Glaser
autogenerated on Thu Aug 15 2013 10:10:41