15 NAME =
'test_lazy_transport' 21 super(self.__class__, self).
__init__(*args)
25 check_connected_topics = rospy.get_param(
'~check_connected_topics')
26 master = rosgraph.Master(
'/test_connection')
27 _, sub, _ = master.getSystemState()
29 master = rosgraph.Master(
'test_connection')
30 _, subscriptions, _ = master.getSystemState()
31 for check_topic
in check_connected_topics:
32 for topic, sub_node
in subscriptions:
33 if topic == rospy.get_namespace() + check_topic:
34 raise ValueError(
'Found topic: {}'.format(check_topic))
37 topic_type = rospy.get_param(
'~input_topic_type')
38 check_connected_topics = rospy.get_param(
'~check_connected_topics')
39 wait_time = rospy.get_param(
'~wait_for_connection', 0)
40 msg_class = roslib.message.get_message_class(topic_type)
42 sub = rospy.Subscriber(
'~input', msg_class,
44 print(
'Waiting for connection for {} sec.'.format(wait_time))
45 rospy.sleep(wait_time)
47 master = rosgraph.Master(
'test_connection')
48 _, subscriptions, _ = master.getSystemState()
49 for check_topic
in check_connected_topics:
50 for topic, sub_node
in subscriptions:
51 if topic == rospy.get_namespace() + check_topic:
54 raise ValueError(
'Topic Not Found: {}' 55 .format(rospy.get_namespace() + check_topic))
62 if __name__ ==
"__main__":
64 rostest.rosrun(PKG, NAME, TestLazyTransport)
def test_subscriber_appears(self)
def test_no_subscribers(self)
def _cb_test_subscriber_appears(self, msg)