00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'test_roslaunch'
00037 NAME = 'test_roslaunch_launch'
00038
00039 import roslib; roslib.load_manifest(PKG)
00040
00041 import os
00042 import sys
00043 import unittest
00044
00045
00046 class TestRoslaunchLaunch(unittest.TestCase):
00047
00048 def setUp(self):
00049 self.printerrlog_msg = None
00050
00051 def my_printerrlog(self, msg):
00052 self.printerrlog_msg = msg
00053
00054 def test_validate_master_launch(self):
00055 import roslaunch.launch
00056 from roslaunch.core import Master
00057 from roslaunch.launch import validate_master_launch
00058 roslaunch.launch.printerrlog = self.my_printerrlog
00059
00060
00061 os.environ['ROS_MASTER_URI'] = 'http://localhost:11311'
00062 m = Master(uri='http://localhost:11311')
00063 validate_master_launch(m, True)
00064 self.assertEquals(None, self.printerrlog_msg)
00065 validate_master_launch(m, False)
00066 self.assertEquals(None, self.printerrlog_msg)
00067
00068
00069 os.environ['ROS_MASTER_URI'] = 'http://localhost:11312'
00070 validate_master_launch(m, True)
00071 self.assert_('port' in self.printerrlog_msg)
00072 self.printerrlog_msg = None
00073
00074
00075 os.environ['ROS_MASTER_URI'] = 'http://fake:11311'
00076 validate_master_launch(m, True)
00077 self.assert_('host' in self.printerrlog_msg)
00078 self.printerrlog_msg = None
00079
00080
00081 os.environ['ROS_MASTER_URI'] = 'http://fake:11311'
00082 self.assertEquals(None, self.printerrlog_msg)
00083
00084
00085 os.environ['ROS_MASTER_URI'] = 'http://localhost:11311'
00086 m = Master(uri="http://fake:11311")
00087
00088
00089
00090
00091 validate_master_launch(m, True)
00092 self.assert_("WARNING" in self.printerrlog_msg)
00093 self.printerrlog_msg = None
00094
00095
00096 m = Master(uri="http://willowgarage.com:11311")
00097 validate_master_launch(m, True)
00098 self.assert_("WARNING" in self.printerrlog_msg)
00099 self.printerrlog_msg = None
00100
00101
00102 try:
00103 validate_master_launch(m, False)
00104 self.fail("should not pass if remote master cannot be contacted")
00105 except roslaunch.RLException:
00106 pass
00107
00108 def test__unify_clear_params(self):
00109 from roslaunch.launch import _unify_clear_params
00110 self.assertEquals([], _unify_clear_params([]))
00111 for t in [['/foo'], ['/foo/'], ['/foo/', '/foo'],
00112 ['/foo/', '/foo/'], ['/foo/', '/foo/bar', '/foo/'],
00113 ['/foo/', '/foo/bar', '/foo/bar/baz']]:
00114 self.assertEquals(['/foo/'], _unify_clear_params(t))
00115 for t in [['/'], ['/', '/foo/'], ['/foo/', '/', '/baz', '/car/dog']]:
00116 self.assertEquals(['/'], _unify_clear_params(t))
00117
00118 self.assertEquals(['/foo/', '/bar/', '/baz/'], _unify_clear_params(['/foo', '/bar', '/baz']))
00119 self.assertEquals(['/foo/', '/bar/', '/baz/'], _unify_clear_params(['/foo', '/bar', '/baz', '/bar/delta', '/baz/foo']))
00120 self.assertEquals(['/foo/bar/'], _unify_clear_params(['/foo/bar', '/foo/bar/baz']))
00121
00122
00123 def test__hostname_to_rosname(self):
00124 from roslaunch.launch import _hostname_to_rosname
00125 self.assertEquals("host_ann", _hostname_to_rosname('ann'))
00126 self.assertEquals("host_ann", _hostname_to_rosname('ANN'))
00127 self.assertEquals("host_", _hostname_to_rosname(''))
00128 self.assertEquals("host_1", _hostname_to_rosname('1'))
00129 self.assertEquals("host__", _hostname_to_rosname('_'))
00130 self.assertEquals("host__", _hostname_to_rosname('-'))
00131 self.assertEquals("host_foo_laptop", _hostname_to_rosname('foo-laptop'))
00132
00133 def test_ROSLaunchListeners(self):
00134 import roslaunch.launch
00135 class L(roslaunch.launch.ROSLaunchListener):
00136 def process_died(self, process_name, exit_code):
00137 self.process_name = process_name
00138 self.exit_code = exit_code
00139 class LBad(roslaunch.launch.ROSLaunchListener):
00140 def process_died(self, process_name, exit_code):
00141 raise Exception("foo")
00142
00143 listeners = roslaunch.launch._ROSLaunchListeners()
00144 l1 = L()
00145 l2 = L()
00146 lbad = L()
00147 l3 = L()
00148
00149 listeners.process_died('p0', 0)
00150
00151 listeners.add_process_listener(l1)
00152 listeners.process_died('p1', 1)
00153 self.assertEquals(l1.process_name, 'p1')
00154 self.assertEquals(l1.exit_code, 1)
00155
00156
00157 listeners.add_process_listener(l2)
00158 listeners.process_died('p2', 2)
00159 for l in [l1, l2]:
00160 self.assertEquals(l.process_name, 'p2')
00161 self.assertEquals(l.exit_code, 2)
00162
00163 listeners.add_process_listener(lbad)
00164
00165 listeners.process_died('p3', 3)
00166 for l in [l1, l2]:
00167 self.assertEquals(l.process_name, 'p3')
00168 self.assertEquals(l.exit_code, 3)
00169
00170 listeners.add_process_listener(l3)
00171 listeners.process_died('p4', 4)
00172 for l in [l1, l2, l3]:
00173 self.assertEquals(l.process_name, 'p4')
00174 self.assertEquals(l.exit_code, 4)
00175
00176 if __name__ == '__main__':
00177 import rostest
00178 rostest.unitrun('test_roslaunch', NAME, TestRoslaunchLaunch, coverage_packages=['roslaunch.launch'])
00179