test_service_pair_client.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 """ Testing the service pair client """
00004 
00005 # enable some python3 compatibility options:
00006 from __future__ import absolute_import, print_function, unicode_literals
00007 
00008 import unittest
00009 import rospy
00010 import rocon_service_pair_msgs.msg as rocon_service_pair_msgs
00011 import rocon_python_comms
00012 import rostest
00013 import unique_id
00014 import threading
00015 
00016 
00017 def generate_request_message():
00018     request = rocon_service_pair_msgs.TestiesRequest()
00019     request.data = "hello dude"
00020     return request
00021 
00022 class TestServicePairClient(unittest.TestCase):
00023 
00024     def __init__(self, *args):
00025         super(TestServicePairClient, self).__init__(*args)
00026         self.testies = rocon_python_comms.ServicePairClient('testies', rocon_service_pair_msgs.TestiesPair)
00027         self.response_message = "I heard ya dude"
00028         rospy.sleep(0.5)  # rospy hack to give publishers time to setup
00029         self._non_blocking_msg_id = None
00030         self._non_blocking_msg_response = None
00031         self._non_blocking_with_timeout_msg_id = None
00032         self._non_blocking_with_timeout_msg_response = None
00033         self._non_blocking_event = None
00034         self._non_blocking_with_timeout_event = None
00035         
00036     def test_blocking_call(self):
00037         response = self.testies(generate_request_message())
00038         self.assertIsNotNone(response, "Response from the server is an invalid 'None'")
00039         self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00040 
00041     def test_blocking_call_with_timeout(self):
00042         response = self.testies(generate_request_message(), timeout=rospy.Duration(3.0))
00043         self.assertIsNotNone(response, "Response from the server is an invalid 'None'")
00044         self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00045 
00046     def test_non_blocking_call(self):
00047         self._non_blocking_event = threading.Event()
00048         msg_id = self.testies(generate_request_message(), callback=self.callback)
00049         self._non_blocking_event.wait()
00050         self.assertEquals(unique_id.toHexString(msg_id), unique_id.toHexString(self._non_blocking_msg_id))
00051         self.assertEquals(self._non_blocking_msg_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self._non_blocking_msg_response.data))
00052 
00053     def test_non_blocking_call_with_timeout(self):
00054         self._non_blocking_with_timeout_event = threading.Event()
00055         msg_id = self.testies(generate_request_message(), timeout=rospy.Duration(3.0), callback=self.callback_with_timeout)
00056         self._non_blocking_with_timeout_event.wait()
00057         self.assertEquals(unique_id.toHexString(msg_id), unique_id.toHexString(self._non_blocking_with_timeout_msg_id))
00058         self.assertEquals(self._non_blocking_with_timeout_msg_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self._non_blocking_with_timeout_msg_response.data))
00059 
00060     # def test_non_blocking_call_with_triggered_timeout(self):
00061 
00062     def callback(self, msg_id, msg):
00063         """ User callback to feed into non-blocking requests.
00064 
00065           @param msg_id : id of the request-response pair.
00066           @type uuid_msgs.UniqueID
00067           
00068           @param msg : message response received
00069           @type <name>Response
00070         """
00071         self._non_blocking_msg_id = msg_id
00072         self._non_blocking_msg_response = msg
00073         self._non_blocking_event.set()
00074 
00075     def callback_with_timeout(self, msg_id, msg):
00076         self._non_blocking_with_timeout_msg_id = msg_id
00077         self._non_blocking_with_timeout_msg_response = msg
00078         self._non_blocking_with_timeout_event.set()
00079 
00080     def error_callback(self, error_message):
00081         """ User callback to pick up error messages. """
00082 
00083 if __name__ == '__main__':
00084     rospy.init_node("test_service_proxy")
00085     rostest.rosrun('rocon_python_comms',
00086                    'test_service_pair_client',
00087                    TestServicePairClient) 


rocon_python_comms
Author(s): Daniel Stonier
autogenerated on Fri May 2 2014 10:35:42