00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
# Name of the ROS package this test belongs to; reused when registering the
# test with rostest at the bottom of the file.
PKG = 'topic_tools'

# Load the package manifest before the remaining imports run.
import roslib
roslib.load_manifest(PKG)
00039
00040 import unittest
00041 import rospy
00042
00043 from topic_tools.srv import MuxAdd
00044 from topic_tools.srv import MuxDelete
00045 from topic_tools.srv import MuxList
00046 from topic_tools.srv import MuxSelect
00047
class MuxServiceTestCase(unittest.TestCase):
    """Checks the ``mux/add``, ``mux/delete``, ``mux/list`` and ``mux/select``
    services against a running mux.

    The test expects the mux to start with exactly one input topic,
    ``/input``.
    """

    def make_srv_proxies(self):
        """Wait for the four mux services and build a proxy for each.

        Each service is waited on with a timeout argument of 5.  If any of
        the waits raises ``rospy.ROSException`` the test is failed.

        Returns:
            Tuple ``(add_srv, delete_srv, list_srv, select_srv)`` of
            ``rospy.ServiceProxy`` objects.
        """
        try:
            rospy.wait_for_service('mux/add', 5)
            rospy.wait_for_service('mux/delete', 5)
            rospy.wait_for_service('mux/list', 5)
            rospy.wait_for_service('mux/select', 5)
        except rospy.ROSException as e:
            # repr() replaces the Python 2-only backtick syntax.
            self.fail('failed to find a required service: ' + repr(e))

        add_srv = rospy.ServiceProxy('mux/add', MuxAdd)
        delete_srv = rospy.ServiceProxy('mux/delete', MuxDelete)
        list_srv = rospy.ServiceProxy('mux/list', MuxList)
        select_srv = rospy.ServiceProxy('mux/select', MuxSelect)

        return (add_srv, delete_srv, list_srv, select_srv)

    def _assert_topics(self, list_srv, expected):
        """Assert that ``list_srv().topics`` holds exactly ``expected``.

        The comparison is done on sets, so order and duplicates are ignored.
        """
        topics = list_srv().topics
        self.assertEqual(set(topics), set(expected))

    def _assert_call_rejected(self, srv, topic):
        """Fail the test unless ``srv(topic)`` raises ``rospy.ServiceException``."""
        try:
            srv(topic)
        except rospy.ServiceException:
            pass
        else:
            self.fail('service call should have thrown an exception')

    def test_add_delete_list(self):
        """Walk through adding, selecting and deleting mux input topics,
        checking the reported topic list after every step."""
        add_srv, delete_srv, list_srv, select_srv = self.make_srv_proxies()

        # Initial state: only '/input' is listed.
        self._assert_topics(list_srv, ['/input'])

        # Adding a new topic makes it show up in the list.
        add_srv('/new_input')
        self._assert_topics(list_srv, ['/input', '/new_input'])

        # Adding the same topic a second time must be rejected and must not
        # change the list.
        self._assert_call_rejected(add_srv, '/new_input')
        self._assert_topics(list_srv, ['/input', '/new_input'])

        # Deleting the currently selected topic must be rejected and must
        # not change the list.
        select_srv('/input')
        self._assert_call_rejected(delete_srv, '/input')
        self._assert_topics(list_srv, ['/input', '/new_input'])

        # NOTE(review): '__none' presumably deselects every input so that
        # '/input' may be deleted -- confirm against the mux documentation.
        select_srv('__none')

        # With '/input' no longer selected, both topics can be deleted in turn.
        delete_srv('/input')
        self._assert_topics(list_srv, ['/new_input'])
        delete_srv('/new_input')
        self._assert_topics(list_srv, [])
00104
if __name__ == '__main__':
    # rostest is only needed when the file is executed directly, so it is
    # imported here rather than at the top of the file.
    import rostest

    # Register the test case with rostest under the name 'mux_services'.
    rostest.unitrun(PKG, 'mux_services', MuxServiceTestCase)
00108