test_quick_call_client.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 """ Testing the service pair client """
00004 
00005 # enable some python3 compatibility options:
00006 from __future__ import absolute_import, print_function, unicode_literals
00007 
00008 import unittest
00009 import rospy
00010 import rocon_service_pair_msgs.msg as rocon_service_pair_msgs
00011 import rocon_python_comms
00012 import rostest
00013 import unique_id
00014 import threading
00015 
00016 
00017 def generate_request_message():
00018     request = rocon_service_pair_msgs.TestiesRequest()
00019     request.data = "hello dude"
00020     return request
00021 
00022 class TestServicePairClient(unittest.TestCase):
00023     '''
00024       Make timeouts of 3.0 here (c.f. the 2.0s sleep in the server)
00025       so that the first gets processed in time, but holds up the second. Note that the
00026       publisher callbacks that queue up get processed serially, not in parallel so that
00027       the second callback doesn't get done in time before timing out.
00028       
00029       If you make them both 2.0, then neither gets done in time.
00030     '''
00031 
00032     def __init__(self, *args):
00033         super(TestServicePairClient, self).__init__(*args)
00034         self.testies = rocon_python_comms.ServicePairClient('testies', rocon_service_pair_msgs.TestiesPair)
00035         self.threaded_testies = rocon_python_comms.ServicePairClient('threaded_testies', rocon_service_pair_msgs.TestiesPair)
00036         self.response_message = "I heard ya dude"
00037         self.non_threaded_event = threading.Event()
00038         self.threaded_event = threading.Event()
00039         self.non_threaded_response = None
00040         self.threaded_response = None
00041         rospy.sleep(0.5)  # rospy hack to give publishers time to setup
00042         
00043     def test_quick_call(self):
00044         thread = threading.Thread(target=self.send_separate_non_threaded_test)
00045         thread.start()
00046         response = self.testies(generate_request_message(), timeout=rospy.Duration(3.0))
00047         rospy.loginfo("Response [expecting 'None']: %s" % (response.data if response is not None else None))
00048         self.assertIsNone(response, "Response from the server is an invalid 'None'")
00049         if response is not None:
00050             self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00051         self.non_threaded_event.wait()
00052         self.assertIsNotNone(self.non_threaded_response, "Response from the server is an invalid 'None'")
00053         if self.non_threaded_response is not None:
00054             self.assertEquals(self.non_threaded_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self.non_threaded_response.data))
00055  
00056     def send_separate_non_threaded_test(self):
00057         '''
00058           Asserts in here don't really work. Yeah, they fire and throw exceptions if they fail their
00059           assertion, but they don't register back in the rostest framework. Ideally need to set a condition back in
00060           the test function that lets it read the result.
00061         '''
00062         self.non_threaded_response = self.testies(generate_request_message(), timeout=rospy.Duration(3.0))
00063         rospy.loginfo("Response [expecting result]: %s" % (self.non_threaded_response.data if self.non_threaded_response is not None else None))
00064         self.non_threaded_event.set()
00065 
00066     def test_threaded_quick_call(self):
00067         thread = threading.Thread(target=self.send_separate_threaded_test)
00068         thread.start()
00069         response = self.threaded_testies(generate_request_message(), timeout=rospy.Duration(3.0))
00070         rospy.loginfo("Response [expecting result]: %s" % (response.data if response is not None else None))
00071         self.assertIsNotNone(response, "Response from the server is an invalid 'None'")
00072         if response is not None:
00073             self.assertEquals(response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, response.data))
00074         self.threaded_event.wait()
00075         self.assertIsNotNone(self.threaded_response, "Response from the server is an invalid 'None'")
00076         if self.threaded_response is not None:
00077             self.assertEquals(self.threaded_response.data, self.response_message, "Should have received '%s' but got '%s'" % (self.response_message, self.threaded_response.data))
00078   
00079     def send_separate_threaded_test(self):
00080         '''
00081           Asserts in here don't really work. Yeah, they fire and throw exceptions if they fail their
00082           assertion, but they don't register back in the rostest framework. Ideally need to set a condition back in
00083           the test function that lets it read the result.
00084         '''
00085         self.threaded_response = self.threaded_testies(generate_request_message(), timeout=rospy.Duration(3.0))
00086         rospy.loginfo("Response [expecting result]: %s" % (self.threaded_response.data if self.threaded_response is not None else None))
00087         self.threaded_event.set()
00088        
00089     def error_callback(self, error_message):
00090         """ User callback to pick up error messages. """
00091 
00092 if __name__ == '__main__':
00093     rospy.init_node("test_service_proxy")
00094     rostest.rosrun('rocon_python_comms',
00095                    'test_service_pair_client',
00096                    TestServicePairClient) 


rocon_python_comms
Author(s): Daniel Stonier
autogenerated on Fri May 2 2014 10:35:42