operatable_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.logger import Logger
3 from flexbe_core.state_logger import StateLogger
4 
5 from flexbe_msgs.msg import OutcomeRequest
6 from std_msgs.msg import UInt8, String
7 
8 from flexbe_core.core.preemptable_state import PreemptableState
9 
10 
11 @StateLogger.log_outcomes('flexbe.outcomes')
13  """
14  A state that supports autonomy levels and silent mode.
15  Also, it allows being tracked by a GUI or anything else
16  as it reports each transition and its initial structure.
17  An additional method is provided to report custom status messages to the widget.
18  """
19 
20  def __init__(self, *args, **kwargs):
21  super(OperatableState, self).__init__(*args, **kwargs)
22  self.__execute = self.execute
24 
26 
27  self._outcome_topic = 'flexbe/mirror/outcome'
28  self._request_topic = 'flexbe/outcome_request'
29  self._debug_topic = 'flexbe/debug/current_state'
30 
31  def _operatable_execute(self, *args, **kwargs):
32  outcome = self.__execute(*args, **kwargs)
33 
34  if self._is_controlled:
35  # reset previously requested outcome if applicable
36  if self._last_requested_outcome is not None and outcome is None:
37  self._pub.publish(self._request_topic, OutcomeRequest(outcome=255, target=self.path))
38  self._last_requested_outcome = None
39 
40  # request outcome because autonomy level is too low
41  if not self._force_transition and (not self.parent.is_transition_allowed(self.name, outcome) or
42  outcome is not None and self.is_breakpoint):
43  if outcome != self._last_requested_outcome:
44  self._pub.publish(self._request_topic, OutcomeRequest(outcome=self.outcomes.index(outcome),
45  target=self.path))
46  Logger.localinfo("<-- Want result: %s > %s" % (self.name, outcome))
47  StateLogger.log('flexbe.operator', self, type='request', request=outcome,
48  autonomy=self.parent.autonomy_level,
49  required=self.parent.get_required_autonomy(outcome))
50  self._last_requested_outcome = outcome
51  outcome = None
52 
53  # autonomy level is high enough, report the executed transition
54  elif outcome is not None and outcome in self.outcomes:
55  Logger.localinfo("State result: %s > %s" % (self.name, outcome))
56  self._pub.publish(self._outcome_topic, UInt8(self.outcomes.index(outcome)))
57  self._pub.publish(self._debug_topic, String("%s > %s" % (self.path, outcome)))
58  if self._force_transition:
59  StateLogger.log('flexbe.operator', self, type='forced', forced=outcome,
60  requested=self._last_requested_outcome)
61  self._last_requested_outcome = None
62 
63  self._force_transition = False
64  return outcome
65 
67  super(OperatableState, self)._enable_ros_control()
68  self._pub.createPublisher(self._outcome_topic, UInt8)
69  self._pub.createPublisher(self._debug_topic, String)
70  self._pub.createPublisher(self._request_topic, OutcomeRequest)
flexbe_core.core.operatable_state.OperatableState._outcome_topic
_outcome_topic
Definition: operatable_state.py:27
flexbe_core.core.lockable_state.LockableState.execute
execute
Definition: lockable_state.py:21
flexbe_core.state_logger
Definition: state_logger.py:1
flexbe_core.core.operatable_state.OperatableState._operatable_execute
def _operatable_execute(self, *args, **kwargs)
Definition: operatable_state.py:31
flexbe_core.core.operatable_state.OperatableState.__init__
def __init__(self, *args, **kwargs)
Definition: operatable_state.py:20
flexbe_core.logger
Definition: logger.py:1
flexbe_core.core.ros_state.RosState._is_controlled
_is_controlled
Definition: ros_state.py:16
flexbe_core.core.operatable_state.OperatableState._debug_topic
_debug_topic
Definition: operatable_state.py:29
flexbe_core.core.operatable_state.OperatableState._enable_ros_control
def _enable_ros_control(self)
Definition: operatable_state.py:66
flexbe_core.core.ros_state.RosState.is_breakpoint
def is_breakpoint(self)
Definition: ros_state.py:48
flexbe_core.core.preemptable_state.PreemptableState
Definition: preemptable_state.py:10
flexbe_core.core.state.State.outcomes
def outcomes(self)
Definition: state.py:35
flexbe_core.core.state.State.path
def path(self)
Definition: state.py:69
flexbe_core.core.state.State.parent
def parent(self)
Definition: state.py:59
flexbe_core.core.operatable_state.OperatableState._last_requested_outcome
_last_requested_outcome
Definition: operatable_state.py:25
flexbe_core.core.manually_transitionable_state.ManuallyTransitionableState._force_transition
_force_transition
Definition: manually_transitionable_state.py:20
flexbe_core.core.lockable_state.LockableState.__execute
__execute
Definition: lockable_state.py:20
flexbe_core.core.operatable_state.OperatableState._request_topic
_request_topic
Definition: operatable_state.py:28
flexbe_core.core.preemptable_state
Definition: preemptable_state.py:1
flexbe_core.core.operatable_state.OperatableState
Definition: operatable_state.py:12
flexbe_core.core.ros_state.RosState._pub
_pub
Definition: ros_state.py:18
flexbe_core.core.state.State.name
def name(self)
Definition: state.py:49


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:05