concurrency_container.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.logger import Logger
3 from flexbe_core.core.user_data import UserData
4 from flexbe_core.core.event_state import EventState
5 from flexbe_core.core.priority_container import PriorityContainer
6 
7 from flexbe_core.core.operatable_state_machine import OperatableStateMachine
8 
9 
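11  """
12  A state machine that executes all of its child states concurrently and derives its own outcome from their outcomes.
13  Like any operatable state machine, it synchronizes its current state with the mirror and supports some control mechanisms.
14  """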
15 
def __init__(self, conditions=None, *args, **kwargs):
    """
    Set up the container and remember its outcome conditions.

    :param conditions: Iterable of ``(outcome, requirements)`` pairs, where
        ``requirements`` is an iterable of ``(state_name, state_outcome)``
        pairs; this is the shape in which the conditions are unpacked when
        the container determines its outcome. When omitted, an empty
        container is used, so no condition can ever be satisfied.
    :param args: Forwarded unchanged to the parent class constructor.
    :param kwargs: Forwarded unchanged to the parent class constructor.
    """
    super(ConcurrencyContainer, self).__init__(*args, **kwargs)
    # Build the fallback per instance: a `dict()` default in the signature is
    # created once at definition time and would be shared by every instance
    # that is constructed without explicit conditions.
    self._conditions = dict() if conditions is None else conditions
    # Maps child state name -> outcome returned by its last execution.
    self._returned_outcomes = dict()
20 
def sleep(self):
    """Wait for as long as ``sleep_duration`` currently reports (in seconds)."""
    duration = self.sleep_duration
    self.wait(seconds=duration)
23 
@property
def sleep_duration(self):
    """
    Shortest strictly positive sleep duration among the child states.

    Children that report a non-positive duration are ignored.

    :return: The smallest positive child duration, or ``0.`` when no child
        reports a positive one (including when there are no children).
    """
    # Read every child's duration exactly once. Checking `> 0` and then reading
    # the property again would let a value that changed in between slip a
    # non-positive duration into the result.
    pending = [duration
               for duration in (state.sleep_duration for state in self._states)
               if duration > 0]
    return min(pending) if pending else 0.
31 
def _execute_current_state(self):
    """
    Run one execution cycle over all child states and determine the outcome.

    Every child that has not yet returned an outcome and whose sleep duration
    has elapsed is executed once. Children outside the path of an active
    priority container are notified as skipped instead of being executed.

    :return: ``self._preempted_name`` if any child returned its preempted
        outcome; otherwise the outcome of the first condition whose
        ``(state_name, state_outcome)`` requirements are all met; ``None``
        if no condition is satisfied yet.
    """
    # execute all states that are done with sleeping and determine next sleep duration
    for state in self._states:
        if self._returned_outcomes.get(state.name) is not None:
            continue  # already done with executing
        if (PriorityContainer.active_container is not None
                and not all(a == s for a, s in zip(PriorityContainer.active_container.split('/'),
                                                   state.path.split('/')))):
            if isinstance(state, EventState):
                state._notify_skipped()
            else:
                # look the deep state up once and reuse the result
                deep_state = state.get_deep_state()
                if deep_state is not None:
                    deep_state._notify_skipped()
            continue  # other state has priority
        if state.sleep_duration <= 0:  # ready to execute
            self._returned_outcomes[state.name] = self._execute_single_state(state)
            # check again in case the sleep has already been handled by the child
            if state.sleep_duration <= 0:
                # this sleep returns immediately since sleep duration is negative,
                # but is required here to reset the sleep time after executing
                state.sleep()

    # Determine outcome
    if any(self._returned_outcomes[state.name] == state._preempted_name
           for state in self._states if state.name in self._returned_outcomes):
        return self._preempted_name  # handle preemption if required

    # check conditions: the first fully satisfied one wins
    outcome = None
    for oc, cond in self._conditions:
        if all(sn in self._returned_outcomes and self._returned_outcomes[sn] == o for sn, o in cond):
            outcome = oc
            break

    if outcome is None:
        return None

    # trigger on_exit for those states that are not done yet
    # (no entry at all, or an entry that is still None)
    unfinished = [s for s in self._states if self._returned_outcomes.get(s.name) is None]
    self.on_exit(self.userdata, states=unfinished)
    self._returned_outcomes = dict()
    # right now, going out of a cc may break sync
    # thus, as a quick fix, explicitly sync again
    self._parent._inner_sync_request = True
    self._current_state = None
    return outcome
78 
def _execute_single_state(self, state, force_exit=False):
    """
    Execute one child state, or force its exit, with userdata scoped to it.

    The child receives a ``UserData`` view built from this container's
    userdata, the remapping registered under the child's name, and the
    child's own input and output keys.

    :param state: The child state to run.
    :param force_exit: If true, mark the child as entering and call its
        ``on_exit`` instead of executing it.
    :return: The value returned by ``state.execute``; ``None`` when
        ``force_exit`` is set or when an exception was raised. A raised
        exception is stored in ``self._last_exception`` and logged.
    """
    outcome = None
    try:
        with UserData(reference=self._userdata, remap=self._remappings[state.name],
                      input_keys=state.input_keys, output_keys=state.output_keys) as scoped_userdata:
            if not force_exit:
                outcome = state.execute(scoped_userdata)
            else:
                state._entering = True
                state.on_exit(scoped_userdata)
    except Exception as error:
        # best effort: a failing child must not take the container down;
        # also discard any outcome obtained before the failure
        outcome = None
        self._last_exception = error
        Logger.logerr('Failed to execute state %s:\n%s' % (self.current_state_label, str(error)))
    return outcome
94 
def _enable_ros_control(self):
    """
    Enable ROS control on the first child state.

    Only ``self._states[0]`` is considered. The call is forwarded once for
    each of the two supported kinds the child is an instance of.
    """
    first_child = self._states[0]
    for supported_kind in (EventState, OperatableStateMachine):
        if isinstance(first_child, supported_kind):
            first_child._enable_ros_control()
101 
def _disable_ros_control(self):
    """
    Disable ROS control on the first child state.

    Only ``self._states[0]`` is considered. The call is forwarded once for
    each of the two supported kinds the child is an instance of.
    """
    first_child = self._states[0]
    for supported_kind in (EventState, OperatableStateMachine):
        if isinstance(first_child, supported_kind):
            first_child._disable_ros_control()
108 
def on_exit(self, userdata, states=None):
    """
    Force the exit of child states that have not finished on their own.

    :param userdata: Part of the ``on_exit`` signature; not used here, since
        each child is run with userdata scoped by ``_execute_single_state``.
    :param states: The child states to consider; all children of this
        container when ``None``.
    """
    candidates = self._states if states is None else states
    for state in candidates:
        # _returned_outcomes is keyed by state *name*, and a None entry means
        # the child has not produced an outcome yet, so membership of the
        # state object (or of the name alone) is not the right test.
        if self._returned_outcomes.get(state.name) is not None:
            continue  # skip states that already exited themselves
        self._execute_single_state(state, force_exit=True)
    self._current_state = None
flexbe_core.core.concurrency_container.ConcurrencyContainer
Definition: concurrency_container.py:10
flexbe_core.core.user_data.UserData
Definition: user_data.py:6
flexbe_core.core.user_data
Definition: user_data.py:1
flexbe_core.core.concurrency_container.ConcurrencyContainer._enable_ros_control
def _enable_ros_control(self)
Definition: concurrency_container.py:95
flexbe_core.core.event_state
Definition: event_state.py:1
flexbe_core.core.concurrency_container.ConcurrencyContainer._conditions
_conditions
Definition: concurrency_container.py:18
flexbe_core.core.concurrency_container.ConcurrencyContainer._execute_current_state
def _execute_current_state(self)
Definition: concurrency_container.py:32
flexbe_core.core.concurrency_container.ConcurrencyContainer.__init__
def __init__(self, conditions=dict(), *args, **kwargs)
Definition: concurrency_container.py:16
flexbe_core.core.preemptable_state_machine.PreemptableStateMachine._preempted_name
string _preempted_name
Definition: preemptable_state_machine.py:16
flexbe_core.core.concurrency_container.ConcurrencyContainer._disable_ros_control
def _disable_ros_control(self)
Definition: concurrency_container.py:102
flexbe_core.core.state_machine.StateMachine._states
_states
Definition: state_machine.py:13
flexbe_core.core.state_machine.StateMachine.userdata
def userdata(self)
Definition: state_machine.py:105
flexbe_core.core.concurrency_container.ConcurrencyContainer.sleep_duration
def sleep_duration(self)
Definition: concurrency_container.py:25
flexbe_core.core.state_machine.StateMachine._remappings
_remappings
Definition: state_machine.py:16
flexbe_core.logger
Definition: logger.py:1
flexbe_core.core.operatable_state_machine
Definition: operatable_state_machine.py:1
flexbe_core.core.lockable_state_machine.LockableStateMachine._userdata
_userdata
Definition: lockable_state_machine.py:53
flexbe_core.core.concurrency_container.ConcurrencyContainer._current_state
_current_state
Definition: concurrency_container.py:76
flexbe_core.core.operatable_state_machine.OperatableStateMachine
Definition: operatable_state_machine.py:14
flexbe_core.core.ros_state_machine.RosStateMachine.wait
def wait(self, seconds=None, condition=None)
Definition: ros_state_machine.py:20
flexbe_core.core.concurrency_container.ConcurrencyContainer.sleep
def sleep(self)
Definition: concurrency_container.py:21
flexbe_core.core.concurrency_container.ConcurrencyContainer._last_exception
_last_exception
Definition: concurrency_container.py:91
flexbe_core.core.concurrency_container.ConcurrencyContainer._returned_outcomes
_returned_outcomes
Definition: concurrency_container.py:19
flexbe_core.core.concurrency_container.ConcurrencyContainer.on_exit
def on_exit(self, userdata, states=None)
Definition: concurrency_container.py:109
flexbe_core.core.state_machine.StateMachine.current_state_label
def current_state_label(self)
Definition: state_machine.py:116
flexbe_core.core.priority_container
Definition: priority_container.py:1
flexbe_core.core.state.State._parent
_parent
Definition: state.py:22
flexbe_core.core.concurrency_container.ConcurrencyContainer._execute_single_state
def _execute_single_state(self, state, force_exit=False)
Definition: concurrency_container.py:79


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:05