test_connection.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import os
00004 import sys
00005 
00006 import unittest
00007 
00008 import rosgraph
00009 import rospy
00010 import rosmsg
00011 import roslib
00012 
00013 
00014 PKG = 'jsk_topic_tools'
00015 NAME = 'test_connection'
00016 
00017 
00018 class TestConnection(unittest.TestCase):
00019 
00020     def __init__(self, *args):
00021         super(TestConnection, self).__init__(*args)
00022         rospy.init_node(NAME)
00023 
00024     def test_no_subscribers(self):
00025         check_connected_topics = rospy.get_param('~check_connected_topics')
00026         master = rosgraph.Master('/test_connection')
00027         _, sub, _ = master.getSystemState()
00028         # Check assumed topics are not there
00029         master = rosgraph.Master('test_connection')
00030         _, subscriptions, _ = master.getSystemState()
00031         for check_topic in check_connected_topics:
00032             for topic, sub_node in subscriptions:
00033                 if topic == rospy.get_namespace() + check_topic:
00034                     raise ValueError('Found topic: {}'.format(check_topic))
00035 
00036     def test_subscriber_appears_disappears(self):
00037         topic_type = rospy.get_param('~input_topic_type')
00038         check_connected_topics = rospy.get_param('~check_connected_topics')
00039         wait_time = rospy.get_param('~wait_for_connection', 0)
00040         msg_class = roslib.message.get_message_class(topic_type)
00041         # Subscribe topic and bond connection
00042         sub = rospy.Subscriber('~input', msg_class,
00043                                self._cb_test_subscriber_appears_disappears)
00044         print('Waiting for connection for {} sec.'.format(wait_time))
00045         rospy.sleep(wait_time)
00046         # Check assumed topics are there
00047         master = rosgraph.Master('test_connection')
00048         _, subscriptions, _ = master.getSystemState()
00049         for check_topic in check_connected_topics:
00050             for topic, sub_node in subscriptions:
00051                 if topic == rospy.get_namespace() + check_topic:
00052                     break
00053             else:
00054                 raise ValueError('Not found topic: {}'.format(check_topic))
00055         sub.unregister()
00056         # FIXME: below test won't pass on hydro
00057         ROS_DISTRO = os.environ.get('ROS_DISTRO', 'indigo')
00058         if ord(ROS_DISTRO[0]) < ord('i'):
00059             sys.stderr.write('WARNING: running on rosdistro %s, and skipping '
00060                              'test for disconnection.\n' % ROS_DISTRO)
00061             return
00062         rospy.sleep(1)  # wait for disconnection
00063         # Check specified topics do not exist
00064         _, subscriptions, _ = master.getSystemState()
00065         for check_topic in check_connected_topics:
00066             for topic, sub_node in subscriptions:
00067                 if topic == rospy.get_namespace() + check_topic:
00068                     raise ValueError('Found topic: {}'.format(check_topic))
00069 
00070     def _cb_test_subscriber_appears_disappears(self, msg):
00071         pass
00072 
00073 
00074 if __name__ == "__main__":
00075     import rostest
00076     rostest.rosrun(PKG, NAME, TestConnection)


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Fri Sep 8 2017 03:38:56