bondpy.py
Go to the documentation of this file.
00001 # Copyright (c) 2009, Willow Garage, Inc.
00002 # All rights reserved.
00003 #
00004 # Redistribution and use in source and binary forms, with or without
00005 # modification, are permitted provided that the following conditions are met:
00006 #
00007 #     * Redistributions of source code must retain the above copyright
00008 #       notice, this list of conditions and the following disclaimer.
00009 #     * Redistributions in binary form must reproduce the above copyright
00010 #       notice, this list of conditions and the following disclaimer in the
00011 #       documentation and/or other materials provided with the distribution.
00012 #     * Neither the name of the Willow Garage, Inc. nor the names of its
00013 #       contributors may be used to endorse or promote products derived from
00014 #       this software without specific prior written permission.
00015 #
00016 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00019 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00020 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00021 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00022 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00023 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00024 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00025 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00026 # POSSIBILITY OF SUCH DAMAGE.
00027 
00028 ## \author Stuart Glaser
00029 
00030 import threading
00031 import time
00032 import uuid
00033 
00034 import rospy
00035 
00036 import BondSM_sm
00037 
00038 from bond.msg import Constants, Status
00039 
00040 
00041 def as_ros_duration(d):
00042     if not isinstance(d, rospy.Duration):
00043         return rospy.Duration.from_sec(d)
00044     return d
00045 
00046 
00047 def as_float_duration(d):
00048     if isinstance(d, rospy.Duration):
00049         return d.to_sec()
00050     return d
00051 
00052 
00053 ## Internal use only
00054 class Timeout:
00055     def __init__(self, duration, on_timeout=None):
00056         self.duration = duration
00057         self.timer = threading.Timer(0, self._on_timer)
00058         self.timer.daemon = True
00059         self.deadline = time.time()
00060         self.on_timeout = on_timeout
00061 
00062     def reset(self):
00063         self.timer.cancel()
00064         self.timer = threading.Timer(self.duration.to_sec(), self._on_timer)
00065         self.timer.daemon = True
00066         self.timer.start()
00067         self.deadline = time.time() + self.duration.to_sec()
00068         return self
00069 
00070     def cancel(self):
00071         self.timer.cancel()
00072 
00073     def left(self):
00074         return max(rospy.Duration(0), rospy.Duration(self.deadline - time.time()))
00075 
00076     def _on_timer(self):
00077         if self.on_timeout:
00078             self.on_timeout()
00079 
00080 
00081 ## \brief Forms a bond to monitor another process.
00082 #
00083 # The Bond class implements a bond, allowing you to monitor another
00084 # process and be notified when it dies.  In turn, it will be notified
00085 # when you die.
00086 class Bond(object):
00087     ## \brief Constructs a bond, but does not connect.
00088     #
00089     # \param topic The topic used to exchange the bond status messages.
00090     # \param id The ID of the bond, which should match the ID used on
00091     #           the sister's end
00092     # \param on_broken callback that will be called when the bond is broken.
00093     # \param on_formed callback that will be called when the bond is formed.
00094     def __init__(self, topic, id, on_broken=None, on_formed=None):
00095         self.__started = False
00096         self.topic = topic
00097         self.id = id
00098         self.instance_id = str(uuid.uuid4())
00099         self.sister_instance_id = None
00100         self.on_broken = on_broken
00101         self.on_formed = on_formed
00102         self.is_shutdown = False
00103         self.sister_died_first = False
00104         # Timeout for wait_until_formed
00105         self.deadline = None
00106 
00107         # Callbacks must be called outside of the locks and after the
00108         # state machine finishes transitioning.
00109         self.pending_callbacks = []
00110 
00111         self.sm = BondSM_sm.BondSM_sm(self)
00112         # self.sm.setDebugFlag(True)
00113         self.lock = threading.RLock()
00114         self.condition = threading.Condition(self.lock)
00115 
00116         # Sets the default timeout values
00117         self.connect_timeout = Constants.DEFAULT_CONNECT_TIMEOUT
00118         self.heartbeat_timeout = Constants.DEFAULT_HEARTBEAT_TIMEOUT
00119         self.disconnect_timeout = Constants.DEFAULT_DISCONNECT_TIMEOUT
00120         self.heartbeat_period = Constants.DEFAULT_HEARTBEAT_PERIOD
00121 
00122         # queue_size 1 : avoid having a client receive backed up, potentially
00123         # late heartbearts, discussion@https://github.com/ros/bond_core/pull/10
00124         self.pub = rospy.Publisher(self.topic, Status, queue_size=1)
00125 
00126     def get_connect_timeout(self):
00127         return self.__connect_timeout
00128 
00129     def set_connect_timeout(self, dur):
00130         assert not self.__started
00131         self.__connect_timeout = dur
00132         self.connect_timer = Timeout(as_ros_duration(dur), self._on_connect_timeout)
00133     connect_timeout = property(get_connect_timeout, set_connect_timeout)
00134 
00135     def get_heartbeat_timeout(self):
00136         return self.__heartbeat_timeout
00137 
00138     def set_heartbeat_timeout(self, dur):
00139         assert not self.__started
00140         self.__heartbeat_timeout = dur
00141         self.heartbeat_timer = Timeout(as_ros_duration(dur), self._on_heartbeat_timeout)
00142     heartbeat_timeout = property(get_heartbeat_timeout, set_heartbeat_timeout)
00143 
00144     def get_disconnect_timeout(self):
00145         return self.__disconnect_timeout
00146 
00147     def set_disconnect_timeout(self, dur):
00148         assert not self.__started
00149         self.__disconnect_timeout = dur
00150         self.disconnect_timer = Timeout(as_ros_duration(dur), self._on_disconnect_timeout)
00151     disconnect_timeout = property(get_disconnect_timeout, set_disconnect_timeout)
00152 
00153     def get_heartbeat_period(self):
00154         return self.__heartbeat_period
00155 
00156     def set_heartbeat_period(self, per):
00157         assert not self.__started
00158         self.__heartbeat_period = as_float_duration(per)
00159     heartbeat_period = property(get_heartbeat_period, set_heartbeat_period)
00160 
00161     ## \brief Starts the bond and connects to the sister process.
00162     def start(self):
00163         with self.lock:
00164             self.connect_timer.reset()
00165             self.sub = rospy.Subscriber(self.topic, Status, self._on_bond_status)
00166 
00167             self.thread = threading.Thread(target=self._publishing_thread)
00168             self.thread.daemon = True
00169             self.thread.start()
00170             self.__started = True
00171 
00172     def _on_connect_timeout(self):
00173         with self.lock:
00174             self.sm.ConnectTimeout()
00175         self._flush_pending_callbacks()
00176 
00177     def _on_heartbeat_timeout(self):
00178         # Checks that heartbeat timeouts haven't been disabled globally
00179         disable_heartbeat_timeout = rospy.get_param(
00180             Constants.DISABLE_HEARTBEAT_TIMEOUT_PARAM, False)
00181         if disable_heartbeat_timeout:
00182             rospy.logwarn(
00183                 "Heartbeat timeout is disabled.  Not breaking bond (topic: %s, id: %s)" %
00184                 (self.topic, self.id))
00185             return
00186 
00187         with self.lock:
00188             self.sm.HeartbeatTimeout()
00189         self._flush_pending_callbacks()
00190 
00191     def _on_disconnect_timeout(self):
00192         with self.lock:
00193             self.sm.DisconnectTimeout()
00194         self._flush_pending_callbacks()
00195 
00196     def __del__(self):
00197         self.shutdown()
00198 
00199     def shutdown(self):
00200         if not self.is_shutdown:
00201             self.sub.unregister()
00202             with self.lock:
00203                 self.is_shutdown = True
00204                 if self.sm.getState().getName() != 'SM.Dead':
00205                     self.break_bond()
00206                 self.pub.unregister()
00207                 self.condition.notify_all()
00208                 self.connect_timer.cancel()
00209                 if self.deadline:
00210                     self.deadline.cancel()
00211 
00212     def _on_bond_status(self, msg):
00213         # Filters out messages from other bonds and messages from ourself
00214         if msg.id == self.id and msg.instance_id != self.instance_id:
00215             with self.lock:
00216                 if not self.sister_instance_id:
00217                     self.sister_instance_id = msg.instance_id
00218 
00219                 if msg.instance_id != self.sister_instance_id:
00220                     rospy.logerr(
00221                         "More than two locations are trying to use a single bond (topic: %s, id: %s).  " +
00222                         "You should only instantiate at most two bond instances for each (topic, id) pair." %
00223                         (self.topic, self.id))
00224                     return
00225 
00226                 if msg.active:
00227                     self.sm.SisterAlive()
00228                 else:
00229                     self.sm.SisterDead()
00230 
00231                     # Immediate ack for sister's death notification
00232                     if self.sister_died_first:
00233                         self._publish(False)
00234             self._flush_pending_callbacks()
00235 
00236     def _publish(self, active):
00237         msg = Status()
00238         msg.header.stamp = rospy.Time.now()
00239         msg.id = self.id
00240         msg.instance_id = self.instance_id
00241         msg.active = active
00242         msg.heartbeat_timeout = self.heartbeat_timeout
00243         msg.heartbeat_period = self.heartbeat_period
00244         self.pub.publish(msg)
00245 
00246     def _publishing_thread(self):
00247         with self.lock:
00248             # Publishing ALIVE
00249             while not self.is_shutdown and self.sm.getState().getName() in ['SM.WaitingForSister', 'SM.Alive']:
00250                 self._publish(True)
00251                 self.condition.wait(self.heartbeat_period)
00252 
00253             # Publishing DEAD
00254             while not self.is_shutdown and self.sm.getState().getName() == 'SM.AwaitSisterDeath':
00255                 self._publish(False)
00256                 self.condition.wait(Constants.DEAD_PUBLISH_PERIOD)
00257 
00258     def _flush_pending_callbacks(self):
00259         callbacks = []
00260         with self.lock:
00261             callbacks = self.pending_callbacks
00262             self.pending_callbacks = []
00263         for c in callbacks:
00264             c()
00265 
00266     ## \brief INTERNAL
00267     def Connected(self):
00268         self.connect_timer.cancel()
00269         self.condition.notify_all()
00270         if self.on_formed:
00271             self.pending_callbacks.append(self.on_formed)
00272 
00273     ## \brief INTERNAL
00274     def Heartbeat(self):
00275         self.heartbeat_timer.reset()
00276 
00277     ## \brief INTERNAL
00278     def SisterDied(self):
00279         self.sister_died_first = True
00280 
00281     ## \brief INTERNAL
00282     def Death(self):
00283         self.condition.notify_all()
00284         self.heartbeat_timer.cancel()
00285         self.disconnect_timer.cancel()
00286         if self.on_broken:
00287             self.pending_callbacks.append(self.on_broken)
00288 
00289     ## \brief INTERNAL
00290     def StartDying(self):
00291         self.heartbeat_timer.cancel()
00292         self.disconnect_timer.reset()
00293 
00294     ## \brief Sets the formed callback
00295     def set_formed_callback(self, on_formed):
00296         with self.lock:
00297             self.on_formed = on_formed
00298 
00299     ## \brief Sets the broken callback
00300     def set_broken_callback(self, on_broken):
00301         with self.lock:
00302             self.on_broken = on_broken
00303 
00304     ## \brief Blocks until the bond is formed for at most 'duration'.
00305     #
00306     # \param timeout Maximum duration to wait.  If None then this call will not timeout.
00307     # \return true iff the bond has been formed.
00308     def wait_until_formed(self, timeout=None):
00309         if self.deadline:
00310             self.deadline.cancel()
00311             self.deadline = None
00312         if timeout:
00313             self.deadline = Timeout(timeout).reset()
00314         with self.lock:
00315             while self.sm.getState().getName() == 'SM.WaitingForSister':
00316                 if rospy.is_shutdown():
00317                     break
00318                 if self.deadline and self.deadline.left() == rospy.Duration(0):
00319                     break
00320                 wait_duration = 0.1
00321                 if self.deadline:
00322                     wait_duration = min(wait_duration, self.deadline.left().to_sec())
00323                 self.condition.wait(wait_duration)
00324             return self.sm.getState().getName() != 'SM.WaitingForSister'
00325 
00326     ## \brief Blocks until the bond is broken for at most 'duration'.
00327     #
00328     # \param timeout Maximum duration to wait.  If None then this call will not timeout.
00329     # \return true iff the bond has been broken, even if it has never been formed.
00330     def wait_until_broken(self, timeout=None):
00331         if self.deadline:
00332             self.deadline.cancel()
00333             self.deadline = None
00334         if timeout:
00335             self.deadline = Timeout(timeout).reset()
00336         with self.lock:
00337             while self.sm.getState().getName() != 'SM.Dead':
00338                 if rospy.is_shutdown():
00339                     break
00340                 if self.deadline and self.deadline.left() == rospy.Duration(0):
00341                     break
00342                 wait_duration = 0.1
00343                 if self.deadline:
00344                     wait_duration = min(wait_duration, self.deadline.left().to_sec())
00345                 self.condition.wait(wait_duration)
00346             return self.sm.getState().getName() == 'SM.Dead'
00347 
00348     ## \brief Indicates if the bond is broken
00349     # \return true iff the bond has been broken.
00350     def is_broken(self):
00351         with self.lock:
00352             return self.sm.getState().getName() == 'SM.Dead'
00353 
00354     ## \brief Breaks the bond, notifying the other process.
00355     def break_bond(self):
00356         with self.lock:
00357             self.sm.Die()
00358             self._publish(False)
00359         self._flush_pending_callbacks()
00360 
00361     def __repr__(self):
00362         return "[Bond %s, Instance %s (%s)]" % \
00363             (self.id, self.instance_id, self.sm.getState().getName())


bondpy
Author(s): Stuart Glaser
autogenerated on Thu Aug 17 2017 03:12:48