14 PKG =
'jsk_topic_tools' 15 NAME =
'test_connection' 21 super(TestConnection, self).
__init__(*args)
25 check_connected_topics = rospy.get_param(
'~check_connected_topics')
26 master = rosgraph.Master(
'/test_connection')
27 _, sub, _ = master.getSystemState()
29 master = rosgraph.Master(
'test_connection')
30 _, subscriptions, _ = master.getSystemState()
31 for check_topic
in check_connected_topics:
32 for topic, sub_node
in subscriptions:
33 if topic == rospy.get_namespace() + check_topic:
34 raise ValueError(
'Found topic: {}'.format(check_topic))
37 topic_type = rospy.get_param(
'~input_topic_type')
38 check_connected_topics = rospy.get_param(
'~check_connected_topics')
39 wait_time = rospy.get_param(
'~wait_for_connection', 0)
40 msg_class = roslib.message.get_message_class(topic_type)
42 sub = rospy.Subscriber(
'~input', msg_class,
44 print(
'Waiting for connection for {} sec.'.format(wait_time))
45 rospy.sleep(wait_time)
47 master = rosgraph.Master(
'test_connection')
48 _, subscriptions, _ = master.getSystemState()
49 for check_topic
in check_connected_topics:
50 for topic, sub_node
in subscriptions:
51 if topic == rospy.get_namespace() + check_topic:
54 raise ValueError(
'Not found topic: {}'.format(check_topic))
57 ROS_DISTRO = os.environ.get(
'ROS_DISTRO',
'indigo')
58 if ord(ROS_DISTRO[0]) < ord(
'i'):
59 sys.stderr.write(
'WARNING: running on rosdistro %s, and skipping ' 60 'test for disconnection.\n' % ROS_DISTRO)
64 _, subscriptions, _ = master.getSystemState()
65 for check_topic
in check_connected_topics:
66 for topic, sub_node
in subscriptions:
67 if topic == rospy.get_namespace() + check_topic:
68 raise ValueError(
'Found topic: {}'.format(check_topic))
74 if __name__ ==
"__main__":
76 rostest.rosrun(PKG, NAME, TestConnection)
def test_no_subscribers(self)
def test_subscriber_appears_disappears(self)
def _cb_test_subscriber_appears_disappears(self, msg)