lockable_state_machine.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.core.ros_state_machine import RosStateMachine
3 
4 
6  """
7  A state machine that can be locked.
8  When locked, no transition can be done regardless of the resulting outcome.
9  However, if any outcome would be triggered, the outcome will be stored
10  and the state won't be executed anymore until it is unlocked and the stored outcome is set.
11  """
# Shared switch target, accessed as LockableStateMachine.path_for_switch.
# While it is set and starts with a state machine's own path, execute() jumps
# to the child named by the next path segment and clears this attribute again
# once no deeper segment remains.
path_for_switch = None
13 
def __init__(self, *args, **kwargs):
    """
    Initialize the underlying state machine and start out unlocked.

    All positional and keyword arguments are forwarded unchanged to the parent constructor.
    """
    super(LockableStateMachine, self).__init__(*args, **kwargs)
    # Lock flag of this state machine itself; reported by is_locked().
    self._locked = False
17 
def get_deep_state(self):
    """
    Follow the chain of current states down through nested state machines.

    @return: The first current state on that chain which is not a LockableStateMachine
    """
    current = self._current_state
    # Keep descending as long as the active child is itself a lockable state machine.
    while isinstance(current, LockableStateMachine):
        current = current._current_state
    return current
28 
def _is_internal_transition(self, transition_target):
    """
    Tell whether a transition stays inside this state machine.

    @param transition_target: Target label of the transition
    @return: True if the target is contained in this state machine's labels, False otherwise
    """
    own_labels = self._labels
    return transition_target in own_labels
31 
def transition_allowed(self, state, outcome):
    """
    Decide whether a child state may currently follow the given outcome.

    @param state: Label of the child state, used as key into self._transitions
    @param outcome: Outcome of that state; None or the string 'None' means no outcome
    @return: True if there is no outcome or the transition stays inside this state machine;
             False if this state machine is locked and the transition would leave it;
             otherwise True when there is no parent, else the parent's answer for the target
    """
    if outcome is None or outcome == 'None':
        return True
    target = self._transitions[state].get(outcome)
    # Transitions between children of this state machine are never blocked by a lock.
    internal = self._is_internal_transition(target)
    if internal:
        return internal
    if self._locked:
        return False
    if self.parent is None:
        return True
    # Leaving this state machine: the enclosing one has to agree as well.
    return self.parent.transition_allowed(self.name, target)
39 
40  # for switching
41 
def execute(self, userdata):
    """
    Execute this state machine, first applying a pending state switch if one targets it.

    If LockableStateMachine.path_for_switch is set and starts with this state machine's path,
    the child named by the next path segment becomes the current state. The switch path is
    cleared once that segment is the last one.

    @param userdata: Passed on unchanged to the parent class's execute
    @return: The result of the parent class's execute
    """
    switch_path = LockableStateMachine.path_for_switch
    if switch_path is not None and switch_path.startswith(self.path):
        # Strip our own path prefix; the remainder splits into ['', <child>, ...].
        segments = switch_path.replace(self.path, "", 1).split("/")
        self._current_state = self._labels[segments[1]]
        if len(segments) <= 2:
            # No deeper level left to switch into, so the request is complete.
            LockableStateMachine.path_for_switch = None
    return super(LockableStateMachine, self).execute(userdata)
51 
def replace_userdata(self, userdata):
    """
    Replace the userdata object held by this state machine.

    @param userdata: The new userdata, stored as self._userdata
    """
    setattr(self, '_userdata', userdata)
54 
def replace_state(self, state):
    """
    Swap the child registered under state.name for the given state.

    The new state takes over the old state's parent and its position in the state list.

    @param state: Replacement state; a state with the same name must already be registered
    """
    label = state.name
    replaced = self._labels[label]
    # Adopt the parent first, then take over the list slot and the label entry.
    state._parent = replaced._parent
    slot = self._states.index(replaced)
    self._states[slot] = state
    self._labels[label] = state
60 
def remove_state(self, state):
    """
    Unregister a child state from this state machine.

    @param state: The state to remove; its name is dropped from the labels and
                  the state itself from the state list
    """
    label = state.name
    del self._labels[label]
    self._states.remove(state)
64 
65  # for locking
66 
def lock(self, path):
    """
    Lock the state machine located at the given path.

    If the path is not this state machine's own, the request is handed to the parent.

    @param path: Path of the state machine to lock
    @return: True if this state machine was locked, False if there is no parent to ask,
             otherwise the parent's result
    """
    if path == self.path:
        self._locked = True
        return True
    if self._parent is None:
        return False
    return self._parent.lock(path)
75 
def unlock(self, path):
    """
    Unlock the state machine located at the given path.

    If the path is not this state machine's own, the request is handed to the parent.

    @param path: Path of the state machine to unlock
    @return: True if this state machine was unlocked, False if there is no parent to ask,
             otherwise the parent's result
    """
    if path == self.path:
        self._locked = False
        return True
    if self._parent is None:
        return False
    return self._parent.unlock(path)
84 
def is_locked(self):
    """
    Report the lock flag of this state machine itself (children are not considered).

    @return: The current value of self._locked
    """
    locked = self._locked
    return locked
87 
def is_locked_inside(self):
    """
    Check whether this state machine or anything contained in it is locked.

    Nested lockable state machines are asked recursively via is_locked_inside(),
    every other child via is_locked().

    @return: True if this state machine is locked or any child reports exactly True, else False
    """
    if self._locked:
        return True
    return any(
        (child.is_locked_inside() if isinstance(child, LockableStateMachine)
         else child.is_locked()) is True
        for child in self._states
    )
100 
def get_locked_state(self):
    """
    Find the locked state or state machine inside this container.

    This state machine is checked first, then its children in list order.
    Nested lockable state machines are searched recursively.

    @return: This state machine if it is locked, otherwise the first locked
             child or nested state found, or None if nothing is locked
    """
    if self._locked:
        return self
    for state in self._states:
        if state.is_locked():
            return state
        if isinstance(state, LockableStateMachine):
            nested = state.get_locked_state()
            # Only stop when the nested machine actually contains a lock;
            # returning its None here would skip all remaining siblings.
            if nested is not None:
                return nested
    return None


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39