Go to the documentation of this file.00001
00002 import smach
00003 import rospy
00004
00005
00006 class LockableStateMachine(smach.StateMachine):
00007 """
00008 A state machine that can be locked.
00009 When locked, no transition can be done regardless of the resulting outcome.
00010 However, if any outcome would be triggered, the outcome will be stored
00011 and the state won't be executed anymore until it is unlocked and the stored outcome is set.
00012 """
00013 path_for_switch = None
00014
00015 def __init__(self, *args, **kwargs):
00016 super(LockableStateMachine, self).__init__(*args, **kwargs)
00017
00018 self._locked = False
00019 self._parent = None
00020 self.transitions = None
00021
00022 self.name = None
00023 self._is_controlled = False
00024
00025
00026 def _get_deep_state(self):
00027 """
00028 Looks for the current state (traversing all state machines down to the real state).
00029
00030 @return: The current state (not state machine)
00031 """
00032 container = self
00033 while isinstance(container._current_state, smach.StateMachine):
00034 container = container._current_state
00035 return container._current_state
00036
00037
00038 def _update_once(self):
00039 if LockableStateMachine.path_for_switch is not None and LockableStateMachine.path_for_switch.startswith(self._get_path()):
00040 path_segments = LockableStateMachine.path_for_switch.replace(self._get_path(), "", 1).split("/")
00041 wanted_state = path_segments[1]
00042 self._set_current_state(wanted_state)
00043 if len(path_segments) <= 2:
00044 LockableStateMachine.path_for_switch = None
00045
00046 return super(LockableStateMachine, self)._update_once()
00047
00048
00049 def _is_internal_transition(self, transition_target):
00050 return transition_target in self._states
00051
00052
00053 def transition_allowed(self, state, outcome):
00054 if outcome is None or outcome == 'None':
00055 return True
00056 transition_target = self._transitions[state][outcome]
00057 return self._is_internal_transition(transition_target) or (not self._locked and (self._parent is None or self._parent.transition_allowed(self.name, transition_target)))
00058
00059
00060 def _get_path(self):
00061 return "" if self._parent is None else self._parent._get_path() + "/" + self.name
00062
00063
00064 def lock(self, path):
00065 if path == self._get_path():
00066 self._locked = True
00067 return True
00068 elif self._parent is not None:
00069 return self._parent.lock(path)
00070 else:
00071 return False
00072
00073
00074 def unlock(self, path):
00075 if path == self._get_path():
00076 self._locked = False
00077 return True
00078 elif self._parent is not None:
00079 return self._parent.unlock(path)
00080 else:
00081 return False
00082
00083
00084 def is_locked(self):
00085 return self._locked
00086
00087
00088 def is_locked_inside(self):
00089 if self._locked:
00090 return True
00091
00092 for state in self._states:
00093 result = False
00094 if isinstance(state, smach.StateMachine):
00095 result = state.is_locked_inside()
00096 else:
00097 result = state.is_locked()
00098 if result is True:
00099 return True
00100
00101 return False
00102
00103
00104 def get_locked_state(self):
00105 if self._locked:
00106 return self
00107
00108 for state in self._states:
00109 if state.is_locked():
00110 return state
00111 elif isinstance(state, smach.StateMachine):
00112 return state.get_locked_state()
00113
00114 return None