5 from smach.state_machine
import StateMachine
15 A state machine that can be operated. 16 It synchronizes its current state with the mirror and supports some control mechanisms. 19 def __init__(self, conditions=dict(), *args, **kwargs):
20 super(ConcurrencyContainer, self).
__init__(*args, **kwargs)
30 return OperatableStateMachine.execute(self, *args, **kwargs)
35 return OperatableStateMachine._build_msg(self, *args, **kwargs)
40 if self.preempt_requested()
or PreemptableState.preempt:
47 if PriorityContainer.active_container
is not None \
48 and not PriorityContainer.active_container.startswith(state._get_path()) \
49 and not state._get_path().startswith(PriorityContainer.active_container):
50 if isinstance(state, EventState):
51 state._notify_skipped()
52 elif state._get_deep_state()
is not None:
62 if all(self._returned_outcomes.has_key(sn)
and self.
_returned_outcomes[sn] == o
for sn,o
in cond):
70 if outcome
in self.get_registered_outcomes():
72 self.call_termination_cbs([s.name
for s
in self.
_ordered_states],outcome)
77 self._parent._inner_sync_request =
True 82 raise smach.InvalidTransitionError(
"Outcome '%s' of state '%s' with transition target '%s' is neither a registered state nor a registered container outcome." %
83 (outcome, self.
name, outcome))
90 state.get_registered_input_keys(),
91 state.get_registered_output_keys(),
92 self._remappings[state.name])
94 state._entering =
True 97 result = state.execute(ud)
99 except smach.InvalidUserCodeError
as ex:
100 smach.logerr(
"State '%s' failed to execute." % state.name)
103 raise smach.InvalidUserCodeError(
"Could not execute state '%s' of type '%s': " %
105 + traceback.format_exc())
111 if isinstance(state, EventState):
113 if isinstance(state, OperatableStateMachine):
114 state._notify_start()
118 if isinstance(state, EventState):
119 state._enable_ros_control()
120 if isinstance(state, OperatableStateMachine):
121 state._enable_ros_control()
125 if isinstance(state, EventState):
127 if isinstance(state, OperatableStateMachine):
129 if state._is_controlled:
130 state._disable_ros_control()
134 if isinstance(state, EventState):
135 state._disable_ros_control()
136 if isinstance(state, OperatableStateMachine):
137 state._disable_ros_control()
def _notify_skipped(self)
def _disable_ros_control(self)
def _concurrency_execute(self, args, kwargs)
def _build_msg(self, args, kwargs)
def _execute_state(self, state, force_exit=False)
def __init__(self, conditions=dict(), args, kwargs)
def on_exit(self, userdata, states=None)
def _enable_ros_control(self)