Go to the documentation of this file.00001
00002
00003 import os
00004 import sys
00005
00006 import unittest
00007
00008 import rosgraph
00009 import rospy
00010 import rosmsg
00011 import roslib
00012
00013
00014 PKG = 'jsk_topic_tools'
00015 NAME = 'test_connection'
00016
00017
00018 class TestConnection(unittest.TestCase):
00019
00020 def __init__(self, *args):
00021 super(TestConnection, self).__init__(*args)
00022 rospy.init_node(NAME)
00023
00024 def test_no_subscribers(self):
00025 check_connected_topics = rospy.get_param('~check_connected_topics')
00026 master = rosgraph.Master('/test_connection')
00027 _, sub, _ = master.getSystemState()
00028
00029 master = rosgraph.Master('test_connection')
00030 _, subscriptions, _ = master.getSystemState()
00031 for check_topic in check_connected_topics:
00032 for topic, sub_node in subscriptions:
00033 if topic == rospy.get_namespace() + check_topic:
00034 raise ValueError('Found topic: {}'.format(check_topic))
00035
00036 def test_subscriber_appears_disappears(self):
00037 topic_type = rospy.get_param('~input_topic_type')
00038 check_connected_topics = rospy.get_param('~check_connected_topics')
00039 wait_time = rospy.get_param('~wait_for_connection', 0)
00040 msg_class = roslib.message.get_message_class(topic_type)
00041
00042 sub = rospy.Subscriber('~input', msg_class,
00043 self._cb_test_subscriber_appears_disappears)
00044 print('Waiting for connection for {} sec.'.format(wait_time))
00045 rospy.sleep(wait_time)
00046
00047 master = rosgraph.Master('test_connection')
00048 _, subscriptions, _ = master.getSystemState()
00049 for check_topic in check_connected_topics:
00050 for topic, sub_node in subscriptions:
00051 if topic == rospy.get_namespace() + check_topic:
00052 break
00053 else:
00054 raise ValueError('Not found topic: {}'.format(check_topic))
00055 sub.unregister()
00056
00057 ROS_DISTRO = os.environ.get('ROS_DISTRO', 'indigo')
00058 if ord(ROS_DISTRO[0]) < ord('i'):
00059 sys.stderr.write('WARNING: running on rosdistro %s, and skipping '
00060 'test for disconnection.\n' % ROS_DISTRO)
00061 return
00062 rospy.sleep(1)
00063
00064 _, subscriptions, _ = master.getSystemState()
00065 for check_topic in check_connected_topics:
00066 for topic, sub_node in subscriptions:
00067 if topic == rospy.get_namespace() + check_topic:
00068 raise ValueError('Found topic: {}'.format(check_topic))
00069
00070 def _cb_test_subscriber_appears_disappears(self, msg):
00071 pass
00072
00073
00074 if __name__ == "__main__":
00075 import rostest
00076 rostest.rosrun(PKG, NAME, TestConnection)