00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import unittest
00037 import rostest
00038
00039 try:
00040 from cStringIO import StringIO
00041 except ImportError:
00042 from io import StringIO
00043 from subprocess import Popen, PIPE, check_call, call
00044
00045 import rosgraph
00046 import rosparam
00047
00048 def get_param_server():
00049 return rosgraph.Master('/test_rosparam')
00050
00051 from contextlib import contextmanager
00052 @contextmanager
00053 def fakestdout():
00054 realstdout = sys.stdout
00055 fakestdout = StringIO()
00056 sys.stdout = fakestdout
00057 yield fakestdout
00058 sys.stdout = realstdout
00059
00060 @contextmanager
00061 def fakestdin(input_str):
00062 realstdin = sys.stdin
00063 fakestdin = StringIO(input_str)
00064 sys.stdin = fakestdin
00065 yield fakestdin
00066 sys.stdin = realstdin
00067
00068 def tolist(b):
00069 return [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00070
00071 def get_test_path():
00072 return os.path.abspath(os.path.join(os.path.dirname(__file__)))
00073
00074 class TestRosparam(unittest.TestCase):
00075
00076 def setUp(self):
00077 pass
00078
00079 def _check(self, expected, actual):
00080 """
00081 Make sure all elements of expected are present in actual
00082 """
00083 for t in expected:
00084 self.assert_(t in actual)
00085 def _notcheck(self, not_expected, actual):
00086 """
00087 Make sure all elements of not_expected are not present in actual
00088 """
00089 for t in not_expected:
00090 self.failIf(t in actual)
00091
00092 def test_rosparam_list(self):
00093 cmd = 'rosparam'
00094
00095 params = ['/string', '/int', '/float',
00096 '/g1/string', '/g1/int', '/g1/float',
00097 '/g2/string', '/g2/int', '/g2/float',
00098 ]
00099 l = rosparam.list_params('')
00100 for t in params:
00101 self.assert_(t in l)
00102
00103 with fakestdout() as b:
00104 rosparam.yamlmain([cmd, 'list'])
00105 self._check(params, tolist(b))
00106 with fakestdout() as b:
00107 rosparam.yamlmain([cmd, 'list', '/'])
00108 self._check(params, tolist(b))
00109
00110
00111 g1p = [p for p in params if p.startswith('/g1/')]
00112 not_g1p = [p for p in params if not p.startswith('/g1/')]
00113 with fakestdout() as b:
00114 rosparam.yamlmain([cmd, 'list', '/g1'])
00115 self._check(g1p, tolist(b))
00116 self._notcheck(not_g1p, tolist(b))
00117 with fakestdout() as b:
00118 rosparam.yamlmain([cmd, 'list', '/g1/'])
00119 self._check(g1p, tolist(b))
00120 self._notcheck(not_g1p, tolist(b))
00121
00122 with fakestdout() as b:
00123 rosparam.yamlmain([cmd, 'list', '/not/a/namespace/'])
00124 self.assertEquals([], tolist(b))
00125
00126 def test_rosparam_load(self):
00127 f = os.path.join(get_test_path(), 'test.yaml')
00128 f_ns = os.path.join(get_test_path(), 'test_ns.yaml')
00129
00130 cmd = 'rosparam'
00131 try:
00132 rosparam.yamlmain([cmd, 'load'])
00133 self.fail("command-line arg should have failed")
00134 except SystemExit as e:
00135 self.assertNotEquals(0, e.code)
00136 try:
00137 rosparam.yamlmain([cmd, 'load', 'fake-file.yaml'])
00138 self.fail("command-line arg should have failed")
00139 except SystemExit as e:
00140 self.assertNotEquals(0, e.code)
00141
00142 ps = get_param_server()
00143
00144
00145 rosparam.yamlmain([cmd, 'load', f])
00146 self.assertEquals('bar', ps.getParam('/foo'))
00147
00148 self.assertEquals('foo-value', ps.getParam('/string'))
00149
00150 rosparam.yamlmain([cmd, 'load', '-v', f])
00151 self.assertEquals('bar', ps.getParam('/foo'))
00152
00153
00154 with fakestdin('stdin_string: stdin_foo\nstdin_string2: stdin_bar'):
00155 rosparam.yamlmain([cmd, 'load', '-'])
00156 self.assertEquals('stdin_foo', ps.getParam('/stdin_string'))
00157 self.assertEquals('stdin_bar', ps.getParam('/stdin_string2'))
00158
00159
00160 rosparam.yamlmain([cmd, 'load', f, '/rosparam_load/test'])
00161 self.assertEquals('bar', ps.getParam('/rosparam_load/test/foo'))
00162 rosparam.yamlmain([cmd, 'load', '-v', f, '/rosparam_load/test'])
00163 self.assertEquals('bar', ps.getParam('/rosparam_load/test/foo'))
00164
00165
00166
00167 rosparam.yamlmain([cmd, 'load', f_ns])
00168 self.assertEquals('baz', ps.getParam('/a/b/foo'))
00169 self.assertEquals('bar', ps.getParam('/foo'))
00170 rosparam.yamlmain([cmd, 'load', '-v', f_ns])
00171 self.assertEquals('baz', ps.getParam('/a/b/foo'))
00172
00173
00174 rosparam.yamlmain([cmd, 'load', f_ns, '/rosparam_load/test2'])
00175 self.assertEquals('baz', ps.getParam('/rosparam_load/test2/a/b/foo'))
00176 rosparam.yamlmain([cmd, 'load', '-v', f_ns, '/rosparam_load/test2'])
00177 self.assertEquals('baz', ps.getParam('/rosparam_load/test2/a/b/foo'))
00178
00179 def test_rosparam_get(self):
00180 import rosparam
00181 cmd = 'rosparam'
00182 try:
00183 rosparam.yamlmain([cmd, 'get'])
00184 self.fail("command-line arg should have failed")
00185 except SystemExit as e:
00186 self.assertNotEquals(0, e.code)
00187
00188 with fakestdout() as b:
00189 rosparam.yamlmain([cmd, 'get', "string"])
00190 self.assertEquals('foo-value', b.getvalue().strip())
00191 with fakestdout() as b:
00192 rosparam.yamlmain([cmd, 'get', '-p', "string"])
00193 self.assertEquals('foo-value', b.getvalue().strip())
00194 with fakestdout() as b:
00195 rosparam.yamlmain([cmd, 'get', "/string"])
00196 self.assertEquals('foo-value', b.getvalue().strip())
00197 with fakestdout() as b:
00198 rosparam.yamlmain([cmd, 'get', "/g1/string"])
00199 self.assertEquals('g1-foo-value', b.getvalue().strip())
00200 with fakestdout() as b:
00201 rosparam.yamlmain([cmd, 'get', "g1/string"])
00202 self.assertEquals('g1-foo-value', b.getvalue().strip())
00203 with fakestdout() as b:
00204 rosparam.yamlmain([cmd, 'get', "int"])
00205 self.assertEquals('1', b.getvalue().strip())
00206 with fakestdout() as b:
00207 rosparam.yamlmain([cmd, 'get', "/int"])
00208 self.assertEquals('1', b.getvalue().strip())
00209 with fakestdout() as b:
00210 rosparam.yamlmain([cmd, 'get', '-p', "int"])
00211 self.assertEquals('1', b.getvalue().strip())
00212 with fakestdout() as b:
00213 rosparam.yamlmain([cmd, 'get', "/g1/int"])
00214 self.assertEquals('10', b.getvalue().strip())
00215 with fakestdout() as b:
00216 rosparam.yamlmain([cmd, 'get', "g1/int"])
00217 self.assertEquals('10', b.getvalue().strip())
00218 with fakestdout() as b:
00219 rosparam.yamlmain([cmd, 'get', "float"])
00220 self.assertEquals('1.0', b.getvalue().strip())
00221 with fakestdout() as b:
00222 rosparam.yamlmain([cmd, 'get', '-p', "float"])
00223 self.assertEquals('1.0', b.getvalue().strip())
00224 with fakestdout() as b:
00225 rosparam.yamlmain([cmd, 'get', '-p', "g1/float"])
00226 self.assertEquals('10.0', b.getvalue().strip())
00227 with fakestdout() as b:
00228 rosparam.yamlmain([cmd, 'get', "g1"])
00229 import yaml
00230 d = yaml.load(b.getvalue())
00231 self.assertEquals(d['float'], 10.0)
00232 self.assertEquals(d['int'], 10.0)
00233 self.assertEquals(d['string'], "g1-foo-value")
00234 self.assertEquals(set(['float', 'int', 'string']), set(d.keys()))
00235 with fakestdout() as b:
00236 rosparam.yamlmain([cmd, 'get', '-p', "g1"])
00237 with fakestdout() as b:
00238 rosparam.yamlmain([cmd, 'get', '-pv', "g1"])
00239
00240 def test_rosparam_set(self):
00241 import rosparam
00242 cmd = 'rosparam'
00243
00244 ps = get_param_server()
00245 with fakestdout() as b:
00246 rosparam.yamlmain([cmd, 'set', "/rosparam_set/test1", "1"])
00247 self.assertEquals(1, ps.getParam('/rosparam_set/test1'))
00248 with fakestdout() as b:
00249
00250 rosparam.yamlmain([cmd, 'set', '-v', "/rosparam_set/test1", "1"])
00251 self.assertEquals(1, ps.getParam('/rosparam_set/test1'))
00252 with fakestdout() as b:
00253 rosparam.yamlmain([cmd, 'set', "rosparam_set/test1", "2"])
00254 self.assertEquals(2, ps.getParam('/rosparam_set/test1'))
00255
00256 with fakestdout() as b:
00257
00258 rosparam.yamlmain([cmd, 'set', "/rosparam_set/test2", "1.0"])
00259 self.assertEquals(1., ps.getParam('/rosparam_set/test2'))
00260 with fakestdout() as b:
00261
00262 rosparam.yamlmain([cmd, 'set', "/rosparam_set/test2", "2.0"])
00263 self.assertEquals(2., ps.getParam('/rosparam_set/test2'))
00264 with fakestdout() as b:
00265
00266 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testbool", "true"])
00267 self.assertEquals(True, ps.getParam('/rosparam_set/testbool'))
00268 with fakestdout() as b:
00269
00270 rosparam.yamlmain([cmd, 'set', "/rosparam_set/teststr", "hi"])
00271 self.assertEquals("hi", ps.getParam('/rosparam_set/teststr'))
00272 with fakestdout() as b:
00273
00274 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testlist", "[1, 2, 3]"])
00275 self.assertEquals([1, 2, 3], ps.getParam('/rosparam_set/testlist'))
00276 with fakestdout() as b:
00277
00278 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testdict", "{a: b, c: d}"])
00279 self.assertEquals('b', ps.getParam('/rosparam_set/testdict/a'))
00280 self.assertEquals('d', ps.getParam('/rosparam_set/testdict/c'))
00281 with fakestdout() as b:
00282
00283 rosparam.yamlmain([cmd, 'set', "set/testdict", "{}"])
00284 self.assertEquals('b', ps.getParam('/rosparam_set/testdict/a'))
00285 self.assertEquals('d', ps.getParam('/rosparam_set/testdict/c'))
00286 with fakestdout() as b:
00287
00288 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testdict", "{e: f, g: h}"])
00289 self.assertEquals('b', ps.getParam('/rosparam_set/testdict/a'))
00290 self.assertEquals('d', ps.getParam('/rosparam_set/testdict/c'))
00291 self.assertEquals('f', ps.getParam('/rosparam_set/testdict/e'))
00292 self.assertEquals('h', ps.getParam('/rosparam_set/testdict/g'))
00293 with fakestdout() as b:
00294
00295 rosparam.yamlmain([cmd, 'set', '-v', "/rosparam_set/testdictverbose", "{e: f, g: h}"])
00296 self.assertEquals('f', ps.getParam('/rosparam_set/testdictverbose/e'))
00297 self.assertEquals('h', ps.getParam('/rosparam_set/testdictverbose/g'))
00298
00299 def test_rosparam_delete(self):
00300 import rosparam
00301 cmd = 'rosparam'
00302 ps = get_param_server()
00303
00304 try:
00305 rosparam.yamlmain([cmd, 'delete'])
00306 except SystemExit as e:
00307 self.assertNotEquals(0, e.code)
00308 try:
00309 rosparam.yamlmain([cmd, 'delete', 'one', 'two'])
00310 except SystemExit as e:
00311 self.assertNotEquals(0, e.code)
00312
00313
00314 ps.setParam('/delete/me', True)
00315 self.assert_(ps.hasParam('/delete/me'))
00316 rosparam.yamlmain([cmd, 'delete', "/delete/me"])
00317 self.failIf(ps.hasParam('/delete/me'))
00318
00319 ps.setParam('/delete/me2', True)
00320 self.assert_(ps.hasParam('/delete/me2'))
00321 rosparam.yamlmain([cmd, 'delete', '-v', "/delete/me2"])
00322 self.failIf(ps.hasParam('/delete/me2'))
00323
00324 def test_rosparam_dump(self):
00325 import rosparam
00326 f = os.path.join(get_test_path(), 'test.yaml')
00327 f_out = os.path.join(get_test_path(), 'test_dump.yaml')
00328
00329 cmd = 'rosparam'
00330 ps = get_param_server()
00331
00332 try:
00333 rosparam.yamlmain([cmd, 'dump'])
00334 except SystemExit as e:
00335 self.assertNotEquals(0, e.code)
00336 try:
00337 rosparam.yamlmain([cmd, 'dump', f_out, 'rosparam_dump', 'rosparam_dump2'])
00338 except SystemExit as e:
00339 self.assertNotEquals(0, e.code)
00340
00341 rosparam.yamlmain([cmd, 'load', f, 'rosparam_dump'])
00342 self.assertEquals('bar', ps.getParam('rosparam_dump/foo'))
00343
00344 rosparam.yamlmain([cmd, 'dump', f_out, 'rosparam_dump'])
00345
00346 import yaml
00347 with open(f_out) as b:
00348 with open(f) as b2:
00349 self.assertEquals(yaml.load(b.read()), yaml.load(b2.read()))
00350
00351 rosparam.yamlmain([cmd, 'dump', '-v', f_out, 'rosparam_dump'])
00352 with open(f_out) as b:
00353 with open(f) as b2:
00354 self.assertEquals(yaml.load(b.read()), yaml.load(b2.read()))
00355
00356
00357 with fakestdout() as b:
00358 rosparam.yamlmain([cmd, 'dump'])
00359 with open(f) as b2:
00360 self.assertEquals(yaml.load(b.getvalue())['rosparam_dump'], yaml.load(b2.read()))
00361
00362 def test_fullusage(self):
00363 import rosparam
00364 try:
00365 rosparam._fullusage()
00366 except SystemExit: pass
00367 try:
00368 rosparam.yamlmain(['rosparam'])
00369 except SystemExit: pass
00370 try:
00371 rosparam.yamlmain(['rosparam', 'invalid'])
00372 except SystemExit: pass
00373
00374 PKG = 'test_rosparam'
00375 NAME = 'test_rosparam_command_line_online'
00376 if __name__ == '__main__':
00377 rostest.unitrun(PKG, NAME, TestRosparam, sys.argv, coverage_packages=['rosparam'])