Go to the documentation of this file.00001
00002
00003 """ Testing the service pair client """
00004
00005
00006 from __future__ import absolute_import, print_function, unicode_literals
00007
00008 import unittest
00009 import rospy
00010 import rocon_service_pair_msgs.msg as rocon_service_pair_msgs
00011 import rocon_python_comms
00012 import rostest
00013 import unique_id
00014 import threading
00015
00016
00017 def generate_request_message():
00018 request = rocon_service_pair_msgs.TestiesRequest()
00019 request.data = "hello dude"
00020 return request
00021
00022 class TestServicePairClient(unittest.TestCase):
00023 '''
00024 Make timeouts of 3.0 here (c.f. the 2.0s sleep in the server)
00025 so that the first gets processed in time, but holds up the second. Note that the
00026 publisher callbacks that queue up get processed serially, not in parallel so that
00027 the second callback doesn't get done in time before timing out.
00028
00029 If you make them both 2.0, then neither gets done in time.
00030 '''
00031
00032 def __init__(self, *args):
00033 super(TestServicePairClient, self).__init__(*args)
00034 self.testies = rocon_python_comms.ServicePairClient('testies', rocon_service_pair_msgs.TestiesPair)
00035 self.threaded_testies = rocon_python_comms.ServicePairClient('threaded_testies', rocon_service_pair_msgs.TestiesPair)
00036 self.response_message = "I heard ya dude"
00037 self.non_threaded_event = threading.Event()
00038 self.threaded_event = threading.Event()
00039 self.non_threaded_response = None
00040 self.threaded_response = None
00041 rospy.sleep(0.5)
00042
00043 def test_quick_call(self):
00044 thread = threading.Thread(target=self.send_separate_non_threaded_test)
00045 thread.start()
00046 response = self.testies(generate_request_message(), timeout=rospy.Duration(3.0))
00047 rospy.loginfo("Response [expecting 'None']: %s" % (response.data if response is not None else None))
00048 self.assertIsNone(response, "Response from the server is an invalid 'None'")
00049 if response is not None:
00050 self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00051 self.non_threaded_event.wait()
00052 self.assertIsNotNone(self.non_threaded_response, "Response from the server is an invalid 'None'")
00053 if self.non_threaded_response is not None:
00054 self.assertEquals(self.non_threaded_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self.non_threaded_response.data))
00055
00056 def send_separate_non_threaded_test(self):
00057 '''
00058 Asserts in here don't really work. Yeah, they fire and throw exceptions if they fail their
00059 assertion, but they don't register back in the rostest framework. Ideally need to set a condition back in
00060 the test function that lets it read the result.
00061 '''
00062 self.non_threaded_response = self.testies(generate_request_message(), timeout=rospy.Duration(3.0))
00063 rospy.loginfo("Response [expecting result]: %s" % (self.non_threaded_response.data if self.non_threaded_response is not None else None))
00064 self.non_threaded_event.set()
00065
00066 def test_threaded_quick_call(self):
00067 thread = threading.Thread(target=self.send_separate_threaded_test)
00068 thread.start()
00069 response = self.threaded_testies(generate_request_message(), timeout=rospy.Duration(3.0))
00070 rospy.loginfo("Response [expecting result]: %s" % (response.data if response is not None else None))
00071 self.assertIsNotNone(response, "Response from the server is an invalid 'None'")
00072 if response is not None:
00073 self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00074 self.threaded_event.wait()
00075 self.assertIsNotNone(self.threaded_response, "Response from the server is an invalid 'None'")
00076 if self.threaded_response is not None:
00077 self.assertEquals(self.threaded_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self.threaded_response.data))
00078
00079 def send_separate_threaded_test(self):
00080 '''
00081 Asserts in here don't really work. Yeah, they fire and throw exceptions if they fail their
00082 assertion, but they don't register back in the rostest framework. Ideally need to set a condition back in
00083 the test function that lets it read the result.
00084 '''
00085 self.threaded_response = self.threaded_testies(generate_request_message(), timeout=rospy.Duration(3.0))
00086 rospy.loginfo("Response [expecting result]: %s" % (self.threaded_response.data if self.threaded_response is not None else None))
00087 self.threaded_event.set()
00088
00089 def error_callback(self, error_message):
00090 """ User callback to pick up error messages. """
00091
00092 if __name__ == '__main__':
00093 rospy.init_node("test_service_proxy")
00094 rostest.rosrun('rocon_python_comms',
00095 'test_service_pair_client',
00096 TestServicePairClient)