Go to the documentation of this file.00001
00002
00003 """ Testing the service pair timeout functionality """
00004
00005
00006 from __future__ import absolute_import, print_function, unicode_literals
00007
00008 import unittest
00009 import rospy
00010 import rocon_service_pair_msgs.msg as rocon_service_pair_msgs
00011 import rocon_python_comms
00012 import rostest
00013 import unique_id
00014 import threading
00015
00016
00017 def generate_request_message():
00018 request = rocon_service_pair_msgs.TestiesRequest()
00019 request.data = "hello dude"
00020 return request
00021
00022 class TestServicePairTimeouts(unittest.TestCase):
00023
00024 def __init__(self, *args):
00025 super(TestServicePairTimeouts, self).__init__(*args)
00026 self.testies = rocon_python_comms.ServicePairClient('testies_broken', rocon_service_pair_msgs.TestiesPair)
00027 self.response_message = "I heard ya dude"
00028 rospy.sleep(0.5)
00029 self._error_msg_id = None
00030 self._error_response = None
00031 self._error_event = None
00032
00033 def test_non_blocking_call_with_timeout(self):
00034 self._error_event = threading.Event()
00035 msg_id = self.testies(generate_request_message(), timeout=rospy.Duration(1.0), callback=self.callback, error_callback=self.error_callback)
00036 result = self._error_event.wait(2.0)
00037 self.assertTrue(result, "Did not receive an error message")
00038
00039 def test_wait_for_service_pair_server(self):
00040 wait_failed = False
00041 with self.assertRaises(rospy.ROSException):
00042 self.testies.wait_for_service(rospy.Duration(1.0))
00043
00044 def callback(self, msg_id, msg):
00045 """ User callback to feed into non-blocking requests.
00046
00047 @param msg_id : id of the request-response pair.
00048 @type uuid_msgs.UniqueID
00049
00050 @param msg : message response received
00051 @type <name>Response
00052 """
00053 pass
00054
00055 def error_callback(self, msg_id, error_message):
00056 """ User callback to feed into non-blocking requests.
00057
00058 @param msg_id : id of the request-response pair.
00059 @type uuid_msgs.UniqueID
00060
00061 @param error_message : error string received
00062 @type str
00063 """
00064
00065 self._error_event.set()
00066
00067 if __name__ == '__main__':
00068 rospy.init_node("test_service_pair_timeouts")
00069 rostest.rosrun('rocon_python_comms',
00070 'test_service_pair_timeouts',
00071 TestServicePairTimeouts)