00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
# ROS package that owns this test, and the name the test run is reported
# under; both are passed to rostest.run() in the __main__ guard of this file.
PKG = 'test_rosparam'
NAME = 'test_rosparam_command_line_online'

# Load the package manifest before the ROS imports that follow (roslib uses
# it to extend sys.path with the package's declared dependencies).
import roslib
roslib.load_manifest(PKG)
00039
00040 import os
00041 import signal
00042 import sys
00043 import time
00044 import unittest
00045
00046 import rospy
00047 import rostest
00048
00049 from subprocess import Popen, PIPE, check_call, call
00050
00051 from roslib.scriptutil import get_param_server, script_resolve_name
00052
class TestRosparamOnline(unittest.TestCase):
    """Drive the ``rosparam`` command-line tool against a live parameter server.

    The test shells out to ``rosparam`` for ``list``/``get``/``set``/``delete``
    and cross-checks the results through the parameter-server proxy returned
    by ``get_param_server()``.

    NOTE(review): the expected values (``/string`` == ``foo-value``,
    ``/g1/int`` == 10, ...) are not created here; presumably the accompanying
    rostest launch file loads them before the test starts -- confirm.
    """

    # Executable under test.
    _CMD = 'rosparam'

    def setUp(self):
        """Reset the bookkeeping containers filled by :meth:`callback`."""
        self.vals = set()
        self.msgs = {}

    def callback(self, msg, val):
        """Record ``val`` as seen and remember the ``msg`` delivered with it.

        Nothing in this class calls it; it is kept so the class interface is
        unchanged.
        """
        self.vals.add(val)
        self.msgs[val] = msg

    def _rosparam(self, *args):
        """Run ``rosparam <args...>`` and return its captured stdout.

        The exit status is deliberately not checked (the original test did not
        check it either); callers verify the effect on the parameter server.
        """
        # universal_newlines=True makes communicate() return text, so the
        # comparisons against str literals also hold on Python 3, where the
        # default would be bytes.
        proc = Popen([self._CMD] + list(args), stdout=PIPE,
                     universal_newlines=True)
        return proc.communicate()[0]

    @staticmethod
    def _param_value(ps, key):
        """Return the value stored under ``key`` on parameter server ``ps``.

        ``getParam`` is called with caller id ``'/'``; index 2 of its reply is
        the value (presumably a ``(code, status_message, value)`` triple, as
        in the ROS Parameter Server API -- verify).
        """
        return ps.getParam('/', key)[2]

    def _assert_params(self, ps, expected):
        """Assert that every ``(key, value)`` pair in ``expected`` is on ``ps``."""
        for key, value in expected:
            self.assertEqual(value, self._param_value(ps, key))

    def test_rosparam(self):
        """Check ``rosparam`` list, get, set and delete end to end."""
        # Function-scope import, as in the original: only this test needs PyYAML.
        import yaml

        ps = get_param_server()
        cmd = self._CMD

        # --- rosparam list ------------------------------------------------
        params = ['/string', '/int', '/float',
                  '/g1/string', '/g1/int', '/g1/float',
                  '/g2/string', '/g2/int', '/g2/float',
                  ]
        listed = set(self._rosparam('list').split())
        for param in params:
            self.assertTrue(
                param in listed,
                "%s missing from 'rosparam list' output" % param)

        # --- rosparam get -------------------------------------------------
        # (arguments after 'get', expected stripped stdout), in the order the
        # commands are issued.  Relative names such as "string" are expected
        # to give the same value as their global form "/string".
        get_cases = [
            (['string'], 'foo-value'),
            (['-p', 'string'], 'foo-value'),
            (['/string'], 'foo-value'),
            (['g1/string'], 'g1-foo-value'),
            (['/g1/string'], 'g1-foo-value'),
            (['/g2/string'], 'g2-foo-value'),

            (['int'], '1'),
            (['-p', 'int'], '1'),
            (['/int'], '1'),
            (['g1/int'], '10'),
            (['/g1/int'], '10'),
            (['/g2/int'], '20'),

            (['float'], '1.0'),
            (['-p', 'float'], '1.0'),
            (['/float'], '1.0'),
            (['g1/float'], '10.0'),
            (['/g1/float'], '10.0'),
            (['/g2/float'], '20.0'),
        ]
        for get_args, expected in get_cases:
            output = self._rosparam('get', *get_args)
            self.assertEqual(expected, output.strip())

        # Fetching a whole namespace prints YAML.  safe_load is used instead
        # of yaml.load: the output holds only plain scalars, and yaml.load
        # without an explicit Loader is unsafe and rejected by recent PyYAML.
        g1 = yaml.safe_load(self._rosparam('get', 'g1'))
        self.assertEqual(g1['float'], 10.0)
        self.assertEqual(g1['int'], 10)
        self.assertEqual(g1['string'], "g1-foo-value")
        self.assertEqual(set(['float', 'int', 'string']), set(g1.keys()))

        # Pretty-print and verbose modes only need to exit successfully.
        check_call([cmd, 'get', '-p', "g1"])
        check_call([cmd, 'get', '-pv', "g1"])

        # --- rosparam set -------------------------------------------------
        # (arguments after 'set', global key to read back, expected value),
        # in the order the commands are issued.
        set_cases = [
            (['/set/test1', '1'], '/set/test1', 1),
            (['-v', '/set/test1', '1'], '/set/test1', 1),
            (['set/test1', '2'], '/set/test1', 2),

            (['/set/test2', '1.0'], '/set/test2', 1),
            (['set/test2', '2.0'], '/set/test2', 2),

            (['/set/testbool', 'true'], '/set/testbool', True),
            (['set/testbool', 'false'], '/set/testbool', False),

            (['/set/teststr', 'hi'], '/set/teststr', "hi"),
            (['set/teststr', 'hello world'], '/set/teststr', "hello world"),
            # A quoted YAML scalar must stay a string, not become a bool.
            (['set/teststr', "'true'"], '/set/teststr', "true"),

            (['set/testlist', '[]'], '/set/testlist', []),
            (['/set/testlist', '[1, 2, 3]'], '/set/testlist', [1, 2, 3]),
        ]
        for set_args, key, expected in set_cases:
            self._rosparam('set', *set_args)
            self.assertEqual(expected, self._param_value(ps, key))

        # Dictionaries: each key becomes its own parameter under the name.
        first_keys = [('/set/testdict/a', 'b'), ('/set/testdict/c', 'd')]
        self._rosparam('set', "/set/testdict", "{a: b, c: d}")
        self._assert_params(ps, first_keys)

        # Setting an empty dict is expected to leave the existing keys alone.
        self._rosparam('set', "set/testdict", "{}")
        self._assert_params(ps, first_keys)

        # Setting new keys is expected to add them without dropping old ones.
        self._rosparam('set', "/set/testdict", "{e: f, g: h}")
        self._assert_params(
            ps,
            first_keys + [('/set/testdict/e', 'f'), ('/set/testdict/g', 'h')])

        # Verbose dictionary set only needs to exit successfully.
        check_call([cmd, 'set', '-v', "/set/testdictverbose", "{e: f, g: h}"])

        # --- rosparam delete ----------------------------------------------
        # Create the parameter through the server proxy, then remove it
        # through the command-line tool.
        ps.setParam('/', '/delete/me', True)
        self.assertTrue(ps.hasParam('/', '/delete/me')[2])
        self._rosparam('delete', "/delete/me")
        self.assertFalse(ps.hasParam('/', '/delete/me')[2])
00192
00193
00194
00195
if __name__ == '__main__':
    # Script entry point: hand the test case to rostest together with the
    # package name, the test name and the command-line arguments.
    rostest.run(PKG, NAME, TestRosparamOnline, sys.argv)