test_sub_state.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import rospy
00003 from flexbe_core import EventState
00004 from flexbe_core.proxy import ProxySubscriberCached
00005 
00006 from std_msgs.msg import String
00007 
00008 class TestSubState(EventState):
00009 
00010         def __init__(self, topic):
00011                 '''Constructor'''
00012                 super(TestSubState, self).__init__(outcomes=['received', 'unavailable'],
00013                                                                                 output_keys=['output_value'])
00014                 self._topic = topic
00015                 self._sub = ProxySubscriberCached({self._topic: String})
00016                 self._msg_counter = 0
00017                 self._timeout = rospy.Time.now() + rospy.Duration(1.5)
00018 
00019                 
00020         def execute(self, userdata):
00021                 if self._msg_counter == 0 and rospy.Time.now() > self._timeout:
00022                         userdata.output_value = None
00023                         return 'unavailable'
00024 
00025                 if self._sub.has_msg(self._topic):
00026                         msg = self._sub.get_last_msg(self._topic)
00027                         self._sub.remove_last_msg(self._topic)
00028                         userdata.output_value = msg.data
00029                         self._msg_counter += 1
00030 
00031                 if self._msg_counter == 3:
00032                         return 'received'
00033 


flexbe_testing
Author(s): Philipp Schillinger
autogenerated on Thu Jun 6 2019 19:32:31