manually_transitionable_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from flexbe_core.logger import Logger
3 
4 from flexbe_msgs.msg import CommandFeedback, OutcomeRequest
5 
6 from flexbe_core.core.ros_state import RosState
7 
8 
10  """
11  A state for that a desired outcome can be declared.
12  If any outcome is declared, this outcome is forced.
13  """
14 
15  def __init__(self, *args, **kwargs):
16  super(ManuallyTransitionableState, self).__init__(*args, **kwargs)
17  self.__execute = self.execute
19 
20  self._force_transition = False
21 
22  self._feedback_topic = 'flexbe/command_feedback'
23  self._transition_topic = 'flexbe/command/transition'
24 
25  def _manually_transitionable_execute(self, *args, **kwargs):
26  if self._is_controlled and self._sub.has_buffered(self._transition_topic):
27  command_msg = self._sub.get_from_buffer(self._transition_topic)
28  self._pub.publish(self._feedback_topic, CommandFeedback(command="transition",
29  args=[command_msg.target, self.name]))
30  if command_msg.target != self.name:
31  Logger.logwarn("Requested outcome for state %s but active state is %s" %
32  (command_msg.target, self.name))
33  else:
34  self._force_transition = True
35  outcome = self.outcomes[command_msg.outcome]
36  Logger.localinfo("--> Manually triggered outcome %s of state %s" % (outcome, self.name))
37  return outcome
38  # otherwise, return the normal outcome
39  self._force_transition = False
40  return self.__execute(*args, **kwargs)
41 
43  super(ManuallyTransitionableState, self)._enable_ros_control()
44  self._pub.createPublisher(self._feedback_topic, CommandFeedback)
45  self._sub.subscribe(self._transition_topic, OutcomeRequest)
46  self._sub.enable_buffer(self._transition_topic)
47 
49  super(ManuallyTransitionableState, self)._disable_ros_control()
50  self._sub.unsubscribe_topic(self._transition_topic)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39