00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import time
00037 import unittest
00038
00039 import rostest
00040
00041 from subprocess import Popen, PIPE, check_call, call
00042
00043 from rosgraph.names import script_resolve_name
00044
def get_param_server():
    """Return a ``rosgraph.Master`` handle created with the name '/rosparam'.

    The tests use the returned object to read and write parameters directly
    (``getParam`` / ``setParam`` / ``hasParam``), independently of the
    ``rosparam`` command-line tool under test.
    """
    # Imported lazily so that merely importing this module does not require
    # rosgraph.Master to be resolved up front.
    from rosgraph import Master

    return Master('/rosparam')
00048
class TestRosparamOnline(unittest.TestCase):
    """Exercise the ``rosparam`` command-line tool against a live parameter server.

    The test expects the parameters ``/string``, ``/int`` and ``/float`` (plus
    their ``/g1`` and ``/g2`` counterparts) to exist already.
    NOTE(review): presumably they are loaded by the accompanying rostest
    launch file -- confirm.
    """

    # Executable under test.
    _CMD = 'rosparam'

    def setUp(self):
        """Reset the per-test bookkeeping containers."""
        self.vals = set()
        self.msgs = {}

    def callback(self, msg, val):
        """Record that *val* was seen and remember the *msg* that carried it."""
        self.vals.add(val)
        self.msgs[val] = msg

    def _rosparam(self, *args):
        """Run ``rosparam`` with *args* and return its captured stdout as text.

        The exit status is deliberately not checked (the callers verify the
        effect of the command instead).  The output is decoded so that the
        comparisons against ``str`` literals work on Python 3, where
        ``communicate()`` returns bytes.
        """
        raw = Popen([self._CMD] + list(args), stdout=PIPE).communicate()[0]
        return raw.decode()

    def test_rosparam(self):
        """Run list / get / set / delete and verify each against expectations."""
        ps = get_param_server()

        # --- list: every pre-loaded parameter must be reported ---------------
        params = ['/string', '/int', '/float',
                  '/g1/string', '/g1/int', '/g1/float',
                  '/g2/string', '/g2/int', '/g2/float',
                  ]
        listed = set(self._rosparam('list').split())
        for name in params:
            self.assertIn(name, listed)

        # --- get: scalar values ----------------------------------------------
        # Each entry is (arguments after 'get', expected stripped output).
        # Relative and absolute spellings of a name, and the -p flag, must all
        # yield the same value.  Order matches the original sequence of calls.
        get_cases = [
            (['string'], 'foo-value'),
            (['-p', 'string'], 'foo-value'),
            (['/string'], 'foo-value'),
            (['g1/string'], 'g1-foo-value'),
            (['/g1/string'], 'g1-foo-value'),
            (['/g2/string'], 'g2-foo-value'),

            (['int'], '1'),
            (['-p', 'int'], '1'),
            (['/int'], '1'),
            (['g1/int'], '10'),
            (['/g1/int'], '10'),
            (['/g2/int'], '20'),

            (['float'], '1.0'),
            (['-p', 'float'], '1.0'),
            (['/float'], '1.0'),
            (['g1/float'], '10.0'),
            (['/g1/float'], '10.0'),
            (['/g2/float'], '20.0'),
        ]
        for get_args, expected in get_cases:
            self.assertEqual(expected, self._rosparam('get', *get_args).strip())

        # --- get: a whole namespace comes back as a YAML mapping -------------
        import yaml
        # safe_load: plain data is all we expect here, and yaml.load() without
        # an explicit Loader is unsafe/deprecated (and an error in PyYAML 6).
        d = yaml.safe_load(self._rosparam('get', 'g1'))
        self.assertEqual(d['float'], 10.0)
        self.assertEqual(d['int'], 10)
        self.assertEqual(d['string'], "g1-foo-value")
        self.assertEqual(set(['float', 'int', 'string']), set(d.keys()))

        # The -p and -pv variants only need to exit successfully.
        check_call([self._CMD, 'get', '-p', "g1"])
        check_call([self._CMD, 'get', '-pv', "g1"])

        # --- set: scalars and lists ------------------------------------------
        # Each entry is (arguments after 'set', parameter to read back through
        # the master handle, expected value).  The value argument is given as
        # text on the command line and must arrive as the typed value.
        set_cases = [
            (['/set/test1', '1'], '/set/test1', 1),
            (['-v', '/set/test1', '1'], '/set/test1', 1),
            (['set/test1', '2'], '/set/test1', 2),

            (['/set/test2', '1.0'], '/set/test2', 1),
            (['set/test2', '2.0'], '/set/test2', 2),

            (['/set/testbool', 'true'], '/set/testbool', True),
            (['set/testbool', 'false'], '/set/testbool', False),

            (['/set/teststr', 'hi'], '/set/teststr', "hi"),
            (['set/teststr', 'hello world'], '/set/teststr', "hello world"),
            # A quoted 'true' must stay a string rather than become a bool.
            (['set/teststr', "'true'"], '/set/teststr', "true"),

            (['set/testlist', '[]'], '/set/testlist', []),
            (['/set/testlist', '[1, 2, 3]'], '/set/testlist', [1, 2, 3]),
        ]
        for set_args, param, expected in set_cases:
            self._rosparam('set', *set_args)
            self.assertEqual(expected, ps.getParam(param))

        # --- set: dictionaries -----------------------------------------------
        self._rosparam('set', "/set/testdict", "{a: b, c: d}")
        self.assertEqual('b', ps.getParam('/set/testdict/a'))
        self.assertEqual('d', ps.getParam('/set/testdict/c'))

        # Setting an empty dictionary must leave the existing keys in place.
        self._rosparam('set', "set/testdict", "{}")
        self.assertEqual('b', ps.getParam('/set/testdict/a'))
        self.assertEqual('d', ps.getParam('/set/testdict/c'))

        # Setting a dictionary with new keys adds them next to the old ones.
        self._rosparam('set', "/set/testdict", "{e: f, g: h}")
        self.assertEqual('b', ps.getParam('/set/testdict/a'))
        self.assertEqual('d', ps.getParam('/set/testdict/c'))
        self.assertEqual('f', ps.getParam('/set/testdict/e'))
        self.assertEqual('h', ps.getParam('/set/testdict/g'))

        # The -v variant with a dictionary only needs to exit successfully.
        check_call([self._CMD, 'set', '-v', "/set/testdictverbose", "{e: f, g: h}"])

        # --- delete ----------------------------------------------------------
        ps.setParam('/delete/me', True)
        self.assertTrue(ps.hasParam('/delete/me'))
        self._rosparam('delete', "/delete/me")
        self.assertFalse(ps.hasParam('/delete/me'))
00188
00189
00190
00191
# Package name and test name handed to rostest.run() below.
PKG = 'test_rosparam'
NAME = 'test_rosparam_command_line_online'

if __name__ == '__main__':
    # Run the test case through rostest, forwarding the command-line
    # arguments unchanged.
    rostest.run(PKG, NAME, TestRosparamOnline, sys.argv)