test_sub_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 from flexbe_core import EventState
4 from flexbe_core.proxy import ProxySubscriberCached
5 
6 from std_msgs.msg import String
7 
8 class TestSubState(EventState):
9 
10  def __init__(self, topic):
11  '''Constructor'''
12  super(TestSubState, self).__init__(outcomes=['received', 'unavailable'],
13  output_keys=['output_value'])
14  self._topic = topic
15  self._sub = ProxySubscriberCached({self._topic: String})
16  self._msg_counter = 0
17  self._timeout = rospy.Time.now() + rospy.Duration(1.5)
18 
19 
20  def execute(self, userdata):
21  if self._msg_counter == 0 and rospy.Time.now() > self._timeout:
22  userdata.output_value = None
23  return 'unavailable'
24 
25  if self._sub.has_msg(self._topic):
26  msg = self._sub.get_last_msg(self._topic)
27  self._sub.remove_last_msg(self._topic)
28  userdata.output_value = msg.data
29  self._msg_counter += 1
30 
31  if self._msg_counter == 3:
32  return 'received'
33 


flexbe_testing
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:44