00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import unittest
00037 import rostest
00038
00039 import cStringIO
00040 from subprocess import Popen, PIPE, check_call, call
00041
00042 import rosgraph
00043 import rosparam
00044
00045 def get_param_server():
00046 return rosgraph.Master('/test_rosparam')
00047
00048 from contextlib import contextmanager
00049 @contextmanager
00050 def fakestdout():
00051 realstdout = sys.stdout
00052 fakestdout = cStringIO.StringIO()
00053 sys.stdout = fakestdout
00054 yield fakestdout
00055 sys.stdout = realstdout
00056
00057 def tolist(b):
00058 return [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00059
00060 def get_test_path():
00061 return os.path.abspath(os.path.join(os.path.dirname(__file__)))
00062
00063 class TestRosparam(unittest.TestCase):
00064
00065 def setUp(self):
00066 pass
00067
00068 def _check(self, expected, actual):
00069 """
00070 Make sure all elements of expected are present in actual
00071 """
00072 for t in expected:
00073 self.assert_(t in actual)
00074 def _notcheck(self, not_expected, actual):
00075 """
00076 Make sure all elements of not_expected are not present in actual
00077 """
00078 for t in not_expected:
00079 self.failIf(t in actual)
00080
00081 def test_rosparam_list(self):
00082 cmd = 'rosparam'
00083
00084 params = ['/string', '/int', '/float',
00085 '/g1/string', '/g1/int', '/g1/float',
00086 '/g2/string', '/g2/int', '/g2/float',
00087 ]
00088 l = rosparam.list_params('')
00089 for t in params:
00090 self.assert_(t in l)
00091
00092 with fakestdout() as b:
00093 rosparam.yamlmain([cmd, 'list'])
00094 self._check(params, tolist(b))
00095 with fakestdout() as b:
00096 rosparam.yamlmain([cmd, 'list', '/'])
00097 self._check(params, tolist(b))
00098
00099
00100 g1p = [p for p in params if p.startswith('/g1/')]
00101 not_g1p = [p for p in params if not p.startswith('/g1/')]
00102 with fakestdout() as b:
00103 rosparam.yamlmain([cmd, 'list', '/g1'])
00104 self._check(g1p, tolist(b))
00105 self._notcheck(not_g1p, tolist(b))
00106 with fakestdout() as b:
00107 rosparam.yamlmain([cmd, 'list', '/g1/'])
00108 self._check(g1p, tolist(b))
00109 self._notcheck(not_g1p, tolist(b))
00110
00111 with fakestdout() as b:
00112 rosparam.yamlmain([cmd, 'list', '/not/a/namespace/'])
00113 self.assertEquals([], tolist(b))
00114
00115 def test_rosparam_load(self):
00116 f = os.path.join(get_test_path(), 'test.yaml')
00117 f_ns = os.path.join(get_test_path(), 'test_ns.yaml')
00118
00119 cmd = 'rosparam'
00120 try:
00121 rosparam.yamlmain([cmd, 'load'])
00122 self.fail("command-line arg should have failed")
00123 except SystemExit, e:
00124 self.assertNotEquals(0, e.code)
00125 try:
00126 rosparam.yamlmain([cmd, 'load', 'fake-file.yaml'])
00127 self.fail("command-line arg should have failed")
00128 except SystemExit, e:
00129 self.assertNotEquals(0, e.code)
00130
00131 ps = get_param_server()
00132
00133
00134 rosparam.yamlmain([cmd, 'load', f])
00135 self.assertEquals('bar', ps.getParam('/foo'))
00136
00137 self.assertEquals('foo-value', ps.getParam('/string'))
00138
00139 rosparam.yamlmain([cmd, 'load', '-v', f])
00140 self.assertEquals('bar', ps.getParam('/foo'))
00141
00142
00143 rosparam.yamlmain([cmd, 'load', f, '/rosparam_load/test'])
00144 self.assertEquals('bar', ps.getParam('/rosparam_load/test/foo'))
00145 rosparam.yamlmain([cmd, 'load', '-v', f, '/rosparam_load/test'])
00146 self.assertEquals('bar', ps.getParam('/rosparam_load/test/foo'))
00147
00148
00149
00150 rosparam.yamlmain([cmd, 'load', f_ns])
00151 self.assertEquals('baz', ps.getParam('/a/b/foo'))
00152 self.assertEquals('bar', ps.getParam('/foo'))
00153 rosparam.yamlmain([cmd, 'load', '-v', f_ns])
00154 self.assertEquals('baz', ps.getParam('/a/b/foo'))
00155
00156
00157 rosparam.yamlmain([cmd, 'load', f_ns, '/rosparam_load/test2'])
00158 self.assertEquals('baz', ps.getParam('/rosparam_load/test2/a/b/foo'))
00159 rosparam.yamlmain([cmd, 'load', '-v', f_ns, '/rosparam_load/test2'])
00160 self.assertEquals('baz', ps.getParam('/rosparam_load/test2/a/b/foo'))
00161
00162 def test_rosparam_get(self):
00163 import rosparam
00164 cmd = 'rosparam'
00165 try:
00166 rosparam.yamlmain([cmd, 'get'])
00167 self.fail("command-line arg should have failed")
00168 except SystemExit as e:
00169 self.assertNotEquals(0, e.code)
00170
00171 with fakestdout() as b:
00172 rosparam.yamlmain([cmd, 'get', "string"])
00173 self.assertEquals('foo-value', b.getvalue().strip())
00174 with fakestdout() as b:
00175 rosparam.yamlmain([cmd, 'get', '-p', "string"])
00176 self.assertEquals('foo-value', b.getvalue().strip())
00177 with fakestdout() as b:
00178 rosparam.yamlmain([cmd, 'get', "/string"])
00179 self.assertEquals('foo-value', b.getvalue().strip())
00180 with fakestdout() as b:
00181 rosparam.yamlmain([cmd, 'get', "/g1/string"])
00182 self.assertEquals('g1-foo-value', b.getvalue().strip())
00183 with fakestdout() as b:
00184 rosparam.yamlmain([cmd, 'get', "g1/string"])
00185 self.assertEquals('g1-foo-value', b.getvalue().strip())
00186 with fakestdout() as b:
00187 rosparam.yamlmain([cmd, 'get', "int"])
00188 self.assertEquals('1', b.getvalue().strip())
00189 with fakestdout() as b:
00190 rosparam.yamlmain([cmd, 'get', "/int"])
00191 self.assertEquals('1', b.getvalue().strip())
00192 with fakestdout() as b:
00193 rosparam.yamlmain([cmd, 'get', '-p', "int"])
00194 self.assertEquals('1', b.getvalue().strip())
00195 with fakestdout() as b:
00196 rosparam.yamlmain([cmd, 'get', "/g1/int"])
00197 self.assertEquals('10', b.getvalue().strip())
00198 with fakestdout() as b:
00199 rosparam.yamlmain([cmd, 'get', "g1/int"])
00200 self.assertEquals('10', b.getvalue().strip())
00201 with fakestdout() as b:
00202 rosparam.yamlmain([cmd, 'get', "float"])
00203 self.assertEquals('1.0', b.getvalue().strip())
00204 with fakestdout() as b:
00205 rosparam.yamlmain([cmd, 'get', '-p', "float"])
00206 self.assertEquals('1.0', b.getvalue().strip())
00207 with fakestdout() as b:
00208 rosparam.yamlmain([cmd, 'get', '-p', "g1/float"])
00209 self.assertEquals('10.0', b.getvalue().strip())
00210 with fakestdout() as b:
00211 rosparam.yamlmain([cmd, 'get', "g1"])
00212 import yaml
00213 d = yaml.load(b.getvalue())
00214 self.assertEquals(d['float'], 10.0)
00215 self.assertEquals(d['int'], 10.0)
00216 self.assertEquals(d['string'], "g1-foo-value")
00217 self.assertEquals(set(['float', 'int', 'string']), set(d.keys()))
00218 with fakestdout() as b:
00219 rosparam.yamlmain([cmd, 'get', '-p', "g1"])
00220 with fakestdout() as b:
00221 rosparam.yamlmain([cmd, 'get', '-pv', "g1"])
00222
00223 def test_rosparam_set(self):
00224 import rosparam
00225 cmd = 'rosparam'
00226
00227 ps = get_param_server()
00228 with fakestdout() as b:
00229 rosparam.yamlmain([cmd, 'set', "/rosparam_set/test1", "1"])
00230 self.assertEquals(1, ps.getParam('/rosparam_set/test1'))
00231 with fakestdout() as b:
00232
00233 rosparam.yamlmain([cmd, 'set', '-v', "/rosparam_set/test1", "1"])
00234 self.assertEquals(1, ps.getParam('/rosparam_set/test1'))
00235 with fakestdout() as b:
00236 rosparam.yamlmain([cmd, 'set', "rosparam_set/test1", "2"])
00237 self.assertEquals(2, ps.getParam('/rosparam_set/test1'))
00238
00239 with fakestdout() as b:
00240
00241 rosparam.yamlmain([cmd, 'set', "/rosparam_set/test2", "1.0"])
00242 self.assertEquals(1., ps.getParam('/rosparam_set/test2'))
00243 with fakestdout() as b:
00244
00245 rosparam.yamlmain([cmd, 'set', "/rosparam_set/test2", "2.0"])
00246 self.assertEquals(2., ps.getParam('/rosparam_set/test2'))
00247 with fakestdout() as b:
00248
00249 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testbool", "true"])
00250 self.assertEquals(True, ps.getParam('/rosparam_set/testbool'))
00251 with fakestdout() as b:
00252
00253 rosparam.yamlmain([cmd, 'set', "/rosparam_set/teststr", "hi"])
00254 self.assertEquals("hi", ps.getParam('/rosparam_set/teststr'))
00255 with fakestdout() as b:
00256
00257 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testlist", "[1, 2, 3]"])
00258 self.assertEquals([1, 2, 3], ps.getParam('/rosparam_set/testlist'))
00259 with fakestdout() as b:
00260
00261 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testdict", "{a: b, c: d}"])
00262 self.assertEquals('b', ps.getParam('/rosparam_set/testdict/a'))
00263 self.assertEquals('d', ps.getParam('/rosparam_set/testdict/c'))
00264 with fakestdout() as b:
00265
00266 rosparam.yamlmain([cmd, 'set', "set/testdict", "{}"])
00267 self.assertEquals('b', ps.getParam('/rosparam_set/testdict/a'))
00268 self.assertEquals('d', ps.getParam('/rosparam_set/testdict/c'))
00269 with fakestdout() as b:
00270
00271 rosparam.yamlmain([cmd, 'set', "/rosparam_set/testdict", "{e: f, g: h}"])
00272 self.assertEquals('b', ps.getParam('/rosparam_set/testdict/a'))
00273 self.assertEquals('d', ps.getParam('/rosparam_set/testdict/c'))
00274 self.assertEquals('f', ps.getParam('/rosparam_set/testdict/e'))
00275 self.assertEquals('h', ps.getParam('/rosparam_set/testdict/g'))
00276 with fakestdout() as b:
00277
00278 rosparam.yamlmain([cmd, 'set', '-v', "/rosparam_set/testdictverbose", "{e: f, g: h}"])
00279 self.assertEquals('f', ps.getParam('/rosparam_set/testdictverbose/e'))
00280 self.assertEquals('h', ps.getParam('/rosparam_set/testdictverbose/g'))
00281
00282 def test_rosparam_delete(self):
00283 import rosparam
00284 cmd = 'rosparam'
00285 ps = get_param_server()
00286
00287 try:
00288 rosparam.yamlmain([cmd, 'delete'])
00289 except SystemExit, e:
00290 self.assertNotEquals(0, e.code)
00291 try:
00292 rosparam.yamlmain([cmd, 'delete', 'one', 'two'])
00293 except SystemExit, e:
00294 self.assertNotEquals(0, e.code)
00295
00296
00297 ps.setParam('/delete/me', True)
00298 self.assert_(ps.hasParam('/delete/me'))
00299 rosparam.yamlmain([cmd, 'delete', "/delete/me"])
00300 self.failIf(ps.hasParam('/delete/me'))
00301
00302 ps.setParam('/delete/me2', True)
00303 self.assert_(ps.hasParam('/delete/me2'))
00304 rosparam.yamlmain([cmd, 'delete', '-v', "/delete/me2"])
00305 self.failIf(ps.hasParam('/delete/me2'))
00306
00307 def test_rosparam_dump(self):
00308 import rosparam
00309 f = os.path.join(get_test_path(), 'test.yaml')
00310 f_out = os.path.join(get_test_path(), 'test_dump.yaml')
00311
00312 cmd = 'rosparam'
00313 ps = get_param_server()
00314
00315 try:
00316 rosparam.yamlmain([cmd, 'dump'])
00317 except SystemExit, e:
00318 self.assertNotEquals(0, e.code)
00319 try:
00320 rosparam.yamlmain([cmd, 'dump', f_out, 'rosparam_dump', 'rosparam_dump2'])
00321 except SystemExit, e:
00322 self.assertNotEquals(0, e.code)
00323
00324 rosparam.yamlmain([cmd, 'load', f, 'rosparam_dump'])
00325 self.assertEquals('bar', ps.getParam('rosparam_dump/foo'))
00326
00327 rosparam.yamlmain([cmd, 'dump', f_out, 'rosparam_dump'])
00328
00329 import yaml
00330 with open(f_out) as b:
00331 with open(f) as b2:
00332 self.assertEquals(yaml.load(b.read()), yaml.load(b2.read()))
00333
00334 rosparam.yamlmain([cmd, 'dump', '-v', f_out, 'rosparam_dump'])
00335 with open(f_out) as b:
00336 with open(f) as b2:
00337 self.assertEquals(yaml.load(b.read()), yaml.load(b2.read()))
00338
00339 def test_fullusage(self):
00340 import rosparam
00341 try:
00342 rosparam._fullusage()
00343 except SystemExit: pass
00344 try:
00345 rosparam.yamlmain(['rosparam'])
00346 except SystemExit: pass
00347 try:
00348 rosparam.yamlmain(['rosparam', 'invalid'])
00349 except SystemExit: pass
00350
00351 PKG = 'test_rosparam'
00352 NAME = 'test_rosparam_command_line_online'
00353 if __name__ == '__main__':
00354 rostest.unitrun(PKG, NAME, TestRosparam, sys.argv, coverage_packages=['rosparam'])