Go to the documentation of this file.00001
00002
00003 import sys
00004 import unittest
00005
00006 import rosgraph
00007 import rospy
00008 import rosunit
00009
00010 from std_msgs.msg import Bool
00011 from std_msgs.msg import Float32
00012
00013 from std_msgs.msg import Time
00014
00015
00016 from jsk_recognition_msgs.msg import HistogramWithRange
00017 from jsk_recognition_msgs.msg import PlotData
00018
00019 NAME = "test_rqt_plugins"
00020 WAIT_TIMEOUT = 10.0
00021
00022
00023 class TestRqtPlugins(unittest.TestCase):
00024
00025 def __init__(self, *args):
00026 unittest.TestCase.__init__(self, *args)
00027 rospy.init_node(NAME)
00028
00029 def is_subscribed(self, topic_name, topic_type, sub_node_name):
00030 try:
00031 rospy.wait_for_message(topic_name, topic_type, WAIT_TIMEOUT)
00032 except rospy.ROSException:
00033 self.assertTrue(
00034 False, 'Could not subscribe topic ({}) in {} seconds'.
00035 format(topic_name, WAIT_TIMEOUT))
00036 subs = []
00037 i = 0
00038 while subs == [] and i < 10:
00039 pubs, subs, _ = rosgraph.Master('/rostopic').getSystemState()
00040 subs = [x[1] for x in subs if x[0] == topic_name]
00041 rospy.sleep(1.0)
00042 i = i + 1
00043 if subs == []:
00044 self.assertTrue(
00045 False, 'No node subscribes topic ({})'.format(topic_name))
00046 if sub_node_name not in subs[0]:
00047 self.assertTrue(
00048 False, 'Node ({}) does not subscribe topic ({}), but {} does'.
00049 format(sub_node_name, topic_name, subs))
00050 return True
00051
00052 def test_rqt_2d_plot(self):
00053 self.assertTrue(
00054 self.is_subscribed(
00055 '/pub_sample_2d_data/output', PlotData, '/rqt_2d_plot'))
00056
00057 def test_rqt_3d_plot(self):
00058 self.assertTrue(
00059 self.is_subscribed(
00060 '/pub_sample_3d_data/output1', Float32, '/rqt_3d_plot'))
00061
00062 def test_rqt_drc_mini_maxwell(self):
00063 self.assertTrue(
00064 self.is_subscribed('/drc_2015_environment/is_disabled', Bool,
00065 '/rqt_drc_mini_maxwell'))
00066 self.assertTrue(
00067 self.is_subscribed('/drc_2015_environment/is_blackout', Bool,
00068 '/rqt_drc_mini_maxwell'))
00069 self.assertTrue(
00070 self.is_subscribed(
00071 '/drc_2015_environment/next_whiteout_time', Time,
00072 '/rqt_drc_mini_maxwell'))
00073
00074 def test_rqt_histogram_plot(self):
00075 self.assertTrue(
00076 self.is_subscribed('/range_array', HistogramWithRange,
00077 '/rqt_histogram_plot'))
00078
00079 def test_rqt_image_view2(self):
00080
00081
00082
00083
00084
00085 pass
00086
00087 def test_rqt_service_buttons(self):
00088
00089 pass
00090
00091 def test_rqt_status_light(self):
00092
00093
00094
00095 pass
00096
00097 def test_rqt_string_label(self):
00098
00099
00100
00101
00102 pass
00103
00104 def test_rqt_yn_btn(self):
00105 try:
00106 rospy.wait_for_service('rqt_yn_btn', WAIT_TIMEOUT)
00107 self.assertTrue(True)
00108 except rospy.ROSException:
00109 self.assertTrue(False, 'Could not find service (rqt_yn_btn)')
00110
00111
00112 if __name__ == '__main__':
00113 rosunit.unitrun(NAME, sys.argv[0], TestRqtPlugins)