$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 import roslib; roslib.load_manifest('test_roslaunch') 00037 00038 import os, sys, unittest 00039 00040 import rostest 00041 import roslaunch.loader 00042 import roslaunch.xmlloader 00043 00044 ## Fake RosLaunch object 00045 class RosLaunchMock(object): 00046 def __init__(self): 00047 self.nodes_core = [] 00048 self.nodes = [] 00049 self.tests = [] 00050 self.params = [] 00051 self.executables = [] 00052 self.clear_params = [] 00053 self.machines = [] 00054 self.master = None 00055 self.config_errors = [] 00056 self.roslaunch_files = [] 00057 def set_master(self, m): 00058 self.master = m 00059 def add_machine(self, m, verbose=True): 00060 self.machines.append(m) 00061 def add_roslaunch_file(self, f): 00062 self.roslaunch_files.append(f) 00063 def add_node(self, n, core=False, verbose=True): 00064 if not core: 00065 self.nodes.append(n) 00066 else: 00067 self.nodes_core.append(n) 00068 00069 def add_config_error(self, msg): 00070 self.config_errors.append(msg) 00071 00072 def add_test(self, t, verbose=True): 00073 self.tests.append(t) 00074 def add_executable(self, t): 00075 self.executables.append(t) 00076 00077 def add_param(self, p, filename=None, verbose=True): 00078 matches = [x for x in self.params if x.key == p.key] 00079 for m in matches: 00080 self.params.remove(m) 00081 self.params.append(p) 00082 def add_clear_param(self, param): 00083 self.clear_params.append(param) 00084 00085 00086 ## Test Roslaunch XML parser 00087 class TestXmlLoader(unittest.TestCase): 00088 00089 def setUp(self): 00090 from roslib.packages import get_pkg_dir 00091 self.xml_dir = os.path.join(get_pkg_dir('test_roslaunch'), 'test', 'xml') 00092 00093 def _load(self, test_file): 00094 loader = roslaunch.xmlloader.XmlLoader() 00095 mock = RosLaunchMock() 00096 self.assert_(os.path.exists(test_file), "cannot locate test file %s"%test_file) 00097 loader.load(test_file, mock) 00098 return mock 00099 00100 def _load_valid_nodes(self, tests): 00101 mock = self._load(os.path.join(self.xml_dir, 'test-node-valid.xml')) 00102 nodes = [n for n in mock.nodes if n.type in tests] 00103 self.assertEquals(len(tests), len(nodes)) 00104 return nodes 00105 00106 def _load_valid_rostests(self, tests): 00107 mock = self._load(os.path.join(self.xml_dir, 'test-test-valid.xml')) 00108 nodes = [n for n in mock.tests if n.type in tests] 00109 self.assertEquals(len(tests), len(nodes)) 00110 return nodes 00111 00112 def _load_valid_machines(self, tests): 00113 mock = self._load(os.path.join(self.xml_dir, 'test-machine-valid.xml')) 00114 machines = [m for m in mock.machines if m.name in tests] 00115 self.assertEquals(len(tests), len(machines)) 00116 return machines 00117 00118 def test_load_string(self): 00119 # make sure load_string isn't broken 00120 loader = roslaunch.xmlloader.XmlLoader() 00121 mock = RosLaunchMock() 00122 00123 # test with no <launch /> element 00124 # #1582: test error message as well 00125 try: 00126 loader.load_string('<foo />', mock) 00127 self.fail("no root lauch element passed") 00128 except Exception, e: 00129 self.assertEquals(str(e), "Invalid roslaunch XML syntax: no root <launch> tag") 00130 00131 f = open(os.path.join(self.xml_dir, 'test-node-valid.xml'), 'r') 00132 try: 00133 s = f.read() 00134 finally: 00135 f.close() 00136 loader.load_string(s, mock) 00137 # sanity check 00138 self.assert_(mock.nodes) 00139 self.assert_([n for n in mock.nodes if n.type == 'test_base']) 00140 00141 # check exception case 00142 f = open(os.path.join(self.xml_dir, 'invalid-xml.xml'), 'r') 00143 try: 00144 s = f.read() 00145 finally: 00146 f.close() 00147 try: 00148 loader.load_string(s, mock) 00149 self.fail('load_string should have thrown an exception') 00150 except roslaunch.xmlloader.XmlParseException: 00151 pass 00152 00153 def test_load(self): 00154 # make sure load isn't broken 00155 loader = roslaunch.xmlloader.XmlLoader() 00156 00157 # test against empty data 00158 loader.load(os.path.join(self.xml_dir, 'test-valid.xml'), RosLaunchMock()) 00159 00160 # sanity check with real data 00161 mock = RosLaunchMock() 00162 00163 loader.load(os.path.join(self.xml_dir, 'test-node-valid.xml'), mock) 00164 self.assert_(mock.nodes) 00165 self.assert_([n for n in mock.nodes if n.type == 'test_base']) 00166 00167 # check exception case 00168 try: 00169 loader.load(os.path.join(self.xml_dir, 'invalid-xml.xml'), mock) 00170 self.fail('load_string should have thrown an exception') 00171 except roslaunch.xmlloader.XmlParseException: 00172 pass 00173 00174 def test_params_invalid(self): 00175 tests = ['test-params-invalid-%s.xml'%i for i in range(1, 6)] 00176 loader = roslaunch.xmlloader.XmlLoader() 00177 for filename in tests: 00178 filename = os.path.join(self.xml_dir, filename) 00179 try: 00180 self.assert_(os.path.exists(filename)) 00181 loader.load(filename, RosLaunchMock()) 00182 self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename) 00183 except roslaunch.xmlloader.XmlParseException, e: 00184 pass 00185 except roslaunch.loader.LoadException, e: 00186 pass 00187 00188 def test_params(self): 00189 mock = self._load(os.path.join(self.xml_dir, 'test-params-valid.xml')) 00190 00191 param_d = {} 00192 for p in mock.params: 00193 param_d[p.key] = p.value 00194 00195 self.assertEquals('pass', param_d['/override']) 00196 self.assertEquals('bar2', param_d['/somestring1']) 00197 self.assertEquals('10', param_d['/somestring2']) 00198 self.assertEquals(1, param_d['/someinteger1']) 00199 self.assertEquals(2, param_d['/someinteger2']) 00200 self.assertAlmostEquals(3.14159, param_d['/somefloat1'], 2) 00201 self.assertAlmostEquals(5.0, param_d['/somefloat2'], 1) 00202 self.assertEquals("a child namespace parameter 1", param_d['/wg/wgchildparam'], p.value) 00203 self.assertEquals("a child namespace parameter 2", param_d['/wg2/wg2childparam1'], p.value) 00204 self.assertEquals("a child namespace parameter 3", param_d['/wg2/wg2childparam2'], p.value) 00205 00206 import xmlrpclib 00207 from roslib.packages import get_pkg_dir 00208 f = open(os.path.join(get_pkg_dir('roslaunch'), 'example.launch')) 00209 try: 00210 contents = f.read() 00211 finally: 00212 f.close() 00213 p = [p for p in mock.params if p.key == '/configfile'][0] 00214 self.assertEquals(contents, p.value, 1) 00215 p = [p for p in mock.params if p.key == '/binaryfile'][0] 00216 self.assertEquals(xmlrpclib.Binary(contents), p.value, 1) 00217 00218 f = open(os.path.join(get_pkg_dir('roslaunch'), 'example.launch')) 00219 try: 00220 contents = f.read() 00221 finally: 00222 f.close() 00223 p = [p for p in mock.params if p.key == '/commandoutput'][0] 00224 self.assertEquals(contents, p.value, 1) 00225 00226 00227 def test_rosparam_valid(self): 00228 mock = self._load(os.path.join(self.xml_dir, 'test-rosparam-valid.xml')) 00229 00230 for prefix in ['', '/rosparam', '/node_rosparam']: 00231 p = [p for p in mock.params if p.key == prefix+'/string1'][0] 00232 self.assertEquals('bar', p.value) 00233 p = [p for p in mock.params if p.key == prefix+'/robots/childparam'][0] 00234 self.assertEquals('a child namespace parameter', p.value) 00235 00236 p = [p for p in mock.params if p.key == '/node_rosparam/string1'][0] 00237 self.assertEquals('bar', p.value) 00238 p = [p for p in mock.params if p.key == '/node_rosparam/robots/childparam'][0] 00239 self.assertEquals('a child namespace parameter', p.value) 00240 00241 exes = [e for e in mock.executables if e.command == 'rosparam'] 00242 self.assertEquals(len(exes), 2, "expected 2 rosparam exes, got %s"%len(exes)) 00243 from roslaunch.core import PHASE_SETUP 00244 for e in exes: 00245 self.assertEquals(PHASE_SETUP, e.phase) 00246 args = e.args 00247 self.assertEquals(3, len(args), "expected 3 args, got %s"%str(args)) 00248 rp_cmd, rp_file, rp_ctx = args 00249 self.failIf('$(find' in rp_file, "file attribute was not evaluated") 00250 self.assertEquals('dump', rp_cmd) 00251 00252 # verify that the context is passed in correctly 00253 if rp_file.endswith('dump.yaml'): 00254 self.assertEquals('/', rp_ctx) 00255 elif rp_file.endswith('dump2.yaml'): 00256 self.assertEquals('/rosparam/', rp_ctx) 00257 00258 # test inline yaml examples 00259 p = [p for p in mock.params if p.key == '/inline_str'][0] 00260 self.assertEquals('value1', p.value) 00261 p = [p for p in mock.params if p.key == '/inline_list'][0] 00262 self.assertEquals([1, 2, 3, 4], p.value) 00263 p = [p for p in mock.params if p.key == '/inline_dict/key1'][0] 00264 self.assertEquals('value1', p.value) 00265 p = [p for p in mock.params if p.key == '/inline_dict/key2'][0] 00266 self.assertEquals('value2', p.value) 00267 p = [p for p in mock.params if p.key == '/inline_dict2/key3'][0] 00268 self.assertEquals('value3', p.value) 00269 p = [p for p in mock.params if p.key == '/inline_dict2/key4'][0] 00270 self.assertEquals('value4', p.value) 00271 00272 # verify that later tags override 00273 # - key2 is overriden 00274 self.assertEquals(1, len([p for p in mock.params if p.key == '/override/key1'])) 00275 p = [p for p in mock.params if p.key == '/override/key1'][0] 00276 self.assertEquals('override1', p.value) 00277 # - key2 is not overriden 00278 p = [p for p in mock.params if p.key == '/override/key2'][0] 00279 self.assertEquals('value2', p.value) 00280 00281 # verify that 'param' attribute is not required 00282 p = [p for p in mock.params if p.key == '/noparam1'][0] 00283 self.assertEquals('value1', p.value) 00284 p = [p for p in mock.params if p.key == '/noparam2'][0] 00285 self.assertEquals('value2', p.value) 00286 00287 # #3580: test degree/rad conversions 00288 import math 00289 p = [p for p in mock.params if p.key == '/inline_degrees0'][0] 00290 self.assertAlmostEquals(0, p.value) 00291 p = [p for p in mock.params if p.key == '/inline_degrees180'][0] 00292 self.assertAlmostEquals(p.value, math.pi) 00293 p = [p for p in mock.params if p.key == '/inline_degrees360'][0] 00294 self.assertAlmostEquals(p.value, 2 * math.pi) 00295 00296 p = [p for p in mock.params if p.key == '/dict_degrees/deg0'][0] 00297 self.assertAlmostEquals(0, p.value) 00298 p = [p for p in mock.params if p.key == '/dict_degrees/deg180'][0] 00299 self.assertAlmostEquals(p.value, math.pi) 00300 p = [p for p in mock.params if p.key == '/dict_degrees/deg360'][0] 00301 self.assertAlmostEquals(p.value, 2 * math.pi) 00302 00303 p = [p for p in mock.params if p.key == '/inline_rad0'][0] 00304 self.assertAlmostEquals(0, p.value) 00305 p = [p for p in mock.params if p.key == '/inline_radpi'][0] 00306 self.assertAlmostEquals(p.value, math.pi) 00307 p = [p for p in mock.params if p.key == '/inline_rad2pi'][0] 00308 self.assertAlmostEquals(p.value, 2 * math.pi) 00309 00310 p = [p for p in mock.params if p.key == '/dict_rad/rad0'][0] 00311 self.assertAlmostEquals(0, p.value) 00312 p = [p for p in mock.params if p.key == '/dict_rad/radpi'][0] 00313 self.assertAlmostEquals(p.value, math.pi) 00314 p = [p for p in mock.params if p.key == '/dict_rad/rad2pi'][0] 00315 self.assertAlmostEquals(p.value, 2 * math.pi) 00316 00317 # rosparam file also contains empty params 00318 mock = self._load(os.path.join(self.xml_dir, 'test-rosparam-empty.xml')) 00319 self.assertEquals([], mock.params) 00320 00321 def test_rosparam_invalid(self): 00322 tests = ['test-rosparam-invalid-%s.xml'%i for i in range(1, 6)] 00323 loader = roslaunch.xmlloader.XmlLoader() 00324 for filename in tests: 00325 filename = os.path.join(self.xml_dir, filename) 00326 try: 00327 self.assert_(os.path.exists(filename)) 00328 loader.load(filename, RosLaunchMock()) 00329 self.fail("xmlloader did not throw an xmlloadexception for [%s]"%filename) 00330 except roslaunch.loader.LoadException, e: 00331 pass 00332 00333 def test_node_valid(self): 00334 nodes = self._load_valid_nodes([]) 00335 00336 def test_rostest_valid(self): 00337 nodes = self._load_valid_rostests([]) 00338 00339 def test_node_rosparam_invalid(self): 00340 tests = ['test-node-rosparam-invalid-name.xml'] 00341 loader = roslaunch.xmlloader.XmlLoader() 00342 for filename in tests: 00343 filename = os.path.join(self.xml_dir, filename) 00344 try: 00345 self.assert_(os.path.exists(filename)) 00346 loader.load(filename, RosLaunchMock()) 00347 self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename) 00348 except roslaunch.xmlloader.XmlParseException, e: 00349 pass 00350 00351 def test_node_rosparam(self): 00352 from roslaunch.core import PHASE_SETUP 00353 00354 tests = [("test-node-rosparam-load.xml", "test_node_rosparam_load"), 00355 ("test-node-rosparam-dump.xml","test_node_rosparam_dump"), 00356 ("test-node-rosparam-delete.xml","test_node_rosparam_delete"), 00357 ("test-node-rosparam-load-multi.xml", "test_node_rosparam_multi"), 00358 ("test-node-rosparam-load-param.xml", "test_node_rosparam_load_param"), 00359 ("test-node-rosparam-load-ns.xml", "test_node_rosparam_load_ns")] 00360 for f, test in tests: 00361 00362 mock = self._load(os.path.join(self.xml_dir, f)) 00363 nodes = [n for n in mock.nodes if n.type == test] 00364 self.assertEquals(1, len(nodes)) 00365 n = nodes[0] 00366 00367 exes = [e for e in mock.executables if e.command == 'rosparam'] 00368 00369 if n.type == "test_node_rosparam_load": 00370 self.assertEquals(0, len(exes)) 00371 p = [p for p in mock.params if p.key == '/rosparam_load/string1'][0] 00372 self.assertEquals('bar', p.value) 00373 p = [p for p in mock.params if p.key == '/rosparam_load/robots/childparam'][0] 00374 self.assertEquals('a child namespace parameter', p.value) 00375 elif n.type == "test_node_rosparam_delete": 00376 self.assertEquals(1, len(exes)) 00377 self.assert_(len(exes[0].args) == 2, "invalid arg: %s"%(str(exes[0].args))) 00378 rp_cmd, rp_param = exes[0].args 00379 self.assertEquals("delete", rp_cmd) 00380 self.assertEquals("/ns1/rosparam_delete/ns2/param", rp_param) 00381 elif n.type == "test_node_rosparam_dump": 00382 self.assertEquals(1, len(exes)) 00383 rp_cmd, rp_file, rp_ctx = exes[0].args 00384 self.assertEquals("dump", rp_cmd) 00385 self.assertEquals("dump.yaml", rp_file) 00386 self.assertEquals('/rosparam_dump/', rp_ctx) 00387 elif n.type == "test_node_rosparam_load_ns": 00388 self.assertEquals(0, len(exes)) 00389 p = [p for p in mock.params if p.key == '/load_ns/subns/string1'][0] 00390 self.assertEquals('bar', p.value) 00391 p = [p for p in mock.params if p.key == '/load_ns/subns/robots/childparam'][0] 00392 self.assertEquals('a child namespace parameter', p.value) 00393 elif n.type == "test_node_rosparam_load_param": 00394 self.assertEquals(0, len(exes)) 00395 p = [p for p in mock.params if p.key == '/load_param/param/string1'][0] 00396 self.assertEquals('bar', p.value) 00397 p = [p for p in mock.params if p.key == '/load_param/param/robots/childparam'][0] 00398 self.assertEquals('a child namespace parameter', p.value) 00399 elif n.type == "test_node_rosparam_multi": 00400 self.assertEquals(1, len(exes)) 00401 e = exes[0] 00402 rp_cmd, rp_file, rp_ctx = e.args 00403 self.assertEquals("dump", rp_cmd) 00404 self.assertEquals("mdump.yaml", rp_file) 00405 self.assertEquals('/rosparam_multi/', rp_ctx) 00406 00407 # test two other rosparam tags 00408 p = [p for p in mock.params if p.key == '/rosparam_multi/string1'][0] 00409 self.assertEquals('bar', p.value) 00410 p = [p for p in mock.params if p.key == '/rosparam_multi/robots/childparam'][0] 00411 self.assertEquals('a child namespace parameter', p.value) 00412 00413 p = [p for p in mock.params if p.key == '/rosparam_multi/msubns/string1'][0] 00414 self.assertEquals('bar', p.value) 00415 p = [p for p in mock.params if p.key == '/rosparam_multi/msubns/robots/childparam'][0] 00416 self.assertEquals('a child namespace parameter', p.value) 00417 00418 ## test that ~params in groups get applied to later members of group 00419 def test_local_param_group(self): 00420 mock = self._load(os.path.join(self.xml_dir, 'test-local-param-group.xml')) 00421 correct = [ 00422 u'/group1/g1node1/gparam1', 00423 u'/group1/g1node2/gparam1', 00424 u'/group1/g1node2/gparam2', 00425 u'/node1/param1', 00426 ] 00427 p_names = [p.key for p in mock.params] 00428 self.assertEquals(set([]), set(correct) ^ set(p_names), "%s does not match %s"%(p_names, correct)) 00429 00430 def test_node_param(self): 00431 mock = self._load(os.path.join(self.xml_dir, 'test-node-valid.xml')) 00432 tests = [('/test_private_param1/foo1', 'bar1'), 00433 ('/ns_test/test_private_param2/foo2', 'bar2'), 00434 ('/test_private_param3/foo3', 'bar3'), ] 00435 for k, v in tests: 00436 p = [p for p in mock.params if p.key == k] 00437 self.assertEquals(1, len(p), "%s not present in parameters: %s"%(k, mock.params)) 00438 self.assertEquals(v, p[0].value) 00439 node_types = [n.type for n in mock.nodes] 00440 00441 def test_roslaunch_files(self): 00442 f = os.path.join(self.xml_dir, 'test-env.xml') 00443 f2 = os.path.join(self.xml_dir, 'test-env-include.xml') 00444 mock = self._load(f) 00445 self.assertEquals(set([f, f2]), set(mock.roslaunch_files)) 00446 00447 def test_launch_prefix(self): 00448 nodes = self._load_valid_nodes(['test_launch_prefix']) 00449 self.assertEquals(1, len(nodes)) 00450 self.assertEquals('xterm -e gdb --args', nodes[0].launch_prefix) 00451 nodes = self._load_valid_nodes(['test_base']) 00452 self.assertEquals(1, len(nodes)) 00453 self.assertEquals(None, nodes[0].launch_prefix) 00454 00455 def test_respawn(self): 00456 tests = ["test_respawn_true", "test_respawn_TRUE", 00457 "test_respawn_false", "test_respawn_FALSE", 00458 "test_respawn_none",] 00459 respawn_nodes = self._load_valid_nodes(tests) 00460 self.assertEquals(len(tests), len(respawn_nodes)) 00461 for n in respawn_nodes: 00462 if n.type in ['test_respawn_true', 'test_respawn_TRUE']: 00463 self.assertEquals(True, n.respawn, "respawn for [%s] should be True"%n.type) 00464 else: 00465 self.assertEquals(False, n.respawn, "respawn for [%s] should be False"%n.type) 00466 00467 def test_env_and_include(self): 00468 mock = self._load(os.path.join(self.xml_dir, 'test-env.xml')) 00469 expected = ['test_none', 'test_one', 'test_one_two', 'test_one_two_priv', 'test_one_two_include',] 00470 self.assertEquals(set(expected), set([n.type for n in mock.nodes])) 00471 for n in mock.nodes: 00472 if n.type == 'test_none': 00473 self.assertEquals([], n.env_args) 00474 elif n.type == 'test_one': 00475 self.assertEquals([("ONE", "one")], n.env_args) 00476 elif n.type == 'test_one_two': 00477 self.assert_(("ONE", "one") in n.env_args) 00478 self.assert_(("TWO", "two") in n.env_args) 00479 elif n.type == 'test_one_two_priv': 00480 self.assert_(("ONE", "one") in n.env_args) 00481 self.assert_(("TWO", "two") in n.env_args) 00482 self.assert_(("PRIVATE_TWO", "private_two") in n.env_args) 00483 elif n.type == 'test_one_two_include': 00484 self.assert_(("ONE", "one") in n.env_args) 00485 self.assert_(("TWO", "two") in n.env_args) 00486 self.assert_(("INCLUDE", "include") in n.env_args) 00487 00488 def test_clear_params(self): 00489 true_tests = ["/test_clear_params1/","/test_clear_params2/", 00490 "/clear_params_ns/test_clear_params_ns/", 00491 "/group_test/","/embed_group_test/embedded_group/", 00492 "/include_test/", 00493 ] 00494 mock = self._load(os.path.join(self.xml_dir, 'test-clear-params.xml')) 00495 self.assertEquals(len(true_tests), len(mock.clear_params), "clear params did not match expected true: %s"%(str(mock.clear_params))) 00496 for t in true_tests: 00497 self.assert_(t in mock.clear_params, "%s was not marked for clear: %s"%(t, mock.clear_params)) 00498 00499 def test_clear_params_invalid(self): 00500 tests = ['test-clear-params-invalid-1.xml', 'test-clear-params-invalid-2.xml', 00501 'test-clear-params-invalid-3.xml','test-clear-params-invalid-4.xml',] 00502 loader = roslaunch.xmlloader.XmlLoader() 00503 for filename in tests: 00504 filename = os.path.join(self.xml_dir, filename) 00505 try: 00506 self.assert_(os.path.exists(filename)) 00507 loader.load(filename, RosLaunchMock()) 00508 self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename) 00509 except roslaunch.xmlloader.XmlParseException, e: 00510 pass 00511 00512 def _subtest_node_base(self, nodes): 00513 node = nodes[0] 00514 self.assertEquals("package", node.package) 00515 self.assertEquals("test_base", node.type) 00516 00517 def test_node_base(self): 00518 self._subtest_node_base(self._load_valid_nodes(['test_base'])) 00519 tests = self._load_valid_rostests(['test_base']) 00520 self._subtest_node_base(tests) 00521 self.assertEquals('test1', tests[0].test_name) 00522 self.assertEquals(roslaunch.core.TEST_TIME_LIMIT_DEFAULT, tests[0].time_limit) 00523 00524 def _subtest_node_args(self, nodes): 00525 for n in nodes: 00526 if n.type == 'test_args': 00527 self.assertEquals("args test", n.args) 00528 elif n.type == 'test_args_empty': 00529 self.assertEquals("", n.args) 00530 00531 def test_node_args(self): 00532 self._subtest_node_args(self._load_valid_nodes(['test_args', 'test_args_empty'])) 00533 tests = self._load_valid_rostests(['test_args', 'test_args_empty']) 00534 self._subtest_node_args(tests) 00535 for n in tests: 00536 if n.type == 'test_args': 00537 self.assertEquals("test2", n.test_name) 00538 elif n.type == 'test_args_empty': 00539 self.assertEquals("test3", n.test_name) 00540 00541 def test_rostest_time_limit(self): 00542 tests = self._load_valid_rostests(['test_time_limit_int_1', 'test_time_limit_float_10_1']) 00543 for n in tests: 00544 if n.type == 'test_time_limit_int_1': 00545 self.assertAlmostEquals(1.0, n.time_limit, 3) 00546 elif n.type == 'test_time_limit_float_10_1': 00547 self.assertAlmostEquals(10.1, n.time_limit, 3) 00548 00549 def test_rostest_retry(self): 00550 n = self._load_valid_rostests(['test_retry'])[0] 00551 self.assertEquals(2, n.retry) 00552 00553 def test_node_cwd(self): 00554 nodes = self._load_valid_nodes(['test_base', 'test_cwd_1', 'test_cwd_2', 'test_cwd_3', 'test_cwd_4']) 00555 for n in nodes: 00556 if n.type == 'test_base': 00557 self.assertEquals(None, n.cwd) 00558 elif n.type == 'test_cwd_1': 00559 self.assertEquals("ros-root", n.cwd) 00560 elif n.type == 'test_cwd_2': 00561 self.assertEquals("node", n.cwd) 00562 elif n.type in ['test_cwd_3', 'test_cwd_4']: 00563 self.assertEquals("ROS_HOME", n.cwd) 00564 00565 def test_node_output(self): 00566 nodes = self._load_valid_nodes(['test_output_log', 'test_output_screen']) 00567 for n in nodes: 00568 if n.type == 'test_output_log': 00569 self.assertEquals("log", n.output) 00570 elif n.type == 'test_output_screen': 00571 self.assertEquals("screen", n.output) 00572 00573 def test_node_required(self): 00574 nodes = self._load_valid_nodes(['test_base', 00575 'test_required_true_1', 00576 'test_required_true_2', 00577 'test_required_false_1', 00578 'test_required_false_2', 00579 ]) 00580 for n in nodes: 00581 if n.type.startswith('test_required_true'): 00582 self.assertEquals(True, n.required) 00583 else: 00584 self.assertEquals(False, n.required) 00585 00586 def test_node_machine(self): 00587 nodes = self._load_valid_nodes(['test_machine']) 00588 node = nodes[0] 00589 self.assertEquals("machine_test", node.machine_name) 00590 00591 def test_node_ns(self): 00592 nodes = self._load_valid_nodes(['test_ns1', 'test_ns2','test_ns3']) 00593 for n in nodes: 00594 if n.type == 'test_ns1': 00595 self.assertEquals("/ns_test1/", n.namespace) 00596 elif n.type == 'test_ns2': 00597 self.assertEquals("/ns_test2/child2/", n.namespace) 00598 elif n.type == 'test_ns3': 00599 self.assertEquals("/ns_test3/child3/", n.namespace) 00600 00601 def test_machines(self): 00602 tests = ['test-machine-invalid.xml', 'test-machine-invalid-2.xml', \ 00603 'test-machine-invalid-4.xml', 'test-machine-invalid-5.xml', 00604 'test-machine-invalid-6.xml', 'test-machine-invalid-7.xml', 00605 'test-machine-invalid-8.xml', 00606 ] 00607 loader = roslaunch.xmlloader.XmlLoader() 00608 for filename in tests: 00609 filename = os.path.join(self.xml_dir, filename) 00610 try: 00611 self.assert_(os.path.exists(filename)) 00612 loader.load(filename, RosLaunchMock()) 00613 self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename) 00614 except roslaunch.xmlloader.XmlParseException, e: 00615 pass 00616 00617 00618 old_rr = os.environ['ROS_ROOT'] 00619 old_rpp = os.environ.get('ROS_PACKAGE_PATH', None) 00620 try: 00621 os.environ['ROS_ROOT'] = '/ros/root/tm' 00622 os.environ['ROS_PACKAGE_PATH'] = '/ros/package/path/tm' 00623 00624 machines = self._load_valid_machines(['machine1', 'machine2', 'machine3', 'machine4', 'machine5']) 00625 for m in machines: 00626 if m.name == 'machine1': 00627 self.assertEquals(m.address, 'address1') 00628 self.failIf(m.ros_ip, "ros ip should not be set") 00629 self.assertEquals(m.ros_root, os.environ['ROS_ROOT']) 00630 self.assertEquals(m.ros_package_path, os.environ['ROS_PACKAGE_PATH']) 00631 elif m.name == 'machine2': 00632 self.assertEquals(m.address, 'address2') 00633 self.assertEquals(m.ros_root, '/ros/root') 00634 self.assertEquals(m.ros_package_path, '/ros/package/path') 00635 self.assertEquals(m.ros_ip, 'hostname') 00636 elif m.name == 'machine3': 00637 self.assertEquals(m.address, 'address3') 00638 self.assertEquals(m.ros_ip, "hostname") 00639 # should default to environment if not set 00640 self.assertEquals(m.ros_root, os.environ['ROS_ROOT']) 00641 self.assertEquals(m.ros_package_path, os.environ['ROS_PACKAGE_PATH']) 00642 elif m.name == 'machine4': 00643 self.assertEquals(m.address, 'address4') 00644 self.assertEquals(m.env_args, [('ENV1', 'value1'), ('ENV2', 'value2'), ('ENV3', 'value3')]) 00645 elif m.name == 'machine5': 00646 self.assertEquals(m.address, 'address5') 00647 self.assertEquals(m.ros_root, '/ros/root') 00648 # ros_package_path does not get a default value if ros-root is explicitly set 00649 self.assertEquals(m.ros_package_path, '') 00650 elif m.name == 'machine7': 00651 self.assertEquals(m.timeout, 10.0) 00652 elif m.name == 'machine8': 00653 self.assertEquals(m.timeout, 1.) 00654 00655 finally: 00656 os.environ['ROS_ROOT'] = old_rr 00657 if old_rpp is not None: 00658 os.environ['ROS_PACKAGE_PATH'] = old_rpp 00659 else: 00660 del os.environ['ROS_PACKAGE_PATH'] 00661 00662 def test_node_subst(self): 00663 test_file =os.path.join(self.xml_dir, 'test-node-substitution.xml') 00664 keys = ['PACKAGE', 'TYPE', 'OUTPUT', 'RESPAWN'] 00665 for k in keys: 00666 if k in os.environ: 00667 del os.environ[k] 00668 import random 00669 r = random.randint(0, 100000) 00670 if r%2: 00671 output = 'screen' 00672 respawn = 'true' 00673 else: 00674 output = 'log' 00675 respawn = 'false' 00676 # append all but one required key and make sure we fail 00677 for k in keys[:-1]: 00678 if k in ['PACKAGE', 'TYPE']: 00679 os.environ[k] = "%s-%s"%(k.lower(), r) 00680 elif k == 'OUTPUT': 00681 os.environ['OUTPUT'] = output 00682 try: 00683 mock = self._load(test_file) 00684 self.fail("xml loader should have thrown an exception due to missing environment var") 00685 except roslaunch.xmlloader.XmlParseException, e: 00686 pass 00687 00688 # load the last required env var 00689 os.environ['RESPAWN'] = respawn 00690 mock = self._load(test_file) 00691 self.assertEquals(1, len(mock.nodes), "should only be one test node") 00692 n = mock.nodes[0] 00693 self.assertEquals(n.package, 'package-%s'%r) 00694 self.assertEquals(n.type, 'type-%s'%r) 00695 self.assertEquals(n.output, output) 00696 if respawn == 'true': 00697 self.assert_(n.respawn) 00698 else: 00699 self.failIf(n.respawn) 00700 00701 def test_machine_subst(self): 00702 test_file = os.path.join(self.xml_dir, 'test-machine-substitution.xml') 00703 old_rr = os.environ['ROS_ROOT'] 00704 old_rpp = os.environ.get('ROS_PACKAGE_PATH', None) 00705 00706 try: 00707 keys = ['NAME', 'ADDRESS', 'ROS_HOST_NAME', 'ROS_ROOT', 'ROS_PACKAGE_PATH'] 00708 for k in keys: 00709 if k in os.environ: 00710 del os.environ[k] 00711 import random 00712 r = random.randint(0, 100000) 00713 # append all but one required key and make sure we fail 00714 for k in keys[:-1]: 00715 os.environ[k] = "%s-%s"%(k.lower(), r) 00716 try: 00717 mock = self._load(test_file) 00718 self.fail("xml loader should have thrown an exception due to missing environment var") 00719 except roslaunch.xmlloader.XmlParseException, e: 00720 pass 00721 00722 # load the last required env var 00723 os.environ['ROS_PACKAGE_PATH'] = '/ros/package/path-%s'%r 00724 mock = self._load(test_file) 00725 self.assertEquals(1, len(mock.machines), "should only be one test machine") 00726 m = mock.machines[0] 00727 self.assertEquals(m.name, 'name-%s'%r) 00728 self.assertEquals(m.address, 'address-%s'%r) 00729 self.assertEquals(m.ros_root, 'ros_root-%s'%r) 00730 self.assertEquals(m.ros_package_path, '/ros/package/path-%s'%r) 00731 self.assertEquals(m.ros_ip, 'ros_host_name-%s'%r) 00732 finally: 00733 os.environ['ROS_ROOT'] = old_rr 00734 if old_rpp is not None: 00735 os.environ['ROS_PACKAGE_PATH'] = old_rpp 00736 else: 00737 del os.environ['ROS_PACKAGE_PATH'] 00738 00739 00740 def test_master(self): 00741 from roslaunch.core import Master 00742 tests = ['test-master-1.xml','test-master-2.xml', 00743 'test-master-3.xml','test-master-4.xml', 00744 'test-master-5.xml', 00745 ] 00746 # tests should still load, but nothing more 00747 for x in xrange(1, 6): 00748 loader = roslaunch.xmlloader.XmlLoader() 00749 for filename in tests: 00750 filename = os.path.join(self.xml_dir, 'test-master-%s.xml'%x) 00751 self.assert_(os.path.exists(filename)) 00752 mock = RosLaunchMock() 00753 loader.load(filename, mock) 00754 00755 def test_env(self): 00756 nodes = self._load_valid_nodes(['test_env', 'test_env_empty']) 00757 for n in nodes: 00758 if n.type == 'test_env': 00759 self.assert_(("env1", "env1 value1") in n.env_args) 00760 self.assert_(("env2", "env2 value2") in n.env_args) 00761 self.assertEquals(2, len(n.env_args)) 00762 elif n.type == 'test_env_empty': 00763 self.assert_(("env1", "") in n.env_args) 00764 00765 def test_remap(self): 00766 loader = roslaunch.xmlloader.XmlLoader() 00767 mock = RosLaunchMock() 00768 loader.load(os.path.join(self.xml_dir, 'test-remap-valid.xml'), mock) 00769 names = ["node%s"%i for i in xrange(1, 7)] 00770 nodes = [n for n in mock.nodes if n.type in names] 00771 for n in nodes: 00772 if n.type == 'node1': 00773 self.assertEquals([['foo', 'bar']], n.remap_args) 00774 elif n.type == 'node2': 00775 self.assertEquals([['foo', 'baz']], n.remap_args) 00776 elif n.type == 'node3': 00777 self.assertEquals([['foo', 'bar']], n.remap_args) 00778 elif n.type == 'node4': 00779 self.assertEquals([['foo', 'far']], n.remap_args) 00780 elif n.type == 'node5': 00781 self.assertEquals([['foo', 'fad'], ['a', 'b'], ['c', 'd']], n.remap_args) 00782 elif n.type == 'node6': 00783 self.assertEquals([['foo', 'far'], ['old1', 'new1'], ['old2', 'new2'], ['old3', 'new3']], n.remap_args) 00784 00785 def test_substitution(self): 00786 mock = self._load(os.path.join(self.xml_dir, 'test-substitution.xml')) 00787 # for now this is mostly a trip wire test due to #1776 00788 for p in mock.params: 00789 self.assert_('$' not in p.key) 00790 self.assert_('$' not in p.value) 00791 for n in mock.nodes: 00792 self.assert_('$' not in n.package) 00793 self.assert_('$' not in n.type) 00794 for e in n.env_args: 00795 self.assert_('$' not in e[0]) 00796 self.assert_('$' not in e[1]) 00797 for r in n.remap_args: 00798 self.assert_('$' not in r[0]) 00799 self.assert_('$' not in r[1]) 00800 for a in n.args: 00801 self.assert_('$' not in a) 00802 00803 def test_node_invalid(self): 00804 tests = ['test-node-invalid-type.xml','test-node-invalid-type-2.xml', 00805 'test-node-invalid-pkg.xml','test-node-invalid-pkg-2.xml', 00806 # 1 and 2 have been disabled for now until we re-remove ability to have unnamed nodes with params 00807 'test-node-invalid-name-1.xml', 00808 'test-node-invalid-name-2.xml', 00809 'test-node-invalid-name-3.xml', 00810 'test-node-invalid-machine.xml', 00811 'test-node-invalid-respawn.xml', 00812 'test-node-invalid-respawn-required.xml', 00813 'test-node-invalid-required-1.xml', 00814 'test-node-invalid-required-2.xml', 00815 'test-node-invalid-ns.xml','test-node-invalid-ns-2.xml', 00816 'test-node-invalid-env-name.xml','test-node-invalid-env-name-2.xml', 00817 'test-node-invalid-env-value.xml', 00818 'test-node-invalid-output.xml', 00819 'test-node-invalid-cwd.xml', 00820 'test-node-invalid-exception.xml', 00821 00822 # rostest <test> tests 00823 'test-test-invalid-reqd-1.xml', 00824 'test-test-invalid-reqd-2.xml', 00825 'test-test-invalid-respawn.xml', 00826 'test-test-invalid-output.xml', 00827 'test-test-invalid-time-limit-1.xml', 00828 'test-test-invalid-time-limit-2.xml', 00829 'test-test-invalid-retry.xml', 00830 ] 00831 loader = roslaunch.xmlloader.XmlLoader() 00832 for filename in tests: 00833 filename = os.path.join(self.xml_dir, filename) 00834 try: 00835 self.assert_(os.path.exists(filename)) 00836 loader.load(filename, RosLaunchMock()) 00837 self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename) 00838 except roslaunch.xmlloader.XmlParseException, e: 00839 pass 00840 00841 def test_remap_invalid(self): 00842 tests = ['test-remap-invalid-1.xml', 00843 'test-remap-invalid-2.xml', 00844 'test-remap-invalid-3.xml', 00845 'test-remap-invalid-4.xml', 00846 'test-remap-invalid-name-from.xml', 00847 'test-remap-invalid-name-to.xml', 00848 ] 00849 loader = roslaunch.xmlloader.XmlLoader() 00850 for filename in tests: 00851 filename = os.path.join(self.xml_dir, filename) 00852 try: 00853 self.assert_(os.path.exists(filename)) 00854 loader.load(filename, RosLaunchMock()) 00855 self.fail("xmlloader did not throw an xmlparseexception for [%s]"%filename) 00856 except roslaunch.xmlloader.XmlParseException, e: 00857 pass 00858 00859 def test_if_unless(self): 00860 mock = RosLaunchMock() 00861 loader = roslaunch.xmlloader.XmlLoader() 00862 filename = os.path.join(self.xml_dir, 'test-if-unless.xml') 00863 loader.load(filename, mock, argv=[]) 00864 00865 param_d = {} 00866 for p in mock.params: 00867 param_d[p.key] = p.value 00868 00869 keys = ['group_if', 'group_unless', 'param_if', 'param_unless'] 00870 for k in keys: 00871 self.assert_('/'+k+'_pass' in param_d, param_d) 00872 self.failIf('/'+k+'_fail' in param_d, k) 00873 00874 n = mock.nodes[0] 00875 for k in ['if', 'unless']: 00876 self.assert_(['from_%s_pass'%k, 'to_%s_pass'%k] in n.remap_args) 00877 self.failIf(['from_%s_fail'%k, 'to_%s_fail'%k] in n.remap_args) 00878 00879 def test_if_unless_invalid(self): 00880 mock = RosLaunchMock() 00881 loader = roslaunch.xmlloader.XmlLoader() 00882 filename = os.path.join(self.xml_dir, 'test-if-unless-invalid-both.xml') 00883 # this should raise, not sure XmlParseException is what we want as it destroys semantic info 00884 try: 00885 loader.load(filename, mock, argv=[]) 00886 self.fail("should have raised with invalid if and unless spec") 00887 except roslaunch.xmlloader.XmlParseException, e: 00888 self.assert_('unless' in str(e)) 00889 self.assert_('if' in str(e)) 00890 00891 def test_arg_invalid(self): 00892 mock = RosLaunchMock() 00893 loader = roslaunch.xmlloader.XmlLoader() 00894 filename = os.path.join(self.xml_dir, 'test-arg.xml') 00895 # this should raise, not sure XmlParseException is what we want as it destroys semantic info 00896 try: 00897 loader.load(filename, mock, argv=[]) 00898 self.fail("should have raised with missing arg") 00899 except roslaunch.xmlloader.XmlParseException, e: 00900 self.assert_('required' in str(e)) 00901 00902 # test with invalid $(arg unknown) 00903 filename = os.path.join(self.xml_dir, 'test-arg-invalid-sub.xml') 00904 try: 00905 loader.load(filename, mock, argv=[]) 00906 self.fail("should have raised with unknown arg") 00907 except roslaunch.xmlloader.XmlParseException, e: 00908 self.assert_('missing' in str(e)) 00909 00910 # test with invalid $(arg unknown) 00911 filename = os.path.join(self.xml_dir, 'test-arg-invalid-redecl.xml') 00912 try: 00913 loader.load(filename, mock, argv=[]) 00914 self.fail("should have raised with multiple decl") 00915 except roslaunch.xmlloader.XmlParseException, e: 00916 self.assert_('grounded' in str(e)) 00917 00918 00919 def test_arg(self): 00920 loader = roslaunch.xmlloader.XmlLoader() 00921 filename = os.path.join(self.xml_dir, 'test-arg.xml') 00922 00923 mock = RosLaunchMock() 00924 loader.load(filename, mock, argv=["required:=test_arg", "if_test:=0"]) 00925 00926 param_d = {} 00927 for p in mock.params: 00928 param_d[p.key] = p.value 00929 00930 self.assertEquals(param_d['/p1_test'], 'test_arg') 00931 self.assertEquals(param_d['/p2_test'], 'not_set') 00932 self.assertEquals(param_d['/p3_test'], 'set') 00933 self.assertEquals(param_d['/succeed'], 'yes') 00934 self.assertEquals(param_d['/if_test'], 'not_ran') 00935 self.assertEquals(param_d['/if_param'], False) 00936 self.assertEquals(param_d['/int_param'], 1234) 00937 self.assertAlmostEquals(param_d['/float_param'], 3.) 00938 self.failIf('/fail' in param_d) 00939 00940 # context tests 00941 # - args are scoped to their context, and thus can be rebound in a sibling context 00942 self.assertEquals(param_d['/context1'], 'group1') 00943 self.assertEquals(param_d['/context2'], 'group2') 00944 00945 # include tests 00946 self.assertEquals(param_d['/include_test/p1_test'], 'required1') 00947 self.assertEquals(param_d['/include_test/p2_test'], 'not_set') 00948 self.assertEquals(param_d['/include_test/p3_test'], 'set') 00949 self.assertEquals(param_d['/include_test/p4_test'], 'initial') 00950 00951 self.assertEquals(param_d['/include2/include_test/p1_test'], 'required2') 00952 self.assertEquals(param_d['/include2/include_test/p2_test'], 'optional2') 00953 self.assertEquals(param_d['/include2/include_test/p3_test'], 'set') 00954 self.assertEquals(param_d['/include2/include_test/p4_test'], 'new2') 00955 00956 self.assert_('/include3/include_test/p1_test' not in param_d) 00957 self.assert_('/include3/include_test/p2_test' not in param_d) 00958 self.assert_('/include3/include_test/p3_test' not in param_d) 00959 self.assert_('/include3/include_test/p4_test' not in param_d) 00960 00961 # test again with optional value set 00962 mock = RosLaunchMock() 00963 loader.load(filename, mock, argv=["required:=test_arg", "optional:=test_arg2", "if_test:=1"]) 00964 00965 param_d = {} 00966 for p in mock.params: 00967 param_d[p.key] = p.value 00968 00969 self.assertEquals(param_d['/p1_test'], 'test_arg') 00970 self.assertEquals(param_d['/p2_test'], 'test_arg2') 00971 self.assertEquals(param_d['/p3_test'], 'set') 00972 self.assertEquals(param_d['/context1'], 'group1') 00973 self.assertEquals(param_d['/context2'], 'group2') 00974 self.assertEquals(param_d['/succeed'], 'yes') 00975 self.assertEquals(param_d['/if_test'], 'ran') 00976 self.assertEquals(param_d['/if_param'], True) 00977 self.failIf('/fail' in param_d) 00978 00979 # include tests 00980 self.assertEquals(param_d['/include_test/p1_test'], 'required1') 00981 self.assertEquals(param_d['/include_test/p2_test'], 'not_set') 00982 self.assertEquals(param_d['/include_test/p3_test'], 'set') 00983 self.assertEquals(param_d['/include_test/p4_test'], 'initial') 00984 00985 self.assertEquals(param_d['/include2/include_test/p1_test'], 'required2') 00986 self.assertEquals(param_d['/include2/include_test/p2_test'], 'optional2') 00987 self.assertEquals(param_d['/include2/include_test/p3_test'], 'set') 00988 self.assertEquals(param_d['/include2/include_test/p4_test'], 'new2') 00989 00990 self.assertEquals(param_d['/include3/include_test/p1_test'], 'required3') 00991 self.assertEquals(param_d['/include3/include_test/p2_test'], 'optional3') 00992 self.assertEquals(param_d['/include3/include_test/p3_test'], 'set') 00993 self.assertEquals(param_d['/include3/include_test/p4_test'], 'new3') 00994 00995 if __name__ == '__main__': 00996 rostest.unitrun('test_roslaunch', sys.argv[0], TestXmlLoader, coverage_packages=['roslaunch.xmlloader', 'roslaunch.loader']) 00997