Go to the documentation of this file.00001
00002
00003 """ Testing the service pair client """
00004
00005
00006 from __future__ import absolute_import, print_function, unicode_literals
00007
00008 import unittest
00009 import rospy
00010 import rocon_service_pair_msgs.msg as rocon_service_pair_msgs
00011 import rocon_python_comms
00012 import rostest
00013 import unique_id
00014 import threading
00015
00016
00017 def generate_request_message():
00018 request = rocon_service_pair_msgs.TestiesRequest()
00019 request.data = "hello dude"
00020 return request
00021
00022 class TestServicePairClient(unittest.TestCase):
00023
00024 def __init__(self, *args):
00025 super(TestServicePairClient, self).__init__(*args)
00026 self.testies = rocon_python_comms.ServicePairClient('testies', rocon_service_pair_msgs.TestiesPair)
00027 self.response_message = "I heard ya dude"
00028 rospy.sleep(0.5)
00029 self._non_blocking_msg_id = None
00030 self._non_blocking_msg_response = None
00031 self._non_blocking_with_timeout_msg_id = None
00032 self._non_blocking_with_timeout_msg_response = None
00033 self._non_blocking_event = None
00034 self._non_blocking_with_timeout_event = None
00035
00036 def test_blocking_call(self):
00037 response = self.testies(generate_request_message())
00038 self.assertIsNotNone(response, "Response from the server is an invalid 'None'")
00039 self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00040
00041 def test_blocking_call_with_timeout(self):
00042 response = self.testies(generate_request_message(), timeout=rospy.Duration(3.0))
00043 self.assertIsNotNone(response, "Response from the server is an invalid 'None'")
00044 self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00045
00046 def test_non_blocking_call(self):
00047 self._non_blocking_event = threading.Event()
00048 msg_id = self.testies(generate_request_message(), callback=self.callback)
00049 self._non_blocking_event.wait()
00050 self.assertEquals(unique_id.toHexString(msg_id), unique_id.toHexString(self._non_blocking_msg_id))
00051 self.assertEquals(self._non_blocking_msg_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self._non_blocking_msg_response.data))
00052
00053 def test_non_blocking_call_with_timeout(self):
00054 self._non_blocking_with_timeout_event = threading.Event()
00055 msg_id = self.testies(generate_request_message(), timeout=rospy.Duration(3.0), callback=self.callback_with_timeout)
00056 self._non_blocking_with_timeout_event.wait()
00057 self.assertEquals(unique_id.toHexString(msg_id), unique_id.toHexString(self._non_blocking_with_timeout_msg_id))
00058 self.assertEquals(self._non_blocking_with_timeout_msg_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self._non_blocking_with_timeout_msg_response.data))
00059
00060
00061
00062 def callback(self, msg_id, msg):
00063 """ User callback to feed into non-blocking requests.
00064
00065 @param msg_id : id of the request-response pair.
00066 @type uuid_msgs.UniqueID
00067
00068 @param msg : message response received
00069 @type <name>Response
00070 """
00071 self._non_blocking_msg_id = msg_id
00072 self._non_blocking_msg_response = msg
00073 self._non_blocking_event.set()
00074
00075 def callback_with_timeout(self, msg_id, msg):
00076 self._non_blocking_with_timeout_msg_id = msg_id
00077 self._non_blocking_with_timeout_msg_response = msg
00078 self._non_blocking_with_timeout_event.set()
00079
00080 def error_callback(self, error_message):
00081 """ User callback to pick up error messages. """
00082
00083 if __name__ == '__main__':
00084 rospy.init_node("test_service_proxy")
00085 rostest.rosrun('rocon_python_comms',
00086 'test_service_pair_client',
00087 TestServicePairClient)