mirror_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 from rospy.exceptions import ROSInterruptException
4 from flexbe_core import EventState
5 
6 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
7 from std_msgs.msg import String, UInt8
8 
9 
10 class MirrorState(EventState):
11  '''
12  This state will display its possible outcomes as buttons in the GUI
13  and is designed in a way to be created dynamically.
14  '''
15 
16  def __init__(self, target_name, target_path, given_outcomes, outcome_autonomy):
17  super(MirrorState, self).__init__(outcomes=given_outcomes)
18  self.set_rate(100)
19  self._target_name = target_name
20  self._target_path = target_path
21 
22  self._outcome_topic = 'flexbe/mirror/outcome'
23 
24  self._pub = ProxyPublisher()
25  self._sub = ProxySubscriberCached({self._outcome_topic: UInt8})
26 
27  def execute(self, userdata):
28  if self._sub.has_buffered(self._outcome_topic):
29  msg = self._sub.get_from_buffer(self._outcome_topic)
30  if msg.data < len(self.outcomes):
31  rospy.loginfo("State update: %s > %s", self._target_name, self.outcomes[msg.data])
32  return self.outcomes[msg.data]
33  try:
34  self.sleep()
35  except ROSInterruptException:
36  print('Interrupted mirror sleep.')
37 
38  def on_enter(self, userdata):
39  self._pub.publish('flexbe/behavior_update', String("/" + "/".join(self._target_path.split("/")[1:])))
def __init__(self, target_name, target_path, given_outcomes, outcome_autonomy)
Definition: mirror_state.py:16


flexbe_mirror
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:42