flexbe_mirror.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import roslib; roslib.load_manifest('flexbe_mirror')
00004 import rospy
00005 import smach
00006 import threading
00007 import zlib
00008 
00009 from flexbe_core import JumpableStateMachine, LockableStateMachine
00010 from flexbe_core.core import PreemptableState
00011 from mirror_state import MirrorState
00012 
00013 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
00014 from flexbe_msgs.msg import ContainerStructure, BehaviorSync, BEStatus
00015 from std_msgs.msg import Empty, String, Int32, UInt8
00016 
00017 '''
00018 Created on 08.05.2013
00019 
00020 @author: Philipp Schillinger
00021 '''
00022 class VigirBehaviorMirror(object):
00023     '''
00024     classdocs
00025     '''
00026 
00027 
00028     def __init__(self):
00029         '''
00030         Constructor
00031         '''
00032         self._sm = None
00033 
00034         smach.set_loggers (
00035             rospy.logdebug, # hide SMACH transition log spamming
00036             rospy.logwarn,
00037             rospy.logdebug,
00038             rospy.logerr
00039         )
00040         
00041         # set up proxys for sm <--> GUI communication
00042         # publish topics
00043         self._pub = ProxyPublisher({'flexbe/behavior_update': String,
00044                                     'flexbe/request_mirror_structure': Int32})
00045             
00046         self._running = False
00047         self._stopping = False
00048         self._active_id = 0
00049         self._starting_path = None
00050         self._current_struct = None
00051         self._sync_lock = threading.Lock()
00052 
00053         self._outcome_topic = 'flexbe/mirror/outcome'
00054 
00055         self._struct_buffer = list()
00056         
00057         # listen for mirror message
00058         self._sub = ProxySubscriberCached()
00059         self._sub.subscribe(self._outcome_topic, UInt8)
00060         self._sub.enable_buffer(self._outcome_topic)
00061 
00062         self._sub.subscribe('flexbe/mirror/structure', ContainerStructure, self._mirror_callback)
00063         self._sub.subscribe('flexbe/status', BEStatus, self._status_callback)
00064         self._sub.subscribe('flexbe/mirror/sync', BehaviorSync, self._sync_callback)
00065         self._sub.subscribe('flexbe/mirror/preempt', Empty, self._preempt_callback)
00066     
00067     
00068     def _mirror_callback(self, msg):
00069         rate = rospy.Rate(10)
00070         while self._stopping:
00071             rate.sleep()
00072             
00073         if self._running:
00074             rospy.logwarn('Received a new mirror structure while mirror is already running, adding to buffer (checksum: %s).' % str(msg.behavior_id))
00075         elif self._active_id != 0 and msg.behavior_id != self._active_id:
00076             rospy.logwarn('checksum of received mirror structure (%s) does not match expected checksum (%s), will ignore.' % (str(msg.behavior_id), str(self._active_id)))
00077             return
00078         else:
00079             rospy.loginfo('Received a new mirror structure for checksum %s' % str(msg.behavior_id))
00080 
00081         self._struct_buffer.append(msg)
00082 
00083         if self._active_id == msg.behavior_id:
00084             self._struct_buffer = list()
00085             self._mirror_state_machine(msg)
00086             rospy.loginfo('Mirror built.')
00087 
00088             self._execute_mirror()
00089 
00090 
00091     def _status_callback(self, msg):
00092         if msg.code == BEStatus.STARTED:
00093             thread = threading.Thread(target=self._start_mirror, args=[msg])
00094             thread.daemon = True
00095             thread.start()
00096         else:
00097             thread = threading.Thread(target=self._stop_mirror, args=[msg])
00098             thread.daemon = True
00099             thread.start()
00100 
00101 
00102     def _start_mirror(self, msg):
00103         self._sync_lock.acquire()
00104         rate = rospy.Rate(10)
00105         while self._stopping:
00106             rate.sleep()
00107 
00108         if self._running:
00109             rospy.logwarn('Tried to start mirror while it is already running, will ignore.')
00110             return
00111 
00112         if len(msg.args) > 0:
00113             self._starting_path = "/" + msg.args[0][1:].replace("/", "_mirror/") + "_mirror"
00114 
00115         self._active_id = msg.behavior_id
00116 
00117         while self._sm is None and len(self._struct_buffer) > 0:
00118             struct = self._struct_buffer[0]
00119             self._struct_buffer = self._struct_buffer[1:]
00120             if struct.behavior_id == self._active_id:
00121                 self._mirror_state_machine(struct)
00122                 rospy.loginfo('Mirror built.')
00123             else:
00124                 rospy.logwarn('Discarded mismatching buffered structure for checksum %s' % str(struct.behavior_id))
00125 
00126         if self._sm is None:
00127             rospy.logwarn('Missing correct mirror structure, requesting...')
00128             rospy.sleep(0.2) # no clean wayacquire to wait for publisher to be ready...
00129             self._pub.publish('flexbe/request_mirror_structure', Int32(msg.behavior_id))
00130             self._active_id = msg.behavior_id
00131             return
00132         self._sync_lock.release()
00133 
00134         self._execute_mirror()
00135 
00136 
00137     def _stop_mirror(self, msg):
00138         self._sync_lock.acquire()
00139         self._stopping = True
00140         if self._sm is not None and self._running:
00141             if msg is not None and msg.code == BEStatus.FINISHED:
00142                 rospy.loginfo('Onboard behavior finished successfully.')
00143                 self._pub.publish('flexbe/behavior_update', String())
00144             elif msg is not None and msg.code == BEStatus.SWITCHING:
00145                 self._starting_path = None
00146                 rospy.loginfo('Onboard performing behavior switch.')
00147             elif msg is not None and msg.code == BEStatus.READY:
00148                 rospy.loginfo('Onboard engine just started, stopping currently running mirror.')
00149                 self._pub.publish('flexbe/behavior_update', String())
00150             elif msg is not None:
00151                 rospy.logwarn('Onboard behavior failed!')
00152                 self._pub.publish('flexbe/behavior_update', String())
00153 
00154             PreemptableState.preempt = True
00155             rate = rospy.Rate(10)
00156             while self._running:
00157                 rate.sleep()
00158         else:
00159             rospy.loginfo('No onboard behavior is active.')
00160 
00161         self._active_id = 0
00162         self._sm = None
00163         self._current_struct = None
00164         self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
00165 
00166         if msg is not None and msg.code != BEStatus.SWITCHING:
00167             rospy.loginfo('\033[92m--- Behavior Mirror ready! ---\033[0m')
00168 
00169         self._stopping = False
00170         self._sync_lock.release()
00171 
00172 
00173     def _sync_callback(self, msg):
00174         if msg.behavior_id == self._active_id:
00175             thread = threading.Thread(target=self._restart_mirror, args=[msg])
00176             thread.daemon = True
00177             thread.start()
00178         else:
00179             rospy.logerr('Cannot synchronize! Different behavior is running onboard, please stop execution!')
00180             thread = threading.Thread(target=self._stop_mirror, args=[None])
00181             thread.daemon = True
00182             thread.start()
00183 
00184 
00185     def _restart_mirror(self, msg):
00186         self._sync_lock.acquire()
00187         rospy.loginfo('Restarting mirror for synchronization...')
00188         self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
00189         if self._sm is not None and self._running:
00190             PreemptableState.preempt = True
00191             rate = rospy.Rate(10)
00192             while self._running:
00193                 rate.sleep()
00194             self._sm = None
00195         if msg.current_state_checksum in self._state_checksums:
00196             current_state_path = self._state_checksums[msg.current_state_checksum]
00197             self._starting_path = "/" + current_state_path[1:].replace("/", "_mirror/") + "_mirror"
00198             rospy.loginfo('Current state: %s' % current_state_path)
00199         try:
00200             self._mirror_state_machine(self._current_struct)
00201             rospy.loginfo('Mirror built.')
00202         except (AttributeError, smach.InvalidStateError):
00203             rospy.loginfo('Stopping synchronization because behavior has stopped.')
00204         self._sync_lock.release()
00205 
00206         self._execute_mirror()
00207         
00208 
00209     def _execute_mirror(self):
00210         self._running = True
00211 
00212         rospy.loginfo("Executing mirror...")
00213         if self._starting_path is not None:
00214             LockableStateMachine.path_for_switch = self._starting_path
00215             rospy.loginfo("Starting mirror in state " + self._starting_path)
00216             self._starting_path = None
00217 
00218         result = 'preempted'
00219         try:
00220             result = self._sm.execute()
00221         except Exception as e:
00222             rospy.loginfo('Caught exception on preempt:\n%s' % str(e))
00223             result = 'preempted'
00224 
00225         if self._sm is not None:
00226             self._sm.destroy()
00227         self._running = False
00228 
00229         rospy.loginfo('Mirror finished with result ' + result)
00230     
00231     
00232     def _mirror_state_machine(self, msg):
00233         self._current_struct = msg
00234         self._state_checksums = dict()
00235         root = None
00236         for con_msg in msg.containers:
00237             if con_msg.path.find('/') == -1:
00238                 root = con_msg.path
00239                 break
00240         self._add_node(msg, root)
00241         # calculate checksums of all states
00242         for con_msg in msg.containers:
00243             if con_msg.path.find('/') != -1:
00244                 self._state_checksums[zlib.adler32(con_msg.path)] = con_msg.path
00245         
00246     
00247     def _add_node(self, msg, path):
00248         #rospy.loginfo('Add node: '+path)
00249         container = None
00250         for con_msg in msg.containers:
00251             if con_msg.path == path:
00252                 container = con_msg
00253                 break
00254             
00255         transitions = None
00256         if container.transitions is not None:
00257             transitions = {}
00258             for i in range(len(container.transitions)):
00259                 transitions[container.outcomes[i]] = container.transitions[i] + '_mirror'
00260             
00261         path_frags = path.split('/')
00262         container_name = path_frags[len(path_frags)-1]
00263         if len(container.children) > 0:
00264             sm_outcomes = []
00265             for outcome in container.outcomes: sm_outcomes.append(outcome + '_mirror')
00266             sm = JumpableStateMachine(outcomes=sm_outcomes)
00267             with sm:
00268                 for child in container.children: 
00269                     self._add_node(msg, path+'/'+child)
00270             if len(transitions) > 0: 
00271                 container_transitions = {}
00272                 for i in range(len(container.transitions)): container_transitions[sm_outcomes[i]] = transitions[container.outcomes[i]]
00273                 JumpableStateMachine.add(container_name + '_mirror', sm, transitions=container_transitions)
00274             else:
00275                 self._sm = sm
00276         else:
00277             JumpableStateMachine.add(container_name + '_mirror', MirrorState(container_name, path, container.outcomes, container.autonomy), transitions=transitions)
00278     
00279     
00280     def _jump_callback(self, msg):
00281         start_time = rospy.Time.now()
00282         current_elapsed = 0
00283         jumpable = True
00284         while self._sm is None or not self._sm.is_running():
00285             current_elapsed = rospy.Time.now() - start_time
00286             if current_elapsed > rospy.Duration.from_sec(10):
00287                 jumpable = False
00288                 break
00289             rospy.sleep(0.05)
00290         if jumpable:
00291             self._sm.jump_to(msg.stateName)
00292         else:
00293             rospy.logwarn('Could not jump in mirror: Mirror does not exist or is not running!')
00294             
00295 
00296     def _preempt_callback(self, msg):
00297         if self._sm is not None and self._sm.is_running():
00298             rospy.logwarn('Explicit preempting is currently ignored, mirror should be preempted by onboard behavior.')
00299         else:
00300             rospy.logwarn('Could not preempt mirror because it seems not to be running!')
00301 
00302 
00303 
00304 


flexbe_mirror
Author(s): Philipp Schillinger
autogenerated on Thu Jun 6 2019 19:32:36