42 from topic_tools.srv
import MuxAdd
43 from topic_tools.srv
import MuxDelete
44 from topic_tools.srv
import MuxList
45 from topic_tools.srv
import MuxSelect
50 rospy.wait_for_service(
'mux/add', 5)
51 rospy.wait_for_service(
'mux/delete', 5)
52 rospy.wait_for_service(
'mux/list', 5)
53 rospy.wait_for_service(
'mux/select', 5)
54 except rospy.ROSException
as e:
55 self.fail(
'failed to find a required service: ' + repr(e))
57 add_srv = rospy.ServiceProxy(
'mux/add', MuxAdd)
58 delete_srv = rospy.ServiceProxy(
'mux/delete', MuxDelete)
59 list_srv = rospy.ServiceProxy(
'mux/list', MuxList)
60 select_srv = rospy.ServiceProxy(
'mux/select', MuxSelect)
62 return (add_srv, delete_srv, list_srv, select_srv)
67 topics = list_srv().topics
68 self.assertEquals(set(topics), set([
'/input']))
71 topics = list_srv().topics
72 self.assertEquals(set(topics), set([
'/input',
'/new_input']))
77 except rospy.ServiceException:
80 self.fail(
'service call should have thrown an exception')
81 topics = list_srv().topics
82 self.assertEquals(set(topics), set([
'/input',
'/new_input']))
88 except rospy.ServiceException:
91 self.fail(
'service call should have thrown an exception')
92 topics = list_srv().topics
93 self.assertEquals(set(topics), set([
'/input',
'/new_input']))
98 topics = list_srv().topics
99 self.assertEquals(set(topics), set([
'/new_input']))
100 delete_srv(
'/new_input')
101 topics = list_srv().topics
102 self.assertEquals(set(topics), set([]))
104 if __name__ ==
"__main__":
106 rostest.unitrun(PKG,
'mux_services', MuxServiceTestCase)