test_service_pair_timeouts.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 """ Testing the service pair timeout functionality """
00004 
00005 # enable some python3 compatibility options:
00006 from __future__ import absolute_import, print_function, unicode_literals
00007 
00008 import unittest
00009 import rospy
00010 import rocon_service_pair_msgs.msg as rocon_service_pair_msgs
00011 import rocon_python_comms
00012 import rostest
00013 import unique_id
00014 import threading
00015 
00016 
00017 def generate_request_message():
00018     request = rocon_service_pair_msgs.TestiesRequest()
00019     request.data = "hello dude"
00020     return request
00021 
00022 class TestServicePairTimeouts(unittest.TestCase):
00023 
00024     def __init__(self, *args):
00025         super(TestServicePairTimeouts, self).__init__(*args)
00026         self.testies = rocon_python_comms.ServicePairClient('testies_broken', rocon_service_pair_msgs.TestiesPair)
00027         self.response_message = "I heard ya dude"
00028         rospy.sleep(0.5)  # rospy hack to give publishers time to setup
00029         self._error_msg_id = None
00030         self._error_response = None
00031         self._error_event = None
00032 
00033     def test_non_blocking_call_with_timeout(self):
00034         self._error_event = threading.Event()
00035         msg_id = self.testies(generate_request_message(), timeout=rospy.Duration(1.0), callback=self.callback, error_callback=self.error_callback)
00036         result = self._error_event.wait(2.0)
00037         self.assertTrue(result, "Did not receive an error message")
00038 
00039     def test_wait_for_service_pair_server(self):
00040         wait_failed = False
00041         with self.assertRaises(rospy.ROSException):
00042             self.testies.wait_for_service(rospy.Duration(1.0))
00043 
00044     def callback(self, msg_id, msg):
00045         """ User callback to feed into non-blocking requests.
00046 
00047           @param msg_id : id of the request-response pair.
00048           @type uuid_msgs.UniqueID
00049           
00050           @param msg : message response received
00051           @type <name>Response
00052         """
00053         pass
00054 
00055     def error_callback(self, msg_id, error_message):
00056         """ User callback to feed into non-blocking requests.
00057 
00058           @param msg_id : id of the request-response pair.
00059           @type uuid_msgs.UniqueID
00060           
00061           @param error_message : error string received
00062           @type str
00063         """
00064         #rospy.loginfo("Error Callback: %s" % error_message)
00065         self._error_event.set()
00066 
00067 if __name__ == '__main__':
00068     rospy.init_node("test_service_pair_timeouts")
00069     rostest.rosrun('rocon_python_comms',
00070                    'test_service_pair_timeouts',
00071                    TestServicePairTimeouts) 


rocon_python_comms
Author(s): Daniel Stonier
autogenerated on Fri May 2 2014 10:35:42