lockable_state_machine.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import smach
3 import rospy
4 
5 
class LockableStateMachine(smach.StateMachine):
    """
    A state machine that can be locked.
    When locked, no transition can be done regardless of the resulting outcome.
    However, if any outcome would be triggered, the outcome will be stored
    and the state won't be executed anymore until it is unlocked and the stored outcome is set.

    Containers are addressed by a path: the root container (no parent) has the
    empty path "" and every nested container has "<parent path>/<name>".
    """

    # Shared by all instances (class attribute): path of the state that should
    # become active on the next update, or None if no switch is pending.
    path_for_switch = None

    def __init__(self, *args, **kwargs):
        """
        Creates an unlocked state machine without parent and name.

        All arguments are passed on unchanged to the parent class constructor.
        """
        super(LockableStateMachine, self).__init__(*args, **kwargs)

        self._locked = False
        self._parent = None
        self.transitions = None

        self.name = None
        self._is_controlled = False

    def _get_deep_state(self):
        """
        Looks for the current state (traversing all state machines down to the real state).

        @return: The current state (not state machine)
        """
        container = self
        while isinstance(container._current_state, smach.StateMachine):
            container = container._current_state
        return container._current_state

    def _update_once(self):
        """
        Applies a pending state switch that targets this container, then runs one
        regular update step of the parent class.

        If path_for_switch points to a state below this container, the child named
        by the next path segment is made the current state. The pending switch is
        cleared once that child is the last segment of the path.

        @return: The result of the parent class' _update_once.
        """
        target_path = LockableStateMachine.path_for_switch
        if target_path is not None:
            own_path = self._get_path()
            remainder = target_path[len(own_path):]
            # Match on whole path segments only: the part after our own path has
            # to start with "/". This keeps "/A" from reacting to "/AB/C" and
            # avoids an IndexError when the target equals our own path.
            if target_path.startswith(own_path) and remainder.startswith("/"):
                # remainder looks like "/child[/...]", so segment 0 is empty and
                # segment 1 is the direct child to activate.
                path_segments = remainder.split("/")
                self._set_current_state(path_segments[1])
                if len(path_segments) <= 2:
                    LockableStateMachine.path_for_switch = None
        return super(LockableStateMachine, self)._update_once()

    def _is_internal_transition(self, transition_target):
        """
        Checks whether a transition stays inside this container.

        @param transition_target: Label of the transition target.
        @return: True if the target is one of the states of this container.
        """
        return transition_target in self._states

    def transition_allowed(self, state, outcome):
        """
        Checks whether the transition caused by an outcome may be performed.

        No outcome (None or the string 'None') is always allowed, as is any
        transition to another state of this container. A transition leaving this
        container requires the container to be unlocked and, if there is a parent,
        the parent to allow the resulting transition as well.

        @param state: Label of the state which produced the outcome.
        @param outcome: The outcome of that state.
        @return: True if the transition is allowed, False otherwise.
        """
        if outcome is None or outcome == 'None':
            return True
        transition_target = self._transitions[state][outcome]
        if self._is_internal_transition(transition_target):
            return True
        if self._locked:
            return False
        return self._parent is None or self._parent.transition_allowed(self.name, transition_target)

    def _get_path(self):
        """
        Builds the path of this container.

        @return: "" for a container without parent, otherwise the parent's path
                 followed by "/" and the name of this container.
        """
        if self._parent is None:
            return ""
        return self._parent._get_path() + "/" + self.name

    def lock(self, path):
        """
        Locks the container with the given path, searching from this container upwards.

        @param path: Path of the container to lock.
        @return: True if this container or one of its ancestors has the given path
                 and has been locked, False otherwise.
        """
        if path == self._get_path():
            self._locked = True
            return True
        if self._parent is None:
            return False
        return self._parent.lock(path)

    def unlock(self, path):
        """
        Unlocks the container with the given path, searching from this container upwards.

        @param path: Path of the container to unlock.
        @return: True if this container or one of its ancestors has the given path
                 and has been unlocked, False otherwise.
        """
        if path == self._get_path():
            self._locked = False
            return True
        if self._parent is None:
            return False
        return self._parent.unlock(path)

    def is_locked(self):
        """
        @return: True if this container itself is locked.
        """
        return self._locked

    def is_locked_inside(self):
        """
        Checks whether this container or anything contained in it is locked.

        @return: True if this container is locked, a nested state machine reports
                 is_locked_inside(), or a contained state reports is_locked().
        """
        if self._locked:
            return True

        # _states maps labels to state objects, so look at the values;
        # iterating the mapping itself would only yield the label strings.
        for state in self._states.values():
            if isinstance(state, smach.StateMachine):
                locked = state.is_locked_inside()
            else:
                locked = state.is_locked()
            if locked is True:
                return True

        return False

    def get_locked_state(self):
        """
        Looks for a locked state or container, starting with this container.

        @return: This container if it is locked, otherwise the first locked state
                 found among the contained states (searching nested state machines
                 as well), or None if nothing is locked.
        """
        if self._locked:
            return self

        # See is_locked_inside: the state objects are the values of _states.
        for state in self._states.values():
            if state.is_locked():
                return state
            if isinstance(state, smach.StateMachine):
                locked_state = state.get_locked_state()
                # Keep searching the remaining states if this nested
                # state machine does not contain anything locked.
                if locked_state is not None:
                    return locked_state

        return None


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Wed Jun 5 2019 21:51:59