state_machine.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.core.state import State
3 from flexbe_core.core.user_data import UserData
4 from flexbe_core.core.exceptions import StateError, StateMachineError
5 
6 
8 
# Container most recently activated through a ``with`` block; None while no
# container is open. Read and written as ``StateMachine._currently_opened_container``.
_currently_opened_container = None
10 
def __init__(self, *args, **kwargs):
    """Create an empty state machine.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initialiser.
    """
    super(StateMachine, self).__init__(*args, **kwargs)
    # Child states in insertion order; the first one is the initial state.
    self._states = list()
    # label -> state instance
    self._labels = dict()
    # label -> {outcome: transition target}
    self._transitions = dict()
    # label -> userdata key remapping
    self._remappings = dict()
    # State that is executed next; None while the machine is not running.
    self._current_state = None
    # NOTE(review): the listing this file was recovered from skips its source
    # lines 18 and 20 here, so statements may be missing. In particular
    # `_own_userdata` is read by `execute` and the `userdata` property but is
    # not assigned in the visible code -- confirm against upstream.
    self._userdata = None
    # Read by __exit__; initialised here so that leaving a container that was
    # never entered does not fail with an AttributeError.
    self._previously_opened_container = None
21 
def __enter__(self):
    """Make this state machine the currently opened container.

    The container that was open before is remembered so that ``__exit__``
    can restore it, which allows nested ``with`` blocks.

    Returns:
        This state machine, so that ``with state_machine as sm:`` binds it.
    """
    self._previously_opened_container = StateMachine._currently_opened_container
    StateMachine._currently_opened_container = self
    return self
25 
def __exit__(self, *args):
    """Restore the container that was open before this one was entered.

    Nothing is returned, so an exception raised inside the ``with`` block
    keeps propagating.
    """
    StateMachine._currently_opened_container = self._previously_opened_container
    # Drop the saved reference: otherwise a later exit of the same container
    # would restore a stale value and keep the old container alive.
    self._previously_opened_container = None
29 
def __contains__(self, label):
    """Return True if a state has been registered under ``label``."""
    return label in self._labels
32 
def __getitem__(self, label):
    """Return the state registered under ``label``.

    Raises:
        KeyError: if no state has been registered under ``label``.
    """
    return self._labels[label]
35 
def __iter__(self):
    """Iterate over the ``name`` of every child state, in insertion order."""
    # A generator expression already is an iterator; no iter() wrapper needed.
    return (state.name for state in self._states)
38 
39  # construction
40 
@staticmethod
def add(label, state, transitions, remapping=None):
    """Register ``state`` under ``label`` in the currently opened container.

    Must be called while a state machine is active, i.e. inside
    ``with state_machine:``.

    Args:
        label: Name under which the state is registered.
        state: State instance to add; its name and parent are updated.
        transitions: Mapping that is looked up by outcome to find the
            transition target when the state returns.
        remapping: Optional userdata key remapping for this state; an empty
            dict is stored when omitted.

    Raises:
        StateMachineError: if no container is open, if ``label`` is already
            used by another state, or if ``label`` equals one of the
            container's outcomes.
    """
    container = StateMachine.get_opened_container()
    if container is None:
        raise StateMachineError("No container opened, activate one first by: 'with state_machine:'")
    if label in container._labels:
        raise StateMachineError("The label %s has already been added to this state machine!" % label)
    # `_outcomes` is not assigned in this class; presumably provided by the parent class.
    if label in container._outcomes:
        raise StateMachineError("The label %s is an outcome of this state machine!" % label)
    # add state to this state machine
    container._states.append(state)
    container._labels[label] = state
    container._transitions[label] = transitions
    container._remappings[label] = remapping or dict()
    # update state instance
    state.set_name(label)
    state.set_parent(container)
58 
@staticmethod
def get_opened_container():
    """Return the container activated by the innermost ``with`` block, or None."""
    # NOTE(review): the `def` line was missing from the recovered listing; the
    # name and empty signature are taken from the call made in `add`.
    return StateMachine._currently_opened_container
62 
63  # execution
64 
def spin(self, userdata=None):
    """Run the state machine until it produces an outcome.

    Calls ``execute`` repeatedly, calling ``sleep`` after every cycle that
    yields no outcome.

    Args:
        userdata: Passed unchanged to every ``execute`` call.

    Returns:
        The first outcome returned by ``execute`` that is not None.
    """
    while True:
        outcome = self.execute(userdata)
        if outcome is not None:
            return outcome
        self.sleep()
73 
def execute(self, userdata):
    """Run one execution cycle of the state machine.

    On the first cycle (no current state yet) the machine starts at
    ``initial_state`` and sets up its userdata; afterwards the current state
    is executed once.

    Args:
        userdata: Userdata to operate on. A falsy value (e.g. None) is
            replaced by a fresh ``UserData`` instance.

    Returns:
        Whatever ``_execute_current_state`` returns for this cycle.
    """
    if self._current_state is None:
        # NOTE(review): the recovered listing skips its source line 76 at this
        # point, so one statement of the start-up sequence may be missing --
        # confirm against upstream.
        self._current_state = self.initial_state
        self._userdata = userdata or UserData()
        self._userdata(add_from=self._own_userdata)
    return self._execute_current_state()
82 
def sleep(self):
    """Delegate sleeping to the current state; do nothing if no state is active."""
    if self._current_state is not None:
        self._current_state.sleep()
86 
def _execute_current_state(self):
    """Execute the current state once and follow its transition, if any.

    Returns:
        The transition target when it is not the label of a child state,
        otherwise None (no outcome yet, or the machine moved on to another
        child state).

    Raises:
        StateError: if the state returned an outcome for which no transition
            is registered.
    """
    # NOTE(review): the `def` line was missing from the recovered listing; the
    # name and signature are taken from the call made in `execute`.
    state = self._current_state
    with UserData(reference=self._userdata, remap=self._remappings[state.name],
                  input_keys=state.input_keys, output_keys=state.output_keys) as userdata:
        outcome = self._current_state.execute(userdata)
    if outcome is None:
        return None
    try:
        target = self._transitions[self._current_state.name][outcome]
    except KeyError:
        # Format the outcome itself: str(KeyError) is the repr of the key and
        # would put a second pair of quotes into the message.
        raise StateError("Returned outcome '%s' is not registered as transition!" % (outcome,))
    # A target that is not a child label leaves `_current_state` as None, so
    # the next `execute` starts again from the initial state.
    self._current_state = self._labels.get(target)
    if self._current_state is None:
        return target
    return None
101 
102  # properties
103 
@property
def userdata(self):
    """The state machine's own userdata (``_own_userdata``).

    This is the object that ``execute`` passes as ``add_from`` when a run
    starts, not the userdata handed to ``execute``.
    """
    return self._own_userdata
107 
@property
def current_state(self):
    """The state that is currently active.

    Raises:
        StateMachineError: if no state is active.
    """
    if self._current_state is None:
        raise StateMachineError("No state active!")
    return self._current_state
114 
115  @property
117  return self.current_state.name
118 
@property
def initial_state(self):
    """The state a run starts with, i.e. the first state that was added.

    Raises:
        StateMachineError: if no state has been added yet.
    """
    if not self._states:
        raise StateMachineError("No states added yet!")
    return self._states[0]
125 
126  @property
128  return self.initial_state.name
129 
@property
def sleep_duration(self):
    """Sleep duration reported by the current state, or 0. if none is active."""
    if self._current_state is None:
        return 0.
    return self._current_state.sleep_duration
136 
137  # consistency checks
138 
@property
def _valid_targets(self):
    """All names a transition may point to: child state labels, then outcomes."""
    # `outcomes` is not defined in this class (presumably by the parent class);
    # the concatenation requires it to be a list.
    return list(self._labels) + self.outcomes
142 
144  for transitions in self._transitions.values():
145  for transition_target in transitions.values():
146  if transition_target not in self._valid_targets:
147  raise StateMachineError("Transition target '%s' missing in %s" %
148  (transition_target, str(self._valid_targets)))
def add(label, state, transitions, remapping=None)
def execute(self, userdata)
Definition: state.py:24


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39