36 __author__ =
'Kentaro Wada <www.kentaro.wada@gmail.com>' 49 PKG =
'test_nodelet_topic_tools' 56 super(TestConnection, self).
__init__(*args)
61 check_connected_topics = rospy.get_param(
'~check_connected_topics')
63 _, subscriptions, _ = self.master.getSystemState()
64 for check_topic
in check_connected_topics:
65 for topic, sub_node
in subscriptions:
66 if topic == rospy.get_namespace() + check_topic:
67 raise ValueError(
'Found topic: {}'.format(check_topic))
70 topic_type = rospy.get_param(
'~input_topic_type')
71 check_connected_topics = rospy.get_param(
'~check_connected_topics')
72 wait_time = rospy.get_param(
'~wait_for_connection', 0)
73 msg_class = roslib.message.get_message_class(topic_type)
75 sub = rospy.Subscriber(
'~input', msg_class,
77 print(
'Waiting for connection for {} sec.'.format(wait_time))
78 rospy.sleep(wait_time)
80 _, subscriptions, _ = self.master.getSystemState()
81 for check_topic
in check_connected_topics:
82 for topic, sub_node
in subscriptions:
83 if topic == rospy.get_namespace() + check_topic:
86 raise ValueError(
'Not found topic: {}'.format(check_topic))
92 _, subscriptions, _ = self.master.getSystemState()
93 for check_topic
in check_connected_topics:
94 for topic, sub_node
in subscriptions:
95 if topic == rospy.get_namespace() + check_topic:
96 raise ValueError(
'Found topic: {}'.format(check_topic))
102 if __name__ ==
"__main__":
104 rostest.rosrun(PKG, NAME, TestConnection)
def _cb_test_subscriber_appears(self, msg)
def test_subscriber_appears(self)
def test_no_subscribers(self)