00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_roslaunch')
00037
00038 import os, sys, unittest
00039
00040 import rostest
00041 import roslaunch.loader
00042 import roslaunch.xmlloader
00043
00044
class RosLaunchMock(object):
    """In-memory recorder used in place of a real launch configuration.

    Each ``set_*``/``add_*`` call simply stores its argument on a public
    attribute so that tests can inspect what a loader produced.
    """

    def __init__(self):
        self.master = None
        self.machines = []
        self.nodes_core = []
        self.nodes = []
        self.tests = []
        self.executables = []
        self.params = []
        self.clear_params = []
        self.config_errors = []

    def set_master(self, m):
        """Remember *m* as the master."""
        self.master = m

    def add_machine(self, m, verbose=True):
        """Append *m* to ``machines``; *verbose* is accepted but ignored."""
        self.machines.append(m)

    def add_node(self, n, core=False, verbose=True):
        """Append *n* to ``nodes_core`` when *core* is true, else to ``nodes``."""
        bucket = self.nodes_core if core else self.nodes
        bucket.append(n)

    def add_config_error(self, msg):
        """Append *msg* to ``config_errors``."""
        self.config_errors.append(msg)

    def add_test(self, t, verbose=True):
        """Append *t* to ``tests``; *verbose* is accepted but ignored."""
        self.tests.append(t)

    def add_executable(self, t):
        """Append *t* to ``executables``."""
        self.executables.append(t)

    def add_param(self, p, filename=None, verbose=True):
        """Append *p*, first dropping every stored param that has the same key."""
        for stale in [old for old in self.params if old.key == p.key]:
            self.params.remove(stale)
        self.params.append(p)

    def add_clear_param(self, param):
        """Append *param* to ``clear_params``."""
        self.clear_params.append(param)
00081
00082
00083
00084 class TestXmlLoader(unittest.TestCase):
00085
def setUp(self):
    """Store the ``test/xml`` directory of the ``test_roslaunch`` package in ``self.xml_dir``."""
    from roslib.packages import get_pkg_dir
    package_dir = get_pkg_dir('test_roslaunch')
    self.xml_dir = os.path.join(package_dir, 'test', 'xml')
00089
def _load(self, test_file):
    """Load *test_file* into a fresh ``RosLaunchMock`` and return that mock.

    The test fails if *test_file* does not exist on disk.
    """
    xml_loader = roslaunch.xmlloader.XmlLoader()
    recorder = RosLaunchMock()
    file_found = os.path.exists(test_file)
    self.assertTrue(file_found, "cannot locate test file %s"%test_file)
    xml_loader.load(test_file, recorder)
    return recorder
00096
def _load_valid_nodes(self, tests):
    """Return the nodes of ``test-node-valid.xml`` whose type appears in *tests*.

    Asserts that the number of selected nodes equals ``len(tests)``.
    """
    fixture = os.path.join(self.xml_dir, 'test-node-valid.xml')
    loaded = self._load(fixture)
    selected = []
    for node in loaded.nodes:
        if node.type in tests:
            selected.append(node)
    self.assertEqual(len(tests), len(selected))
    return selected
00102
def _load_valid_rostests(self, tests):
    """Return the tests of ``test-test-valid.xml`` whose type appears in *tests*.

    Asserts that the number of selected entries equals ``len(tests)``.
    """
    fixture = os.path.join(self.xml_dir, 'test-test-valid.xml')
    loaded = self._load(fixture)
    selected = []
    for entry in loaded.tests:
        if entry.type in tests:
            selected.append(entry)
    self.assertEqual(len(tests), len(selected))
    return selected
00108
def _load_valid_machines(self, tests):
    """Return the machines of ``test-machine-valid.xml`` whose name appears in *tests*.

    Asserts that the number of selected machines equals ``len(tests)``.
    """
    fixture = os.path.join(self.xml_dir, 'test-machine-valid.xml')
    loaded = self._load(fixture)
    selected = []
    for machine in loaded.machines:
        if machine.name in tests:
            selected.append(machine)
    self.assertEqual(len(tests), len(selected))
    return selected
00114
def test_load_string(self):
    """Exercise ``XmlLoader.load_string`` with rootless, valid and malformed XML."""
    loader = roslaunch.xmlloader.XmlLoader()
    mock = RosLaunchMock()

    # A document without a <launch> root must be rejected.  The failure call
    # lives in the else-clause so that its AssertionError is not caught by the
    # broad ``except Exception`` and misreported as a message mismatch.
    try:
        loader.load_string('<foo />', mock)
    except Exception as e:
        self.assertEqual(str(e), "Invalid roslaunch XML syntax: no root <launch> tag")
    else:
        self.fail("no root launch element passed")

    # A valid document passed as a string must populate the mock's nodes.
    with open(os.path.join(self.xml_dir, 'test-node-valid.xml'), 'r') as f:
        s = f.read()
    loader.load_string(s, mock)
    self.assertTrue(mock.nodes)
    self.assertTrue([n for n in mock.nodes if n.type == 'test_base'])

    # The contents of invalid-xml.xml must raise XmlParseException.
    with open(os.path.join(self.xml_dir, 'invalid-xml.xml'), 'r') as f:
        s = f.read()
    try:
        loader.load_string(s, mock)
    except roslaunch.xmlloader.XmlParseException:
        pass
    else:
        self.fail('load_string should have thrown an exception')
00149
def test_load(self):
    """Exercise ``XmlLoader.load`` with valid files and with malformed XML."""
    loader = roslaunch.xmlloader.XmlLoader()

    # Loading this file only has to complete without raising.
    loader.load(os.path.join(self.xml_dir, 'test-valid.xml'), RosLaunchMock())

    mock = RosLaunchMock()
    loader.load(os.path.join(self.xml_dir, 'test-node-valid.xml'), mock)
    self.assertTrue(mock.nodes)
    self.assertTrue([n for n in mock.nodes if n.type == 'test_base'])

    try:
        loader.load(os.path.join(self.xml_dir, 'invalid-xml.xml'), mock)
    except roslaunch.xmlloader.XmlParseException:
        pass
    else:
        # The message previously named load_string, which this test never calls.
        self.fail('load should have thrown an exception')
00170
def test_params_invalid(self):
    """Each of ``test-params-invalid-1.xml`` .. ``-5.xml`` must fail to load.

    Either ``XmlParseException`` or ``LoadException`` counts as a rejection.
    """
    rejections = (roslaunch.xmlloader.XmlParseException,
                  roslaunch.loader.LoadException)
    loader = roslaunch.xmlloader.XmlLoader()
    for index in range(1, 6):
        filename = os.path.join(self.xml_dir, 'test-params-invalid-%s.xml'%index)
        try:
            self.assertTrue(os.path.exists(filename))
            loader.load(filename, RosLaunchMock())
            self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename)
        except rejections:
            pass
00184
def test_params(self):
    """Check the parameter values produced by ``test-params-valid.xml``.

    Scalar parameters are compared against literal expectations.  The
    ``/configfile`` and ``/commandoutput`` values must equal the text of
    roslaunch's ``example.launch``, and ``/binaryfile`` must equal that text
    wrapped in ``xmlrpclib.Binary``.
    """
    mock = self._load(os.path.join(self.xml_dir, 'test-params-valid.xml'))
    param_d = dict((p.key, p.value) for p in mock.params)

    self.assertEqual('pass', param_d['/override'])
    self.assertEqual('bar2', param_d['/somestring1'])
    self.assertEqual('10', param_d['/somestring2'])
    self.assertEqual(1, param_d['/someinteger1'])
    self.assertEqual(2, param_d['/someinteger2'])
    self.assertAlmostEqual(3.14159, param_d['/somefloat1'], 2)
    self.assertAlmostEqual(5.0, param_d['/somefloat2'], 1)
    # These calls used to pass a stray third positional argument, which
    # assertEqual interprets as the failure message; it has been dropped.
    self.assertEqual("a child namespace parameter 1", param_d['/wg/wgchildparam'])
    self.assertEqual("a child namespace parameter 2", param_d['/wg2/wg2childparam1'])
    self.assertEqual("a child namespace parameter 3", param_d['/wg2/wg2childparam2'])

    import xmlrpclib
    from roslib.packages import get_pkg_dir
    # The reference file is read once and reused for all three comparisons.
    with open(os.path.join(get_pkg_dir('roslaunch'), 'example.launch')) as f:
        contents = f.read()
    self.assertEqual(contents, param_d['/configfile'])
    self.assertEqual(xmlrpclib.Binary(contents), param_d['/binaryfile'])
    self.assertEqual(contents, param_d['/commandoutput'])
00222
00223
def test_rosparam_valid(self):
    """Check parameters and rosparam executables from ``test-rosparam-valid.xml``.

    Finishes by checking that ``test-rosparam-empty.xml`` yields no parameters.
    """
    import math
    from roslaunch.core import PHASE_SETUP

    mock = self._load(os.path.join(self.xml_dir, 'test-rosparam-valid.xml'))
    loaded_params = mock.params

    def value_of(key):
        # Value of the first stored param with this key; IndexError if absent.
        return [q for q in loaded_params if q.key == key][0].value

    for prefix in ['', '/rosparam', '/node_rosparam']:
        self.assertEqual('bar', value_of(prefix+'/string1'))
        self.assertEqual('a child namespace parameter', value_of(prefix+'/robots/childparam'))

    self.assertEqual('bar', value_of('/node_rosparam/string1'))
    self.assertEqual('a child namespace parameter', value_of('/node_rosparam/robots/childparam'))

    # Every rosparam executable must be a three-argument 'dump' in the setup phase.
    exes = [x for x in mock.executables if x.command == 'rosparam']
    self.assertEqual(len(exes), 2, "expected 2 rosparam exes, got %s"%len(exes))
    for exe in exes:
        self.assertEqual(PHASE_SETUP, exe.phase)
        args = exe.args
        self.assertEqual(3, len(args), "expected 3 args, got %s"%str(args))
        rp_cmd, rp_file, rp_ctx = args
        self.assertFalse('$(find' in rp_file, "file attribute was not evaluated")
        self.assertEqual('dump', rp_cmd)
        if rp_file.endswith('dump.yaml'):
            self.assertEqual('/', rp_ctx)
        elif rp_file.endswith('dump2.yaml'):
            self.assertEqual('/rosparam/', rp_ctx)

    inline_expectations = [
        ('/inline_str', 'value1'),
        ('/inline_list', [1, 2, 3, 4]),
        ('/inline_dict/key1', 'value1'),
        ('/inline_dict/key2', 'value2'),
        ('/inline_dict2/key3', 'value3'),
        ('/inline_dict2/key4', 'value4'),
    ]
    for key, expected in inline_expectations:
        self.assertEqual(expected, value_of(key))

    # '/override/key1' must be stored exactly once and carry the overriding value.
    self.assertEqual(1, len([q for q in mock.params if q.key == '/override/key1']))
    self.assertEqual('override1', value_of('/override/key1'))
    self.assertEqual('value2', value_of('/override/key2'))

    self.assertEqual('value1', value_of('/noparam1'))
    self.assertEqual('value2', value_of('/noparam2'))

    # Each triple holds the keys expected to evaluate to 0, pi and 2*pi.
    angle_triples = [
        ('/inline_degrees0', '/inline_degrees180', '/inline_degrees360'),
        ('/dict_degrees/deg0', '/dict_degrees/deg180', '/dict_degrees/deg360'),
        ('/inline_rad0', '/inline_radpi', '/inline_rad2pi'),
        ('/dict_rad/rad0', '/dict_rad/radpi', '/dict_rad/rad2pi'),
    ]
    for zero_key, pi_key, two_pi_key in angle_triples:
        self.assertAlmostEqual(0, value_of(zero_key))
        self.assertAlmostEqual(value_of(pi_key), math.pi)
        self.assertAlmostEqual(value_of(two_pi_key), 2 * math.pi)

    empty_mock = self._load(os.path.join(self.xml_dir, 'test-rosparam-empty.xml'))
    self.assertEqual([], empty_mock.params)
00317
def test_rosparam_invalid(self):
    """Each of ``test-rosparam-invalid-1.xml`` .. ``-5.xml`` must raise ``LoadException``."""
    loader = roslaunch.xmlloader.XmlLoader()
    for index in range(1, 6):
        filename = os.path.join(self.xml_dir, 'test-rosparam-invalid-%s.xml'%index)
        self.assertTrue(os.path.exists(filename))
        try:
            loader.load(filename, RosLaunchMock())
        except roslaunch.loader.LoadException:
            continue
        self.fail("xmlloader did not throw an xmlloadexception for [%s]"%filename)
00329
def test_node_valid(self):
    """``test-node-valid.xml`` must load; no node types are selected or checked."""
    self._load_valid_nodes([])
00332
def test_rostest_valid(self):
    """``test-test-valid.xml`` must load; no test types are selected or checked."""
    self._load_valid_rostests([])
00335
def test_node_rosparam_invalid(self):
    """``test-node-rosparam-invalid-name.xml`` must raise ``XmlParseException``."""
    loader = roslaunch.xmlloader.XmlLoader()
    for name in ['test-node-rosparam-invalid-name.xml']:
        filename = os.path.join(self.xml_dir, name)
        self.assertTrue(os.path.exists(filename))
        try:
            loader.load(filename, RosLaunchMock())
        except roslaunch.xmlloader.XmlParseException:
            continue
        self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename)
00347
def test_node_rosparam(self):
    """Check rosparam handling for nodes, one fixture file per node type.

    For each (file, node type) pair the file must contain exactly one node of
    that type; the expected rosparam executables and parameters then depend
    on the node type.
    """
    from roslaunch.core import PHASE_SETUP

    def value_of(params, key):
        # Value of the first param with this key; IndexError if absent.
        return [q for q in params if q.key == key][0].value

    cases = [("test-node-rosparam-load.xml", "test_node_rosparam_load"),
             ("test-node-rosparam-dump.xml","test_node_rosparam_dump"),
             ("test-node-rosparam-delete.xml","test_node_rosparam_delete"),
             ("test-node-rosparam-load-multi.xml", "test_node_rosparam_multi"),
             ("test-node-rosparam-load-param.xml", "test_node_rosparam_load_param"),
             ("test-node-rosparam-load-ns.xml", "test_node_rosparam_load_ns")]
    for xml_name, node_type in cases:
        mock = self._load(os.path.join(self.xml_dir, xml_name))
        matching = [node for node in mock.nodes if node.type == node_type]
        self.assertEqual(1, len(matching))
        kind = matching[0].type
        params = mock.params
        exes = [x for x in mock.executables if x.command == 'rosparam']

        if kind == "test_node_rosparam_load":
            self.assertEqual(0, len(exes))
            self.assertEqual('bar', value_of(params, '/rosparam_load/string1'))
            self.assertEqual('a child namespace parameter',
                             value_of(params, '/rosparam_load/robots/childparam'))
        elif kind == "test_node_rosparam_delete":
            self.assertEqual(1, len(exes))
            self.assertTrue(len(exes[0].args) == 2, "invalid arg: %s"%(str(exes[0].args)))
            rp_cmd, rp_param = exes[0].args
            self.assertEqual("delete", rp_cmd)
            self.assertEqual("/ns1/rosparam_delete/ns2/param", rp_param)
        elif kind == "test_node_rosparam_dump":
            self.assertEqual(1, len(exes))
            rp_cmd, rp_file, rp_ctx = exes[0].args
            self.assertEqual("dump", rp_cmd)
            self.assertEqual("dump.yaml", rp_file)
            self.assertEqual('/rosparam_dump/', rp_ctx)
        elif kind == "test_node_rosparam_load_ns":
            self.assertEqual(0, len(exes))
            self.assertEqual('bar', value_of(params, '/load_ns/subns/string1'))
            self.assertEqual('a child namespace parameter',
                             value_of(params, '/load_ns/subns/robots/childparam'))
        elif kind == "test_node_rosparam_load_param":
            self.assertEqual(0, len(exes))
            self.assertEqual('bar', value_of(params, '/load_param/param/string1'))
            self.assertEqual('a child namespace parameter',
                             value_of(params, '/load_param/param/robots/childparam'))
        elif kind == "test_node_rosparam_multi":
            self.assertEqual(1, len(exes))
            rp_cmd, rp_file, rp_ctx = exes[0].args
            self.assertEqual("dump", rp_cmd)
            self.assertEqual("mdump.yaml", rp_file)
            self.assertEqual('/rosparam_multi/', rp_ctx)

            # Parameters loaded directly under the node's namespace ...
            self.assertEqual('bar', value_of(params, '/rosparam_multi/string1'))
            self.assertEqual('a child namespace parameter',
                             value_of(params, '/rosparam_multi/robots/childparam'))
            # ... and the same pair under the 'msubns' sub-namespace.
            self.assertEqual('bar', value_of(params, '/rosparam_multi/msubns/string1'))
            self.assertEqual('a child namespace parameter',
                             value_of(params, '/rosparam_multi/msubns/robots/childparam'))
00414
00415
def test_local_param_group(self):
    """``test-local-param-group.xml`` must yield exactly the four expected parameter keys."""
    mock = self._load(os.path.join(self.xml_dir, 'test-local-param-group.xml'))
    correct = [
        u'/group1/g1node1/gparam1',
        u'/group1/g1node2/gparam1',
        u'/group1/g1node2/gparam2',
        u'/node1/param1',
        ]
    p_names = []
    for param in mock.params:
        p_names.append(param.key)
    # The symmetric difference is empty only when both key sets coincide.
    mismatch = set(correct).symmetric_difference(set(p_names))
    self.assertEqual(set([]), mismatch, "%s does not match %s"%(p_names, correct))
00426
def test_node_param(self):
    """Check three node-private parameters loaded from ``test-node-valid.xml``.

    Each expected key must occur exactly once in the loaded parameters and
    carry the expected value.
    """
    mock = self._load(os.path.join(self.xml_dir, 'test-node-valid.xml'))
    tests = [('/test_private_param1/foo1', 'bar1'),
             ('/ns_test/test_private_param2/foo2', 'bar2'),
             ('/test_private_param3/foo3', 'bar3'), ]
    for k, v in tests:
        found = [p for p in mock.params if p.key == k]
        self.assertEqual(1, len(found), "%s not present in parameters: %s"%(k, mock.params))
        self.assertEqual(v, found[0].value)
00437
def test_launch_prefix(self):
    """``launch_prefix`` is set on ``test_launch_prefix`` and is None on ``test_base``."""
    prefixed = self._load_valid_nodes(['test_launch_prefix'])
    self.assertEqual(1, len(prefixed))
    self.assertEqual('xterm -e gdb --args', prefixed[0].launch_prefix)

    plain = self._load_valid_nodes(['test_base'])
    self.assertEqual(1, len(plain))
    self.assertEqual(None, plain[0].launch_prefix)
00445
def test_respawn(self):
    """``respawn`` is True only for the ``test_respawn_true``/``_TRUE`` node types."""
    tests = ["test_respawn_true", "test_respawn_TRUE",
             "test_respawn_false", "test_respawn_FALSE",
             "test_respawn_none",]
    respawn_nodes = self._load_valid_nodes(tests)
    self.assertEqual(len(tests), len(respawn_nodes))
    expect_true = ['test_respawn_true', 'test_respawn_TRUE']
    for node in respawn_nodes:
        if node.type not in expect_true:
            self.assertEqual(False, node.respawn, "respawn for [%s] should be False"%node.type)
        else:
            self.assertEqual(True, node.respawn, "respawn for [%s] should be True"%node.type)
00457
def test_env_and_include(self):
    """Check the ``env_args`` of every node loaded from ``test-env.xml``."""
    mock = self._load(os.path.join(self.xml_dir, 'test-env.xml'))
    expected = ['test_none', 'test_one', 'test_one_two', 'test_one_two_priv', 'test_one_two_include',]
    self.assertEqual(set(expected), set([n.type for n in mock.nodes]))

    # Node types whose env_args must equal the list exactly.
    exact_env = {
        'test_none': [],
        'test_one': [("ONE", "one")],
    }
    # Node types whose env_args must merely contain each listed pair.
    required_pairs = {
        'test_one_two': [("ONE", "one"), ("TWO", "two")],
        'test_one_two_priv': [("ONE", "one"), ("TWO", "two"),
                              ("PRIVATE_TWO", "private_two")],
        'test_one_two_include': [("ONE", "one"), ("TWO", "two"),
                                 ("INCLUDE", "include")],
    }
    for node in mock.nodes:
        if node.type in exact_env:
            self.assertEqual(exact_env[node.type], node.env_args)
        else:
            for pair in required_pairs.get(node.type, ()):
                self.assertTrue(pair in node.env_args)
00478
def test_clear_params(self):
    """``test-clear-params.xml`` must mark exactly the six expected namespaces for clearing."""
    true_tests = ["/test_clear_params1/","/test_clear_params2/",
                  "/clear_params_ns/test_clear_params_ns/",
                  "/group_test/","/embed_group_test/embedded_group/",
                  "/include_test/",
                  ]
    mock = self._load(os.path.join(self.xml_dir, 'test-clear-params.xml'))
    cleared = mock.clear_params
    self.assertEqual(len(true_tests), len(cleared), "clear params did not match expected true: %s"%(str(cleared)))
    for namespace in true_tests:
        self.assertTrue(namespace in cleared, "%s was not marked for clear: %s"%(namespace, cleared))
00489
def test_clear_params_invalid(self):
    """Each ``test-clear-params-invalid-N.xml`` file must raise ``XmlParseException``."""
    invalid_files = ['test-clear-params-invalid-1.xml', 'test-clear-params-invalid-2.xml',
                     'test-clear-params-invalid-3.xml','test-clear-params-invalid-4.xml',]
    loader = roslaunch.xmlloader.XmlLoader()
    for name in invalid_files:
        filename = os.path.join(self.xml_dir, name)
        self.assertTrue(os.path.exists(filename))
        try:
            loader.load(filename, RosLaunchMock())
        except roslaunch.xmlloader.XmlParseException:
            continue
        self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename)
00502
00503 def _subtest_node_base(self, nodes):
00504 node = nodes[0]
00505 self.assertEquals("package", node.package)
00506 self.assertEquals("test_base", node.type)
00507
def test_node_base(self):
    """Check the ``test_base`` entry both as a node and as a rostest.

    The rostest entry must additionally be named ``test1`` and use the
    default time limit.
    """
    base_nodes = self._load_valid_nodes(['test_base'])
    self._subtest_node_base(base_nodes)

    base_tests = self._load_valid_rostests(['test_base'])
    self._subtest_node_base(base_tests)
    first_test = base_tests[0]
    self.assertEqual('test1', first_test.test_name)
    self.assertEqual(roslaunch.core.TEST_TIME_LIMIT_DEFAULT, first_test.time_limit)
00514
00515 def _subtest_node_args(self, nodes):
00516 for n in nodes:
00517 if n.type == 'test_args':
00518 self.assertEquals("args test", n.args)
00519 elif n.type == 'test_args_empty':
00520 self.assertEquals("", n.args)
00521
def test_node_args(self):
    """Check ``args`` on nodes and rostests, plus the ``test_name`` of the rostests."""
    wanted = ['test_args', 'test_args_empty']
    self._subtest_node_args(self._load_valid_nodes(wanted))

    rostests = self._load_valid_rostests(['test_args', 'test_args_empty'])
    self._subtest_node_args(rostests)
    expected_names = {'test_args': "test2", 'test_args_empty': "test3"}
    for entry in rostests:
        if entry.type in expected_names:
            self.assertEqual(expected_names[entry.type], entry.test_name)
00531
def test_rostest_time_limit(self):
    """Check ``time_limit`` (to three decimal places) on the two time-limit rostests."""
    expected_limits = {
        'test_time_limit_int_1': 1.0,
        'test_time_limit_float_10_1': 10.1,
    }
    rostests = self._load_valid_rostests(['test_time_limit_int_1', 'test_time_limit_float_10_1'])
    for entry in rostests:
        if entry.type in expected_limits:
            self.assertAlmostEqual(expected_limits[entry.type], entry.time_limit, 3)
00539
def test_rostest_retry(self):
    """The ``test_retry`` rostest must have ``retry`` equal to 2."""
    retry_tests = self._load_valid_rostests(['test_retry'])
    self.assertEqual(2, retry_tests[0].retry)
00543
def test_node_cwd(self):
    """Check the ``cwd`` attribute of ``test_base`` and the four ``test_cwd_*`` nodes."""
    expected_cwd = {
        'test_base': None,
        'test_cwd_1': "ros-root",
        'test_cwd_2': "node",
        'test_cwd_3': "ROS_HOME",
        'test_cwd_4': "ROS_HOME",
    }
    nodes = self._load_valid_nodes(['test_base', 'test_cwd_1', 'test_cwd_2', 'test_cwd_3', 'test_cwd_4'])
    for node in nodes:
        if node.type in expected_cwd:
            self.assertEqual(expected_cwd[node.type], node.cwd)
00555
def test_node_output(self):
    """Check the ``output`` attribute of the ``test_output_log``/``_screen`` nodes."""
    expected_output = {
        'test_output_log': "log",
        'test_output_screen': "screen",
    }
    for node in self._load_valid_nodes(['test_output_log', 'test_output_screen']):
        if node.type in expected_output:
            self.assertEqual(expected_output[node.type], node.output)
00563
def test_node_required(self):
    """``required`` is True exactly for node types starting with ``test_required_true``."""
    wanted = ['test_base',
              'test_required_true_1',
              'test_required_true_2',
              'test_required_false_1',
              'test_required_false_2',
              ]
    for node in self._load_valid_nodes(wanted):
        should_be_required = node.type.startswith('test_required_true')
        self.assertEqual(should_be_required, node.required)
00576
def test_node_machine(self):
    """The ``test_machine`` node must have ``machine_name`` equal to ``machine_test``."""
    first = self._load_valid_nodes(['test_machine'])[0]
    self.assertEqual("machine_test", first.machine_name)
00581
def test_node_ns(self):
    """Check the ``namespace`` attribute of the three ``test_ns*`` nodes."""
    expected_ns = {
        'test_ns1': "/ns_test1/",
        'test_ns2': "/ns_test2/child2/",
        'test_ns3': "/ns_test3/child3/",
    }
    for node in self._load_valid_nodes(['test_ns1', 'test_ns2','test_ns3']):
        if node.type in expected_ns:
            self.assertEqual(expected_ns[node.type], node.namespace)
00591
def test_machines(self):
    """Invalid machine files must be rejected; valid machines must carry the expected attributes.

    ``ROS_ROOT`` and ``ROS_PACKAGE_PATH`` are overridden while the valid
    machines are loaded and restored afterwards.
    """
    invalid_files = ['test-machine-invalid.xml', 'test-machine-invalid-2.xml', \
                     'test-machine-invalid-4.xml', 'test-machine-invalid-5.xml',
                     'test-machine-invalid-6.xml', 'test-machine-invalid-7.xml',
                     'test-machine-invalid-8.xml',
                     ]
    loader = roslaunch.xmlloader.XmlLoader()
    for name in invalid_files:
        filename = os.path.join(self.xml_dir, name)
        self.assertTrue(os.path.exists(filename))
        try:
            loader.load(filename, RosLaunchMock())
        except roslaunch.xmlloader.XmlParseException:
            continue
        self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename)

    old_rr = os.environ['ROS_ROOT']
    old_rpp = os.environ.get('ROS_PACKAGE_PATH', None)
    try:
        os.environ['ROS_ROOT'] = '/ros/root/tm'
        os.environ['ROS_PACKAGE_PATH'] = '/ros/package/path/tm'

        wanted = ['machine1', 'machine2', 'machine3', 'machine4', 'machine5']
        for m in self._load_valid_machines(wanted):
            name = m.name
            if name == 'machine1':
                self.assertEqual(m.address, 'address1')
                self.assertFalse(m.ros_ip, "ros ip should not be set")
                # ros_root / ros_package_path must match the overridden environment.
                self.assertEqual(m.ros_root, os.environ['ROS_ROOT'])
                self.assertEqual(m.ros_package_path, os.environ['ROS_PACKAGE_PATH'])
            elif name == 'machine2':
                self.assertEqual(m.address, 'address2')
                self.assertEqual(m.ros_root, '/ros/root')
                self.assertEqual(m.ros_package_path, '/ros/package/path')
                self.assertEqual(m.ros_ip, 'hostname')
            elif name == 'machine3':
                self.assertEqual(m.address, 'address3')
                self.assertEqual(m.ros_ip, "hostname")
                self.assertEqual(m.ros_root, os.environ['ROS_ROOT'])
                self.assertEqual(m.ros_package_path, os.environ['ROS_PACKAGE_PATH'])
            elif name == 'machine4':
                self.assertEqual(m.address, 'address4')
                self.assertEqual(m.env_args, [('ENV1', 'value1'), ('ENV2', 'value2'), ('ENV3', 'value3')])
            elif name == 'machine5':
                self.assertEqual(m.address, 'address5')
                self.assertEqual(m.ros_root, '/ros/root')
                self.assertEqual(m.ros_package_path, '')
            # NOTE(review): 'machine7' and 'machine8' are not in the list
            # requested above, so the two branches below never run -- confirm
            # whether those machines were meant to be loaded as well.
            elif name == 'machine7':
                self.assertEqual(m.timeout, 10.0)
            elif name == 'machine8':
                self.assertEqual(m.timeout, 1.)
    finally:
        os.environ['ROS_ROOT'] = old_rr
        if old_rpp is None:
            del os.environ['ROS_PACKAGE_PATH']
        else:
            os.environ['ROS_PACKAGE_PATH'] = old_rpp
00652
def test_node_subst(self):
    """Check environment-variable substitution using ``test-node-substitution.xml``.

    With ``PACKAGE``, ``TYPE`` and ``OUTPUT`` set but ``RESPAWN`` missing the
    load must raise ``XmlParseException``; once ``RESPAWN`` is set as well,
    the single loaded node must reflect all four values.  The four variables
    are put back to their previous state when the test finishes.
    """
    import random
    test_file = os.path.join(self.xml_dir, 'test-node-substitution.xml')
    keys = ['PACKAGE', 'TYPE', 'OUTPUT', 'RESPAWN']

    # Remember pre-existing values so the test does not leak into os.environ.
    saved = {}
    for k in keys:
        if k in os.environ:
            saved[k] = os.environ[k]
    try:
        for k in keys:
            if k in os.environ:
                del os.environ[k]

        # A random suffix keeps the expected values from matching by accident.
        r = random.randint(0, 100000)
        if r%2:
            output = 'screen'
            respawn = 'true'
        else:
            output = 'log'
            respawn = 'false'

        # Set everything except RESPAWN (the last key).
        for k in keys[:-1]:
            if k in ['PACKAGE', 'TYPE']:
                os.environ[k] = "%s-%s"%(k.lower(), r)
            elif k == 'OUTPUT':
                os.environ['OUTPUT'] = output
        try:
            self._load(test_file)
        except roslaunch.xmlloader.XmlParseException:
            pass
        else:
            self.fail("xml loader should have thrown an exception due to missing environment var")

        os.environ['RESPAWN'] = respawn
        mock = self._load(test_file)
        self.assertEqual(1, len(mock.nodes), "should only be one test node")
        n = mock.nodes[0]
        self.assertEqual(n.package, 'package-%s'%r)
        self.assertEqual(n.type, 'type-%s'%r)
        self.assertEqual(n.output, output)
        if respawn == 'true':
            self.assertTrue(n.respawn)
        else:
            self.assertFalse(n.respawn)
    finally:
        for k in keys:
            if k in saved:
                os.environ[k] = saved[k]
            elif k in os.environ:
                del os.environ[k]
00691
def test_machine_subst(self):
    """Check environment-variable substitution using ``test-machine-substitution.xml``.

    With ``ROS_PACKAGE_PATH`` missing the load must raise
    ``XmlParseException``; once it is set, the single loaded machine must
    reflect all five variables.  Every variable the test touches is put back
    to its previous state when the test finishes.
    """
    import random
    test_file = os.path.join(self.xml_dir, 'test-machine-substitution.xml')
    keys = ['NAME', 'ADDRESS', 'ROS_HOST_NAME', 'ROS_ROOT', 'ROS_PACKAGE_PATH']

    # Remember pre-existing values for all keys, not only the ROS_* ones.
    saved = {}
    for k in keys:
        if k in os.environ:
            saved[k] = os.environ[k]
    try:
        for k in keys:
            if k in os.environ:
                del os.environ[k]
        r = random.randint(0, 100000)

        # Set everything except ROS_PACKAGE_PATH (the last key).
        for k in keys[:-1]:
            os.environ[k] = "%s-%s"%(k.lower(), r)
        try:
            self._load(test_file)
        except roslaunch.xmlloader.XmlParseException:
            pass
        else:
            self.fail("xml loader should have thrown an exception due to missing environment var")

        os.environ['ROS_PACKAGE_PATH'] = '/ros/package/path-%s'%r
        mock = self._load(test_file)
        self.assertEqual(1, len(mock.machines), "should only be one test machine")
        m = mock.machines[0]
        self.assertEqual(m.name, 'name-%s'%r)
        self.assertEqual(m.address, 'address-%s'%r)
        self.assertEqual(m.ros_root, 'ros_root-%s'%r)
        self.assertEqual(m.ros_package_path, '/ros/package/path-%s'%r)
        self.assertEqual(m.ros_ip, 'ros_host_name-%s'%r)
    finally:
        # Deleting only keys that are present keeps this cleanup from raising
        # KeyError and hiding the real test failure.
        for k in keys:
            if k in saved:
                os.environ[k] = saved[k]
            elif k in os.environ:
                del os.environ[k]
00729
00730
def test_master(self):
    """Each ``test-master-N.xml`` file (N = 1..5) must load without raising."""
    tests = ['test-master-1.xml','test-master-2.xml',
             'test-master-3.xml','test-master-4.xml',
             'test-master-5.xml',
             ]
    # One pass over the list: the previous nested loop ignored its inner loop
    # variable and loaded every file five times.
    for name in tests:
        filename = os.path.join(self.xml_dir, name)
        self.assertTrue(os.path.exists(filename))
        loader = roslaunch.xmlloader.XmlLoader()
        loader.load(filename, RosLaunchMock())
00745
def test_env(self):
    """Check ``env_args`` on the ``test_env`` and ``test_env_empty`` nodes."""
    for node in self._load_valid_nodes(['test_env', 'test_env_empty']):
        kind = node.type
        if kind == 'test_env_empty':
            self.assertTrue(("env1", "") in node.env_args)
        elif kind == 'test_env':
            self.assertTrue(("env1", "env1 value1") in node.env_args)
            self.assertTrue(("env2", "env2 value2") in node.env_args)
            self.assertEqual(2, len(node.env_args))
00755
def test_remap(self):
    """Check ``remap_args`` of ``node1`` .. ``node6`` loaded from ``test-remap-valid.xml``."""
    loader = roslaunch.xmlloader.XmlLoader()
    mock = RosLaunchMock()
    loader.load(os.path.join(self.xml_dir, 'test-remap-valid.xml'), mock)

    names = ["node%s"%i for i in range(1, 7)]
    # Expected remappings, in the same order as ``names``.
    expected_remaps = dict(zip(names, [
        [['foo', 'bar']],
        [['foo', 'baz']],
        [['foo', 'bar']],
        [['foo', 'far']],
        [['foo', 'fad'], ['a', 'b'], ['c', 'd']],
        [['foo', 'far'], ['old1', 'new1'], ['old2', 'new2'], ['old3', 'new3']],
    ]))
    for node in mock.nodes:
        if node.type in names:
            self.assertEqual(expected_remaps[node.type], node.remap_args)
00775
def test_substitution(self):
    """No ``$`` may remain in anything loaded from ``test-substitution.xml``.

    Checked on parameter keys and values, and on each node's package, type,
    env args, remap args and args.
    """
    mock = self._load(os.path.join(self.xml_dir, 'test-substitution.xml'))

    def check_clean(text):
        self.assertTrue('$' not in text)

    for param in mock.params:
        check_clean(param.key)
        check_clean(param.value)
    for node in mock.nodes:
        check_clean(node.package)
        check_clean(node.type)
        for env_pair in node.env_args:
            check_clean(env_pair[0])
            check_clean(env_pair[1])
        for remap_pair in node.remap_args:
            check_clean(remap_pair[0])
            check_clean(remap_pair[1])
        for item in node.args:
            check_clean(item)
00793
def test_node_invalid(self):
    """Every listed invalid node or test file must raise ``XmlParseException``."""
    invalid_node_files = [
        'test-node-invalid-type.xml','test-node-invalid-type-2.xml',
        'test-node-invalid-pkg.xml','test-node-invalid-pkg-2.xml',
        'test-node-invalid-name-1.xml',
        'test-node-invalid-name-2.xml',
        'test-node-invalid-name-3.xml',
        'test-node-invalid-machine.xml',
        'test-node-invalid-respawn.xml',
        'test-node-invalid-respawn-required.xml',
        'test-node-invalid-required-1.xml',
        'test-node-invalid-required-2.xml',
        'test-node-invalid-ns.xml','test-node-invalid-ns-2.xml',
        'test-node-invalid-env-name.xml','test-node-invalid-env-name-2.xml',
        'test-node-invalid-env-value.xml',
        'test-node-invalid-output.xml',
        'test-node-invalid-cwd.xml',
        'test-node-invalid-exception.xml',
        ]
    invalid_test_files = [
        'test-test-invalid-reqd-1.xml',
        'test-test-invalid-reqd-2.xml',
        'test-test-invalid-respawn.xml',
        'test-test-invalid-output.xml',
        'test-test-invalid-time-limit-1.xml',
        'test-test-invalid-time-limit-2.xml',
        'test-test-invalid-retry.xml',
        ]
    loader = roslaunch.xmlloader.XmlLoader()
    for name in invalid_node_files + invalid_test_files:
        filename = os.path.join(self.xml_dir, name)
        self.assertTrue(os.path.exists(filename))
        try:
            loader.load(filename, RosLaunchMock())
        except roslaunch.xmlloader.XmlParseException:
            continue
        self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename)
00831
def test_remap_invalid(self):
    """Every listed invalid remap file must raise ``XmlParseException``."""
    invalid_files = ['test-remap-invalid-1.xml',
                     'test-remap-invalid-2.xml',
                     'test-remap-invalid-3.xml',
                     'test-remap-invalid-4.xml',
                     'test-remap-invalid-name-from.xml',
                     'test-remap-invalid-name-to.xml',
                     ]
    loader = roslaunch.xmlloader.XmlLoader()
    for name in invalid_files:
        filename = os.path.join(self.xml_dir, name)
        self.assertTrue(os.path.exists(filename))
        try:
            loader.load(filename, RosLaunchMock())
        except roslaunch.xmlloader.XmlParseException:
            continue
        self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename)
00849
def test_if_unless(self):
    """Check the parameters and remappings produced by test-if-unless.xml.

    After loading the fixture with an empty ``argv``:

    * for each of ``group_if``, ``group_unless``, ``param_if`` and
      ``param_unless`` the parameter ``/<name>_pass`` must exist and
      ``/<name>_fail`` must not;
    * the first loaded node must carry the ``*_pass`` remappings for both
      ``if`` and ``unless`` and none of the ``*_fail`` remappings.
    """
    mock = RosLaunchMock()
    loader = roslaunch.xmlloader.XmlLoader()
    filename = os.path.join(self.xml_dir, 'test-if-unless.xml')
    loader.load(filename, mock, argv=[])

    # Index the loaded parameters by key for membership checks.
    param_d = dict((p.key, p.value) for p in mock.params)

    for k in ('group_if', 'group_unless', 'param_if', 'param_unless'):
        self.assertTrue('/' + k + '_pass' in param_d, param_d)
        self.assertFalse('/' + k + '_fail' in param_d, k)

    # Fail with a readable message instead of an IndexError below.
    self.assertTrue(mock.nodes, "no nodes were loaded from %s" % filename)
    n = mock.nodes[0]
    for k in ('if', 'unless'):
        passing = ['from_%s_pass' % k, 'to_%s_pass' % k]
        failing = ['from_%s_fail' % k, 'to_%s_fail' % k]
        self.assertTrue(passing in n.remap_args, n.remap_args)
        self.assertFalse(failing in n.remap_args, n.remap_args)
00869
def test_if_unless_invalid(self):
    """Check that test-if-unless-invalid-both.xml is rejected.

    Loading the fixture must raise
    ``roslaunch.xmlloader.XmlParseException`` and the exception text must
    contain both ``'unless'`` and ``'if'``.
    """
    mock = RosLaunchMock()
    loader = roslaunch.xmlloader.XmlLoader()
    filename = os.path.join(self.xml_dir, 'test-if-unless-invalid-both.xml')

    try:
        loader.load(filename, mock, argv=[])
    except roslaunch.xmlloader.XmlParseException as e:
        msg = str(e)
        # The error text is expected to name both offending attributes.
        self.assertTrue('unless' in msg, msg)
        self.assertTrue('if' in msg, msg)
    else:
        self.fail("should have raised with invalid if and unless spec")
00881
def test_arg_invalid(self):
    """Check the error text raised for three invalid arg situations.

    Each case loads one fixture with an empty ``argv`` and requires a
    ``roslaunch.xmlloader.XmlParseException`` whose text contains a given
    keyword:

    * ``test-arg.xml``                -> ``'required'``
    * ``test-arg-invalid-sub.xml``    -> ``'missing'``
    * ``test-arg-invalid-redecl.xml`` -> ``'grounded'``
    """
    # The same mock and loader instances are shared by all three loads,
    # as in the original sequence of checks.
    mock = RosLaunchMock()
    loader = roslaunch.xmlloader.XmlLoader()

    # (fixture name, failure message if nothing is raised, expected keyword)
    cases = [
        ('test-arg.xml',
         "should have raised with missing arg", 'required'),
        ('test-arg-invalid-sub.xml',
         "should have raised with unknown arg", 'missing'),
        ('test-arg-invalid-redecl.xml',
         "should have raised with multiple decl", 'grounded'),
    ]
    for xml_name, not_raised_msg, keyword in cases:
        filename = os.path.join(self.xml_dir, xml_name)
        try:
            loader.load(filename, mock, argv=[])
        except roslaunch.xmlloader.XmlParseException as e:
            self.assertTrue(keyword in str(e), str(e))
        else:
            self.fail(not_raised_msg)
00908
00909
def test_arg(self):
    """Check the parameters produced by test-arg.xml for two argv sets.

    Run 1 loads the fixture with ``required:=test_arg if_test:=0``;
    run 2 loads it with ``required:=test_arg optional:=test_arg2
    if_test:=1``.  Each run uses a fresh ``RosLaunchMock`` and compares
    the resulting parameter values against the tables below.  The
    ``/include_test`` and ``/include2/include_test`` values are expected
    to be identical in both runs, while the ``/include3/include_test``
    parameters must be absent in run 1 and present in run 2.
    """
    loader = roslaunch.xmlloader.XmlLoader()
    filename = os.path.join(self.xml_dir, 'test-arg.xml')

    def check_values(param_d, expected):
        # Compare in listed order; a missing key raises KeyError naming it.
        for key, value in expected:
            self.assertEqual(param_d[key], value,
                             "%s: %r != %r" % (key, param_d[key], value))

    # Expectations that hold for both runs.
    include_expected = [
        ('/include_test/p1_test', 'required1'),
        ('/include_test/p2_test', 'not_set'),
        ('/include_test/p3_test', 'set'),
        ('/include_test/p4_test', 'initial'),
        ('/include2/include_test/p1_test', 'required2'),
        ('/include2/include_test/p2_test', 'optional2'),
        ('/include2/include_test/p3_test', 'set'),
        ('/include2/include_test/p4_test', 'new2'),
    ]
    # NOTE(review): presumably the third include is conditional on
    # if_test -- confirm against test-arg.xml.
    include3_expected = [
        ('/include3/include_test/p1_test', 'required3'),
        ('/include3/include_test/p2_test', 'optional3'),
        ('/include3/include_test/p3_test', 'set'),
        ('/include3/include_test/p4_test', 'new3'),
    ]

    # --- Run 1: optional arg left unset, if_test disabled -----------------
    mock = RosLaunchMock()
    loader.load(filename, mock, argv=["required:=test_arg", "if_test:=0"])
    param_d = dict((p.key, p.value) for p in mock.params)

    check_values(param_d, [
        ('/p1_test', 'test_arg'),
        ('/p2_test', 'not_set'),
        ('/p3_test', 'set'),
        ('/succeed', 'yes'),
        ('/if_test', 'not_ran'),
        ('/if_param', False),
        ('/int_param', 1234),
    ])
    self.assertAlmostEqual(param_d['/float_param'], 3.)
    self.assertFalse('/fail' in param_d)
    check_values(param_d, [
        ('/context1', 'group1'),
        ('/context2', 'group2'),
    ])
    check_values(param_d, include_expected)
    for key, _ in include3_expected:
        self.assertTrue(key not in param_d, key)

    # --- Run 2: optional arg supplied, if_test enabled --------------------
    mock = RosLaunchMock()
    loader.load(filename, mock,
                argv=["required:=test_arg", "optional:=test_arg2", "if_test:=1"])
    param_d = dict((p.key, p.value) for p in mock.params)

    check_values(param_d, [
        ('/p1_test', 'test_arg'),
        ('/p2_test', 'test_arg2'),
        ('/p3_test', 'set'),
        ('/context1', 'group1'),
        ('/context2', 'group2'),
        ('/succeed', 'yes'),
        ('/if_test', 'ran'),
        ('/if_param', True),
    ])
    self.assertFalse('/fail' in param_d)
    check_values(param_d, include_expected)
    check_values(param_d, include3_expected)
00985
if __name__ == '__main__':
    # Script entry point: hand the TestXmlLoader case to rostest.unitrun,
    # passing the two roslaunch loader modules as coverage_packages.
    rostest.unitrun(
        'test_roslaunch',
        sys.argv[0],
        TestXmlLoader,
        coverage_packages=['roslaunch.xmlloader', 'roslaunch.loader'],
    )
00988