00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 from __future__ import with_statement
00037
00038 PKG = 'test_rosparam'
00039 NAME = 'test_rosparam_command_line_online'
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 import os
00043 import signal
00044 import sys
00045 import time
00046 import unittest
00047
00048 import rostest
00049
00050 import cStringIO
00051 from subprocess import Popen, PIPE, check_call, call
00052
00053 from roslib.scriptutil import get_param_server, script_resolve_name
00054
00055 from contextlib import contextmanager
@contextmanager
def fakestdout():
    """
    Temporarily replace sys.stdout with an in-memory buffer.

    Yields the cStringIO buffer that receives everything written to
    sys.stdout inside the ``with`` body.  The real stream is put back
    when the body exits, including when it exits by raising (a failed
    assertion, or the SystemExit produced by a rejected command line),
    so one failing test cannot leave stdout redirected for the tests
    that run after it.
    """
    realstdout = sys.stdout
    buf = cStringIO.StringIO()
    sys.stdout = buf
    try:
        yield buf
    finally:
        # always restore, even if the with-body raised
        sys.stdout = realstdout
00063
def tolist(b):
    """
    Split the text held by buffer b into a list of its non-blank lines,
    each with leading and trailing whitespace removed.
    """
    stripped = (line.strip() for line in b.getvalue().split('\n'))
    return [line for line in stripped if line]
00066
class TestRosparam(unittest.TestCase):
    """
    Tests of the rosparam command line, driven through rosparam.yamlmain.

    The tests use the parameter server returned by get_param_server() and
    expect /string, /int, /float and their /g1 and /g2 counterparts to be
    set on it already (presumably by the launch file that runs this test).
    """

    def _check(self, expected, actual):
        """
        Make sure all elements of expected are present in actual
        """
        for t in expected:
            self.assertTrue(t in actual, "%r is missing from %r" % (t, actual))

    def _notcheck(self, not_expected, actual):
        """
        Make sure all elements of not_expected are not present in actual
        """
        for t in not_expected:
            self.assertFalse(t in actual, "%r should not be in %r" % (t, actual))

    def _capture(self, argv):
        """
        Run rosparam.yamlmain(argv) with stdout redirected and return the
        buffer holding everything the command printed
        """
        from ros import rosparam
        with fakestdout() as b:
            rosparam.yamlmain(argv)
        return b

    def _check_fails(self, argv):
        """
        Make sure rosparam.yamlmain(argv) rejects the command line, i.e.
        raises SystemExit with a code other than 0
        """
        from ros import rosparam
        try:
            rosparam.yamlmain(argv)
        except SystemExit:
            # read the exception through sys.exc_info() rather than binding
            # it in the except clause: the "except X, e" spelling is
            # Python 2 only, this form parses everywhere
            self.assertNotEqual(0, sys.exc_info()[1].code)
        else:
            self.fail("command-line arg should have failed")

    def test_rosparam_list(self):
        from ros import rosparam
        cmd = 'rosparam'

        params = ['/string', '/int', '/float',
                  '/g1/string', '/g1/int', '/g1/float',
                  '/g2/string', '/g2/int', '/g2/float',
                  ]
        # library call first, then the same listing through the command line
        self._check(params, rosparam.list_params(''))
        self._check(params, tolist(self._capture([cmd, 'list'])))
        self._check(params, tolist(self._capture([cmd, 'list', '/'])))

        # listing a namespace must show its members and nothing from outside,
        # with or without the trailing slash
        g1p = [p for p in params if p.startswith('/g1/')]
        not_g1p = [p for p in params if not p.startswith('/g1/')]
        for ns in ['/g1', '/g1/']:
            listed = tolist(self._capture([cmd, 'list', ns]))
            self._check(g1p, listed)
            self._notcheck(not_g1p, listed)

        listed = tolist(self._capture([cmd, 'list', '/not/a/namespace/']))
        self.assertEqual([], listed)

    def test_rosparam_load(self):
        from ros import rosparam
        import roslib.packages
        test_dir = os.path.join(roslib.packages.get_pkg_dir('test_rosparam'), 'test')
        f = os.path.join(test_dir, 'test.yaml')
        f_ns = os.path.join(test_dir, 'test_ns.yaml')

        cmd = 'rosparam'
        self._check_fails([cmd, 'load'])
        self._check_fails([cmd, 'load', 'fake-file.yaml'])

        ps = get_param_server()

        # load without a namespace argument
        rosparam.yamlmain([cmd, 'load', f])
        self.assertEqual('bar', ps.getParam('/', '/foo')[2])
        # a parameter this test never loads must still hold its value
        self.assertEqual('foo-value', ps.getParam('/', '/string')[2])
        rosparam.yamlmain([cmd, 'load', '-v', f])
        self.assertEqual('bar', ps.getParam('/', '/foo')[2])

        # load into an explicit namespace
        rosparam.yamlmain([cmd, 'load', f, '/rosparam_load/test'])
        self.assertEqual('bar', ps.getParam('/', '/rosparam_load/test/foo')[2])
        rosparam.yamlmain([cmd, 'load', '-v', f, '/rosparam_load/test'])
        self.assertEqual('bar', ps.getParam('/', '/rosparam_load/test/foo')[2])

        # test_ns.yaml ends up under a/b (presumably the file carries its
        # own namespace spec) and must not disturb /foo
        rosparam.yamlmain([cmd, 'load', f_ns])
        self.assertEqual('baz', ps.getParam('/', '/a/b/foo')[2])
        self.assertEqual('bar', ps.getParam('/', '/foo')[2])
        rosparam.yamlmain([cmd, 'load', '-v', f_ns])
        self.assertEqual('baz', ps.getParam('/', '/a/b/foo')[2])

        # same file, relative to an explicit namespace
        rosparam.yamlmain([cmd, 'load', f_ns, '/rosparam_load/test2'])
        self.assertEqual('baz', ps.getParam('/', '/rosparam_load/test2/a/b/foo')[2])
        rosparam.yamlmain([cmd, 'load', '-v', f_ns, '/rosparam_load/test2'])
        self.assertEqual('baz', ps.getParam('/', '/rosparam_load/test2/a/b/foo')[2])

    def test_rosparam_get(self):
        cmd = 'rosparam'
        self._check_fails([cmd, 'get'])

        # (arguments after 'get', expected stripped output)
        cases = [
            (["string"], 'foo-value'),
            (['-p', "string"], 'foo-value'),
            (["/string"], 'foo-value'),
            (["/g1/string"], 'g1-foo-value'),
            (["g1/string"], 'g1-foo-value'),
            (["int"], '1'),
            (["/int"], '1'),
            (['-p', "int"], '1'),
            (["/g1/int"], '10'),
            (["g1/int"], '10'),
            (["float"], '1.0'),
            (['-p', "float"], '1.0'),
            (['-p', "g1/float"], '10.0'),
        ]
        for args, expected in cases:
            printed = self._capture([cmd, 'get'] + args).getvalue().strip()
            self.assertEqual(expected, printed,
                             "rosparam get %s printed %r, expected %r"
                             % (' '.join(args), printed, expected))

        # getting a namespace prints a YAML dictionary of its members
        import yaml
        d = yaml.load(self._capture([cmd, 'get', "g1"]).getvalue())
        self.assertEqual(d['float'], 10.0)
        self.assertEqual(d['int'], 10)
        self.assertEqual(d['string'], "g1-foo-value")
        self.assertEqual(set(['float', 'int', 'string']), set(d.keys()))

        # pretty-printing a namespace: only required to run without error
        self._capture([cmd, 'get', '-p', "g1"])
        self._capture([cmd, 'get', '-pv', "g1"])

    def test_rosparam_set(self):
        cmd = 'rosparam'
        ps = get_param_server()

        # integers, by global and by relative name
        self._capture([cmd, 'set', "/rosparam_set/test1", "1"])
        self.assertEqual(1, ps.getParam('/', '/rosparam_set/test1')[2])
        self._capture([cmd, 'set', '-v', "/rosparam_set/test1", "1"])
        self.assertEqual(1, ps.getParam('/', '/rosparam_set/test1')[2])
        self._capture([cmd, 'set', "rosparam_set/test1", "2"])
        self.assertEqual(2, ps.getParam('/', '/rosparam_set/test1')[2])

        # floats
        self._capture([cmd, 'set', "/rosparam_set/test2", "1.0"])
        self.assertEqual(1., ps.getParam('/', '/rosparam_set/test2')[2])
        self._capture([cmd, 'set', "/rosparam_set/test2", "2.0"])
        self.assertEqual(2., ps.getParam('/', '/rosparam_set/test2')[2])

        # bool, string and list
        self._capture([cmd, 'set', "/rosparam_set/testbool", "true"])
        self.assertEqual(True, ps.getParam('/', '/rosparam_set/testbool')[2])
        self._capture([cmd, 'set', "/rosparam_set/teststr", "hi"])
        self.assertEqual("hi", ps.getParam('/', '/rosparam_set/teststr')[2])
        self._capture([cmd, 'set', "/rosparam_set/testlist", "[1, 2, 3]"])
        self.assertEqual([1, 2, 3], ps.getParam('/', '/rosparam_set/testlist')[2])

        # dictionary
        self._capture([cmd, 'set', "/rosparam_set/testdict", "{a: b, c: d}"])
        self.assertEqual('b', ps.getParam('/', '/rosparam_set/testdict/a')[2])
        self.assertEqual('d', ps.getParam('/', '/rosparam_set/testdict/c')[2])

        # setting an empty dictionary must leave the existing keys alone.
        # This used to target "set/testdict", a different parameter from the
        # one asserted on below, so the step could never fail.
        self._capture([cmd, 'set', "/rosparam_set/testdict", "{}"])
        self.assertEqual('b', ps.getParam('/', '/rosparam_set/testdict/a')[2])
        self.assertEqual('d', ps.getParam('/', '/rosparam_set/testdict/c')[2])

        # setting further keys adds to the dictionary instead of replacing it
        self._capture([cmd, 'set', "/rosparam_set/testdict", "{e: f, g: h}"])
        self.assertEqual('b', ps.getParam('/', '/rosparam_set/testdict/a')[2])
        self.assertEqual('d', ps.getParam('/', '/rosparam_set/testdict/c')[2])
        self.assertEqual('f', ps.getParam('/', '/rosparam_set/testdict/e')[2])
        self.assertEqual('h', ps.getParam('/', '/rosparam_set/testdict/g')[2])

        # dictionary with the verbose flag
        self._capture([cmd, 'set', '-v', "/rosparam_set/testdictverbose", "{e: f, g: h}"])
        self.assertEqual('f', ps.getParam('/', '/rosparam_set/testdictverbose/e')[2])
        self.assertEqual('h', ps.getParam('/', '/rosparam_set/testdictverbose/g')[2])

    def test_rosparam_delete(self):
        from ros import rosparam
        cmd = 'rosparam'
        ps = get_param_server()

        # no parameter name, and more than one
        self._check_fails([cmd, 'delete'])
        self._check_fails([cmd, 'delete', 'one', 'two'])

        # set a parameter directly on the server, then delete it via the tool
        ps.setParam('/', '/delete/me', True)
        self.assertTrue(ps.hasParam('/', '/delete/me')[2])
        rosparam.yamlmain([cmd, 'delete', "/delete/me"])
        self.assertFalse(ps.hasParam('/', '/delete/me')[2])

        ps.setParam('/', '/delete/me2', True)
        self.assertTrue(ps.hasParam('/', '/delete/me2')[2])
        rosparam.yamlmain([cmd, 'delete', '-v', "/delete/me2"])
        self.assertFalse(ps.hasParam('/', '/delete/me2')[2])

    def test_rosparam_dump(self):
        from ros import rosparam
        import roslib.packages
        test_dir = os.path.join(roslib.packages.get_pkg_dir('test_rosparam'), 'test')
        f = os.path.join(test_dir, 'test.yaml')
        f_out = os.path.join(test_dir, 'test_dump.yaml')

        cmd = 'rosparam'
        ps = get_param_server()

        # no file name, and one argument too many
        self._check_fails([cmd, 'dump'])
        self._check_fails([cmd, 'dump', f_out, 'rosparam_dump', 'rosparam_dump2'])

        rosparam.yamlmain([cmd, 'load', f, 'rosparam_dump'])
        self.assertEqual('bar', ps.getParam('/', 'rosparam_dump/foo')[2])

        # round trip: dumping the namespace just loaded from f must give
        # back the contents of f, with and without the verbose flag
        import yaml
        for argv in ([cmd, 'dump', f_out, 'rosparam_dump'],
                     [cmd, 'dump', '-v', f_out, 'rosparam_dump']):
            rosparam.yamlmain(argv)
            with open(f_out) as b:
                with open(f) as b2:
                    self.assertEqual(yaml.load(b.read()), yaml.load(b2.read()))

    def test_fullusage(self):
        from ros import rosparam
        # smoke tests: SystemExit is tolerated with any exit code; any other
        # exception fails the test
        try:
            rosparam._fullusage()
        except SystemExit:
            pass
        try:
            rosparam.yamlmain(['rosparam'])
        except SystemExit:
            pass
        try:
            rosparam.yamlmain(['rosparam', 'invalid'])
        except SystemExit:
            pass
00358
00359
if __name__ == '__main__':
    # script entry point: run TestRosparam through rostest, naming
    # 'rosparam' as the coverage package
    rostest.unitrun(
        PKG,
        NAME,
        TestRosparam,
        sys.argv,
        coverage_packages=['rosparam']
    )