operatable_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.logger import Logger
3 from flexbe_core.state_logger import StateLogger
4 
5 from flexbe_msgs.msg import OutcomeRequest
6 from std_msgs.msg import UInt8, String
7 
8 from flexbe_core.core.preemptable_state import PreemptableState
9 
10 
11 @StateLogger.log_outcomes('flexbe.outcomes')
13  """
14  A state that supports autonomy levels and silent mode.
15  Also, it allows being tracked by a GUI or anything else
16  as it reports each transition and its initial structure.
17  An additional method is provided to report custom status messages to the widget.
18  """
19 
20  def __init__(self, *args, **kwargs):
21  super(OperatableState, self).__init__(*args, **kwargs)
22  self.__execute = self.execute
24 
26 
27  self._outcome_topic = 'flexbe/mirror/outcome'
28  self._request_topic = 'flexbe/outcome_request'
29  self._debug_topic = 'flexbe/debug/current_state'
30 
31  def _operatable_execute(self, *args, **kwargs):
32  outcome = self.__execute(*args, **kwargs)
33 
34  if self._is_controlled:
35  # reset previously requested outcome if applicable
36  if self._last_requested_outcome is not None and outcome is None:
37  self._pub.publish(self._request_topic, OutcomeRequest(outcome=255, target=self.path))
38  self._last_requested_outcome = None
39 
40  # request outcome because autonomy level is too low
41  if not self._force_transition and (not self.parent.is_transition_allowed(self.name, outcome) or
42  outcome is not None and self.is_breakpoint):
43  if outcome != self._last_requested_outcome:
44  self._pub.publish(self._request_topic, OutcomeRequest(outcome=self.outcomes.index(outcome),
45  target=self.path))
46  Logger.localinfo("<-- Want result: %s > %s" % (self.name, outcome))
47  StateLogger.log('flexbe.operator', self, type='request', request=outcome,
48  autonomy=self.parent.autonomy_level,
49  required=self.parent.get_required_autonomy(outcome))
50  self._last_requested_outcome = outcome
51  outcome = None
52 
53  # autonomy level is high enough, report the executed transition
54  elif outcome is not None and outcome in self.outcomes:
55  Logger.localinfo("State result: %s > %s" % (self.name, outcome))
56  self._pub.publish(self._outcome_topic, UInt8(self.outcomes.index(outcome)))
57  self._pub.publish(self._debug_topic, String("%s > %s" % (self.path, outcome)))
58  if self._force_transition:
59  StateLogger.log('flexbe.operator', self, type='forced', forced=outcome,
60  requested=self._last_requested_outcome)
61  self._last_requested_outcome = None
62 
63  self._force_transition = False
64  return outcome
65 
67  super(OperatableState, self)._enable_ros_control()
68  self._pub.createPublisher(self._outcome_topic, UInt8)
69  self._pub.createPublisher(self._debug_topic, String)
70  self._pub.createPublisher(self._request_topic, OutcomeRequest)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39