00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rosmaster')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 import time
00043 import random
00044 import datetime
00045
00046 import rostest
00047
00048 from roslib.names import make_global_ns, ns_join
00049
00050
class ThreadPoolMock(object):
    """Stand-in thread pool handed to RegistrationManager by these tests.

    Its only method accepts any arguments and does nothing.
    """

    def queue_task(*args):
        """Accept any arguments (including the instance) and discard them."""
        return None
00053
00054
00055 class TestRospyParamServer(unittest.TestCase):
00056
00057 def test_compute_param_updates(self):
00058 from rosmaster.paramserver import compute_param_updates
00059
00060 tests = [
00061
00062 ([],({}, '/foo', 1)),
00063 ([],({'/bar': 'barapi'}, '/foo/', 1)),
00064 ([],({'/bar/': 'barapi'}, '/foo/', 1)),
00065
00066
00067 ([('fooapi', '/foo/', 1)], ({'/foo/': 'fooapi'}, '/foo', 1)),
00068 ([('fooapi', '/foo/', 1)], ({'/foo/': 'fooapi'}, '/foo/', 1)),
00069
00070
00071 ([('fooapi', '/foo/val/', 1)], ({'/foo/': 'fooapi'}, '/foo/val', 1)),
00072
00073
00074 ([],({'/bar/': 'barapi'}, '/foo/', {'bar': 2})),
00075 ([('fooapi', '/foo/val/', 1)], ({'/foo/val/': 'fooapi'}, '/foo', {'val' : 1})),
00076
00077 ([('fooapi', '/foo/bar/val/', 1)], ({'/foo/bar/val/': 'fooapi'}, '/foo', {'bar' : {'val' : 1}})),
00078 ([('fooapi', '/foo/bar/', {'val': 1})], ({'/foo/bar/': 'fooapi'}, '/foo', {'bar' : {'val' : 1}})),
00079 ([('fooapi', '/foo/', {'bar':{'val': 1}})], ({'/foo/': 'fooapi'}, '/foo', {'bar' : {'val' : 1}})),
00080
00081 ([('fooapi', '/foo/', {'bar': 1, 'baz': 2}), ('foobazapi', '/foo/baz/', 2)],
00082 ({'/foo/': 'fooapi', '/foo/baz/': 'foobazapi'}, '/foo', {'bar' : 1, 'baz': 2})),
00083
00084 ([('foobarapi', '/foo/bar/', 1), ('foobazapi', '/foo/baz/', 2)],
00085 ({'/foo/bar/': 'foobarapi', '/foo/baz/': 'foobazapi'}, '/foo', {'bar' : 1, 'baz': 2})),
00086
00087
00088 ([('delapi', '/del/bar/', {})],
00089 ({'/del/bar/': 'delapi'}, '/del', {})),
00090
00091 ]
00092 for correct, args in tests:
00093 val = compute_param_updates(*args)
00094 self.assertEquals(len(correct), len(val), "Failed: \n%s \nreturned \n%s\nvs correct\n%s"%(str(args), str(val), str(correct)))
00095 for c in correct:
00096 self.assert_(c in val, "Failed: \n%s \ndid not include \n%s. \nIt returned \n%s"%(str(args), c, val))
00097
00098
00099 def notify_task(self, updates):
00100 self.last_update = updates
00101
00102 def test_subscribe_param_simple(self):
00103 from rosmaster.registrations import RegistrationManager
00104 from rosmaster.paramserver import ParamDictionary
00105
00106
00107 reg_manager = RegistrationManager(ThreadPoolMock())
00108 param_server = ParamDictionary(reg_manager)
00109
00110
00111 self.last_update = None
00112 self.assertEquals({}, param_server.subscribe_param('/foo', ('node1', 'http://node1:1')))
00113 param_server.set_param('/foo', 1, notify_task=self.notify_task)
00114 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 1), ], self.last_update)
00115
00116
00117 self.assertEquals(1, param_server.subscribe_param('/foo', ('node1', 'http://node1:1')))
00118 param_server.set_param('/foo', 2, notify_task=self.notify_task)
00119 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 2), ], self.last_update)
00120
00121
00122 self.assertEquals(2, param_server.subscribe_param('/foo/', ('node1', 'http://node1:1')))
00123 param_server.set_param('/foo', 'resub2', notify_task=self.notify_task)
00124 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 'resub2'), ], self.last_update)
00125
00126
00127 self.assertEquals('resub2', param_server.subscribe_param('/foo', ('node1', 'http://node1b:1')))
00128 self.assertEquals('http://node1b:1', reg_manager.get_node('node1').api)
00129 param_server.set_param('/foo', 3, notify_task=self.notify_task)
00130 self.assertEquals([([('node1', 'http://node1b:1')], '/foo/', 3), ], self.last_update)
00131
00132
00133 self.assertEquals(3, param_server.subscribe_param('/foo', ('node2', 'http://node2:2')))
00134 self.assertEquals('http://node2:2', reg_manager.get_node('node2').api)
00135 param_server.set_param('/foo', 4, notify_task=self.notify_task)
00136 self.assertEquals([([('node1', 'http://node1b:1'), ('node2', 'http://node2:2')], '/foo/', 4), ], self.last_update)
00137
00138 def test_subscribe_param_tree(self):
00139 from rosmaster.registrations import RegistrationManager
00140 from rosmaster.paramserver import ParamDictionary
00141
00142
00143 reg_manager = RegistrationManager(ThreadPoolMock())
00144 param_server = ParamDictionary(reg_manager)
00145
00146
00147
00148
00149 gains = {'p': 'P', 'i': 'I', 'd' : 'D'}
00150 self.assertEquals({}, param_server.subscribe_param('/gains', ('ptnode', 'http://ptnode:1')))
00151 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task)
00152 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), ], self.last_update)
00153
00154 param_server.set_param('/gains/', gains.copy(), notify_task=self.notify_task)
00155 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), ], self.last_update)
00156
00157
00158 param_server.set_param('/gains/p', 'P2', notify_task=self.notify_task)
00159 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/p/', 'P2'), ], self.last_update)
00160 param_server.set_param('/gains/i', 'I2', notify_task=self.notify_task)
00161 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/i/', 'I2'), ], self.last_update)
00162
00163
00164 self.assertEquals('P2', param_server.subscribe_param('/gains/p', ('ptnode2', 'http://ptnode2:2')))
00165 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task)
00166 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), \
00167 ([('ptnode2', 'http://ptnode2:2')], '/gains/p/', 'P'), \
00168 ], self.last_update)
00169
00170 self.last_update = None
00171 self.assertEquals('P', param_server.subscribe_param('/gains/p/', ('ptnode2', 'http://ptnode2:2')))
00172 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task)
00173 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), \
00174 ([('ptnode2', 'http://ptnode2:2')], '/gains/p/', 'P'), \
00175 ], self.last_update)
00176
00177 param_server.set_param('/gains/p', 'P3', notify_task=self.notify_task)
00178
00179 self.assertEquals([([('ptnode2', 'http://ptnode2:2')], '/gains/p/', 'P3'), \
00180 ([('ptnode', 'http://ptnode:1')], '/gains/p/', 'P3'), \
00181 ], self.last_update)
00182
00183
00184 self.last_update = None
00185 param_server.set_param('/gains2', gains.copy(), notify_task=self.notify_task)
00186 self.assertEquals('P', param_server.subscribe_param('/gains2/p/', ('ptnode3', 'http://ptnode3:3')))
00187
00188 param_server.set_param('/gains2', {}, notify_task=self.notify_task)
00189 self.assertEquals([([('ptnode3', 'http://ptnode3:3')], '/gains2/p/', {}), ], self.last_update)
00190
00191
00192 self.last_update = None
00193 param_server.delete_param('/gains')
00194 param_server.delete_param('/gains2')
00195 self.assertEquals({}, param_server.get_param('/'))
00196 self.assertEquals({}, param_server.subscribe_param('/', ('allnode', 'http://allnode:1')))
00197 param_server.set_param('/one', 1, notify_task=self.notify_task)
00198 self.assertEquals([([('allnode', 'http://allnode:1')], '/one/', 1), ], self.last_update)
00199 param_server.set_param('/two', 2, notify_task=self.notify_task)
00200 self.assertEquals([([('allnode', 'http://allnode:1')], '/two/', 2), ], self.last_update)
00201 param_server.set_param('/foo/bar', 'bar', notify_task=self.notify_task)
00202 self.assertEquals([([('allnode', 'http://allnode:1')], '/foo/bar/', 'bar'), ], self.last_update)
00203
00204
00205
00206 def test_subscribe_param_deletion(self):
00207 from rosmaster.registrations import RegistrationManager
00208 from rosmaster.paramserver import ParamDictionary
00209
00210
00211 reg_manager = RegistrationManager(ThreadPoolMock())
00212 param_server = ParamDictionary(reg_manager)
00213
00214
00215 self.assertEquals({}, param_server.subscribe_param('/foo', ('node1', 'http://node1:1')))
00216 param_server.set_param('/foo', 1, notify_task=self.notify_task)
00217 param_server.delete_param('/foo', notify_task=self.notify_task)
00218 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', {}), ], self.last_update)
00219
00220
00221 gains = {'p': 'P', 'i': 'I', 'd' : 'D'}
00222 self.assertEquals({}, param_server.subscribe_param('/gains', ('deltree', 'http://deltree:1')))
00223 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task)
00224 param_server.delete_param('/gains', notify_task=self.notify_task)
00225 self.assertEquals([([('deltree', 'http://deltree:1')], '/gains/', {}), ], self.last_update)
00226
00227
00228 self.assertEquals({}, param_server.subscribe_param('/gains2', ('deltree2', 'http://deltree2:2')))
00229 param_server.set_param('/gains2', gains.copy(), notify_task=self.notify_task)
00230 param_server.delete_param('/gains2/p', notify_task=self.notify_task)
00231 self.assertEquals([([('deltree2', 'http://deltree2:2')], '/gains2/p/', {}), ], self.last_update)
00232 param_server.delete_param('/gains2/i', notify_task=self.notify_task)
00233 self.assertEquals([([('deltree2', 'http://deltree2:2')], '/gains2/i/', {}), ], self.last_update)
00234 param_server.delete_param('/gains2', notify_task=self.notify_task)
00235 self.assertEquals([([('deltree2', 'http://deltree2:2')], '/gains2/', {}), ], self.last_update)
00236
00237
00238 k = '/ns1/ns2/ns3/key'
00239 self.assertEquals({}, param_server.subscribe_param(k, ('del_parent', 'http://del_parent:1')))
00240 param_server.set_param(k, 1, notify_task=self.notify_task)
00241 param_server.delete_param('/ns1/ns2', notify_task=self.notify_task)
00242 self.assertEquals([([('del_parent', 'http://del_parent:1')], '/ns1/ns2/ns3/key/', {}), ], self.last_update)
00243
00244 def test_unsubscribe_param(self):
00245 from rosmaster.registrations import RegistrationManager
00246 from rosmaster.paramserver import ParamDictionary
00247
00248
00249 reg_manager = RegistrationManager(ThreadPoolMock())
00250 param_server = ParamDictionary(reg_manager)
00251
00252
00253 self.last_update = None
00254 self.assertEquals({}, param_server.subscribe_param('/foo', ('node1', 'http://node1:1')))
00255 param_server.set_param('/foo', 1, notify_task=self.notify_task)
00256 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 1), ], self.last_update)
00257
00258 code, msg, val = param_server.unsubscribe_param('/foo', ('node1', 'http://node1:1'))
00259 self.assertEquals(1, code)
00260 self.assertEquals(1, val)
00261 self.last_update = None
00262 param_server.set_param('/foo', 2, notify_task=self.notify_task)
00263 self.assertEquals(None, self.last_update)
00264
00265 code, msg, val = param_server.unsubscribe_param('/foo', ('node1', 'http://node1:1'))
00266 self.assertEquals(1, code)
00267 self.assertEquals(0, val)
00268 self.last_update = None
00269 param_server.set_param('/foo', 2, notify_task=self.notify_task)
00270 self.assertEquals(None, self.last_update)
00271
00272
00273 self.last_update = None
00274 self.assertEquals({}, param_server.subscribe_param('/bar', ('barnode', 'http://barnode:1')))
00275 param_server.set_param('/bar', 3, notify_task=self.notify_task)
00276 self.assertEquals([([('barnode', 'http://barnode:1')], '/bar/', 3), ], self.last_update)
00277 code, msg, val = param_server.unsubscribe_param('/foo', ('barnode', 'http://notbarnode:1'))
00278 self.assertEquals(1, code)
00279 self.assertEquals(0, val)
00280 param_server.set_param('/bar', 4, notify_task=self.notify_task)
00281 self.assertEquals([([('barnode', 'http://barnode:1')], '/bar/', 4), ], self.last_update)
00282
00283
00284 def _set_param(self, ctx, my_state, test_vals, param_server):
00285 ctx = make_global_ns(ctx)
00286 for type, vals in test_vals:
00287 try:
00288 caller_id = ns_join(ctx, "node")
00289 count = 0
00290 for val in vals:
00291 key = ns_join(caller_id, "%s-%s"%(type,count))
00292 param_server.set_param(key, val)
00293 self.assert_(param_server.has_param(key))
00294 true_key = ns_join(ctx, key)
00295 my_state[true_key] = val
00296 count += 1
00297 except Exception, e:
00298 assert "getParam failed on type[%s], val[%s]"%(type,val)
00299
00300
00301 def _check_param_state(self, param_server, my_state):
00302 for (k, v) in my_state.iteritems():
00303 assert param_server.has_param(k)
00304
00305 try:
00306 v2 = param_server.get_param(k)
00307 except:
00308 raise Exception("Exception raised while calling param_server.get_param(%s): %s"%(k, traceback.format_exc()))
00309
00310 self.assertEquals(v, v2)
00311 param_names = my_state.keys()
00312 ps_param_names = param_server.get_param_names()
00313 assert not set(param_names) ^ set(ps_param_names), "parameter server keys do not match local: %s"%(set(param_names)^set(ps_param_names))
00314
00315
00316
00317 def test_has_param(self):
00318 from rosmaster.paramserver import ParamDictionary
00319 param_server = ParamDictionary(None)
00320
00321 self.failIf(param_server.has_param('/new_param'))
00322 param_server.set_param('/new_param', 1)
00323 self.assert_(param_server.has_param('/new_param'))
00324
00325
00326 self.failIf(param_server.has_param('/sub/sub2/new_param2'))
00327
00328 for k in ['/sub/sub2/', '/sub/sub2', '/sub/', '/sub']:
00329 self.failIf(param_server.has_param(k))
00330 param_server.set_param('/sub/sub2/new_param2', 1)
00331 self.assert_(param_server.has_param('/sub/sub2/new_param2'))
00332
00333 for k in ['/sub/sub2/', '/sub/sub2', '/sub/', '/sub']:
00334 self.assert_(param_server.has_param(k))
00335
00336
00337
00338
00339 def test_search_param(self):
00340 from rosmaster.paramserver import ParamDictionary
00341 param_server = ParamDictionary(None)
00342
00343 caller_id = '/node'
00344
00345
00346 val1 = { 'level1_p1': random.randint(0, 10000),
00347 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
00348 val2 = { 'level1_p1': random.randint(0, 10000),
00349 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
00350 val3 = { 'level1_p1': random.randint(0, 10000),
00351 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
00352 val4 = { 'level1_p1': random.randint(0, 10000),
00353 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
00354 full_dict = {}
00355
00356
00357 for k in ['', None, '~param']:
00358 try:
00359 param_server.search_param('/level1/level2', k)
00360 self.fail("param_server search should have failed on [%s]"%k)
00361 except ValueError: pass
00362 for ns in ['', None, 'relative', '~param']:
00363 try:
00364 param_server.search_param(ns, 'param')
00365 self.fail("param_server search should have failed on %s"%k)
00366 except ValueError: pass
00367
00368
00369
00370
00371 self.failIf(param_server.has_param('/level1/param'))
00372 self.failIf(param_server.search_param('/level1/node', 'param'))
00373 param_server.set_param('/level1/param', val1)
00374
00375
00376 for ns in ['/level1/node', '/level1/level2/node', '/level1/level2/level3/node']:
00377 self.assertEquals('/level1/param', param_server.search_param(ns, 'param'), "failed with ns[%s]"%ns)
00378 self.assertEquals('/level1/param/', param_server.search_param(ns, 'param/'))
00379 self.assertEquals('/level1/param/level1_p1', param_server.search_param(ns, 'param/level1_p1'))
00380 self.assertEquals('/level1/param/level1_p2/level2_p2', param_server.search_param(ns, 'param/level1_p2/level2_p2'))
00381 self.assertEquals(None, param_server.search_param('/root', 'param'))
00382 self.assertEquals(None, param_server.search_param('/root', 'param/'))
00383
00384
00385 self.failIf(param_server.has_param('/level1/level2/param'))
00386 param_server.set_param('/level1/level2/param', val2)
00387
00388
00389 for ns in ['/level1/level2/node', '/level1/level2/level3/node', '/level1/level2/level3/level4/node']:
00390 self.assertEquals('/level1/level2/param', param_server.search_param(ns, 'param'))
00391 self.assertEquals('/level1/level2/param/', param_server.search_param(ns, 'param/'))
00392 self.assertEquals('/level1/param', param_server.search_param('/level1/node', 'param'))
00393 self.assertEquals('/level1/param/', param_server.search_param('/level1/node', 'param/'))
00394 self.assertEquals(None, param_server.search_param('/root', 'param'))
00395
00396
00397 self.failIf(param_server.has_param('/level1/level2/level3/param'))
00398 param_server.set_param('/level1/level2/level3/param', val3)
00399
00400
00401 for ns in ['/level1/level2/level3/node', '/level1/level2/level3/level4/node']:
00402 self.assertEquals('/level1/level2/level3/param', param_server.search_param(ns, 'param'))
00403 self.assertEquals('/level1/level2/param', param_server.search_param('/level1/level2/node', 'param'))
00404 self.assertEquals('/level1/param', param_server.search_param('/level1/node', 'param'))
00405
00406
00407
00408
00409 self.assertEquals(None, param_server.search_param('/root', 'param'))
00410 self.assertEquals(None, param_server.search_param('/root', 'param/level1_p1'))
00411 self.assertEquals(None, param_server.search_param('/not/level1/level2/level3/level4/node', 'param/level1_p1'))
00412 tests = [
00413 ('/level1/node', '/level1/param/'),
00414 ('/level1/level2/', '/level1/level2/param/'),
00415 ('/level1/level2', '/level1/level2/param/'),
00416 ('/level1/level2/node', '/level1/level2/param/'),
00417 ('/level1/level2/notlevel3', '/level1/level2/param/'),
00418 ('/level1/level2/notlevel3/node', '/level1/level2/param/'),
00419 ('/level1/level2/level3/level4', '/level1/level2/level3/param/'),
00420 ('/level1/level2/level3/level4/', '/level1/level2/level3/param/'),
00421 ('/level1/level2/level3/level4/node', '/level1/level2/level3/param/'),
00422
00423 ]
00424 for ns, pbase in tests:
00425 self.assertEquals(pbase+'level1_p1',
00426 param_server.search_param(ns, 'param/level1_p1'))
00427 retval = param_server.search_param(ns, 'param/level1_p2/level2_p2')
00428 self.assertEquals(pbase+'level1_p2/level2_p2', retval,
00429 "failed with ns[%s] pbase[%s]: %s"%(ns, pbase, retval))
00430
00431
00432 self.failIf(param_server.has_param('/param'))
00433 param_server.set_param('/param', val4)
00434 self.assertEquals('/param', param_server.search_param('/root', 'param'))
00435 self.assertEquals('/param', param_server.search_param('/notlevel1/node', 'param'))
00436 self.assertEquals('/level1/param', param_server.search_param('/level1/node', 'param'))
00437 self.assertEquals('/level1/param', param_server.search_param('/level1', 'param'))
00438 self.assertEquals('/level1/param', param_server.search_param('/level1/', 'param'))
00439
00440
00441 val5 = { 'level1_p1': random.randint(0, 10000),
00442 'level1_p2' : { }}
00443
00444 self.failIf(param_server.has_param('/partial1/param'))
00445 param_server.set_param('/partial1/param', val5)
00446 self.assertEquals('/partial1/param', param_server.search_param('/partial1', 'param'))
00447 self.assertEquals('/partial1/param/level1_p1',
00448 param_server.search_param('/partial1', 'param/level1_p1'))
00449
00450 self.assertEquals('/partial1/param/non_existent',
00451 param_server.search_param('/partial1', 'param/non_existent'))
00452 self.assertEquals('/partial1/param/level1_p2/non_existent',
00453 param_server.search_param('/partial1', 'param/level1_p2/non_existent'))
00454
00455
00456
00457 def test_get_param(self):
00458 from rosmaster.paramserver import ParamDictionary
00459 param_server = ParamDictionary(None)
00460
00461 val = random.randint(0, 10000)
00462
00463 full_dict = {}
00464
00465
00466 self.failIf(param_server.has_param('/new_param'))
00467 self.failIf(param_server.has_param('/new_param/'))
00468 self.assertGetParamFail(param_server, '/new_param')
00469 param_server.set_param('/new_param', val)
00470 full_dict['new_param'] = val
00471 self.assertEquals(val, param_server.get_param('/new_param'))
00472 self.assertEquals(val, param_server.get_param('/new_param/'))
00473
00474 self.assertEquals(val, param_server.get_param('/new_param//'))
00475
00476
00477 self.assertEquals(full_dict, param_server.get_param('/'))
00478
00479
00480 val = random.randint(0, 10000)
00481 self.failIf(param_server.has_param('/sub/sub2/new_param2'))
00482 self.assertGetParamFail(param_server, '/sub/sub2/new_param2')
00483 param_server.set_param('/sub/sub2/new_param2', val)
00484 full_dict['sub'] = {'sub2': { 'new_param2': val }}
00485 self.assertEquals(val, param_server.get_param('/sub/sub2/new_param2'))
00486
00487 self.assertEquals(val, param_server.get_param('/sub///sub2/new_param2/'))
00488
00489
00490 self.assertEquals(full_dict, param_server.get_param('/'))
00491
00492
00493 val1 = random.randint(0, 10000)
00494 val2 = random.randint(0, 10000)
00495 val3 = random.randint(0, 10000)
00496
00497 for k in ['/gains/P', '/gains/I', '/gains/D', '/gains']:
00498 self.assertGetParamFail(param_server, k)
00499 self.failIf(param_server.has_param(k))
00500
00501 param_server.set_param('/gains/P', val1)
00502 param_server.set_param('/gains/I', val2)
00503 param_server.set_param('/gains/D', val3)
00504
00505 pid = {'P': val1, 'I': val2, 'D': val3}
00506 full_dict['gains'] = pid
00507 self.assertEquals(pid,
00508 param_server.get_param('/gains'))
00509 self.assertEquals(pid,
00510 param_server.get_param('/gains/'))
00511 self.assertEquals(full_dict,
00512 param_server.get_param('/'))
00513
00514 self.failIf(param_server.has_param('/ns/gains/P'))
00515 self.failIf(param_server.has_param('/ns/gains/I'))
00516 self.failIf(param_server.has_param('/ns/gains/D'))
00517 self.failIf(param_server.has_param('/ns/gains'))
00518
00519 param_server.set_param('/ns/gains/P', val1)
00520 param_server.set_param('/ns/gains/I', val2)
00521 param_server.set_param('/ns/gains/D', val3)
00522 full_dict['ns'] = {'gains': pid}
00523
00524 self.assertEquals(pid,
00525 param_server.get_param('/ns/gains'))
00526 self.assertEquals({'gains': pid},
00527 param_server.get_param('/ns/'))
00528 self.assertEquals({'gains': pid},
00529 param_server.get_param('/ns'))
00530 self.assertEquals(full_dict,
00531 param_server.get_param('/'))
00532
00533
00534 def test_delete_param(self):
00535 from rosmaster.paramserver import ParamDictionary
00536 param_server = ParamDictionary(None)
00537 try:
00538 param_server.delete_param('/fake')
00539 self.fail("delete_param of non-existent should have failed")
00540 except: pass
00541 try:
00542 param_server.delete_param('/')
00543 self.fail("delete_param of root should have failed")
00544 except: pass
00545
00546 param_server.set_param('/foo', 'foo')
00547 param_server.set_param('/bar', 'bar')
00548 self.assert_(param_server.has_param('/foo'))
00549 self.assert_(param_server.has_param('/bar'))
00550 param_server.delete_param('/foo')
00551 self.failIf(param_server.has_param('/foo'))
00552
00553 param_server.delete_param('/bar/')
00554 self.failIf(param_server.has_param('/bar'))
00555
00556
00557 param_server.set_param("/sub/key/x", 1)
00558 param_server.set_param("/sub/key/y", 2)
00559 try:
00560 param_server.delete_param('/sub/key/z')
00561 self.fail("delete_param of non-existent should have failed")
00562 except: pass
00563 try:
00564 param_server.delete_param('/sub/sub2/z')
00565 self.fail("delete_param of non-existent should have failed")
00566 except: pass
00567
00568 self.assert_(param_server.has_param('/sub/key/x'))
00569 self.assert_(param_server.has_param('/sub/key/y'))
00570 self.assert_(param_server.has_param('/sub/key'))
00571 param_server.delete_param('/sub/key')
00572 self.failIf(param_server.has_param('/sub/key'))
00573 self.failIf(param_server.has_param('/sub/key/x'))
00574 self.failIf(param_server.has_param('/sub/key/y'))
00575
00576
00577 param_server.set_param('/sub2', {'key': { 'x' : 1, 'y' : 2}})
00578 self.assert_(param_server.has_param('/sub2/key/x'))
00579 self.assert_(param_server.has_param('/sub2/key/y'))
00580 self.assert_(param_server.has_param('/sub2/key'))
00581 param_server.delete_param('/sub2/key')
00582 self.failIf(param_server.has_param('/sub2/key'))
00583 self.failIf(param_server.has_param('/sub2/key/x'))
00584 self.failIf(param_server.has_param('/sub2/key/y'))
00585
00586
00587
00588
00589 param_server.set_param('/a', 'b')
00590 self.assert_(param_server.has_param('/a'))
00591 try:
00592 param_server.delete_param('/a/b/c')
00593 self.fail_("should have raised key error")
00594 except: pass
00595
00596
00597
00598 def test_set_param(self):
00599 from rosmaster.paramserver import ParamDictionary
00600 param_server = ParamDictionary(None)
00601 caller_id = '/node'
00602 val = random.randint(0, 10000)
00603
00604
00605 try:
00606 param_server.set_param('/', 1)
00607 self.fail("ParamDictionary allowed root to be set to non-dictionary")
00608 except: pass
00609
00610
00611 self.failIf(param_server.has_param('/new_param'))
00612 param_server.set_param('/new_param', val)
00613 self.assertEquals(val, param_server.get_param('/new_param'))
00614 self.assertEquals(val, param_server.get_param('/new_param/'))
00615 self.assert_(param_server.has_param('/new_param'))
00616
00617
00618 val = random.randint(0, 10000)
00619 self.failIf(param_server.has_param('/sub/sub2/new_param2'))
00620 param_server.set_param('/sub/sub2/new_param2', val)
00621 self.assertEquals(val, param_server.get_param('/sub/sub2/new_param2'))
00622
00623
00624 vals = ['a', {'a': 'b'}, 1, 1., 'foo', {'c': 'd'}, 4, {'a': {'b': 'c'}}, 3]
00625 for v in vals:
00626 param_server.set_param('/multi/multi_param', v)
00627 self.assertEquals(v, param_server.get_param('/multi/multi_param'))
00628
00629
00630 param_server.set_param('/multi2/multi_param', 1)
00631 self.assertEquals(1, param_server.get_param('/multi2/multi_param'))
00632
00633 param_server.set_param('/multi2/multi_param/a', 2)
00634 self.assertEquals(2, param_server.get_param('/multi2/multi_param/a'))
00635 self.assertEquals({'a': 2}, param_server.get_param('/multi2/multi_param/'))
00636 param_server.set_param('/multi2/multi_param/a/b', 3)
00637 self.assertEquals(3, param_server.get_param('/multi2/multi_param/a/b'))
00638 self.assertEquals({'b': 3}, param_server.get_param('/multi2/multi_param/a/'))
00639 self.assertEquals({'a': {'b': 3}}, param_server.get_param('/multi2/multi_param/'))
00640
00641
00642
00643 self.failIf(param_server.has_param('/gains/P'))
00644 self.failIf(param_server.has_param('/gains/I'))
00645 self.failIf(param_server.has_param('/gains/D'))
00646 self.failIf(param_server.has_param('/gains'))
00647
00648 pid = {'P': random.randint(0, 10000), 'I': random.randint(0, 10000), 'D': random.randint(0, 10000)}
00649 param_server.set_param('/gains', pid)
00650 self.assertEquals(pid, param_server.get_param('/gains'))
00651 self.assertEquals(pid['P'], param_server.get_param('/gains/P'))
00652 self.assertEquals(pid['I'], param_server.get_param('/gains/I'))
00653 self.assertEquals(pid['D'], param_server.get_param('/gains/D'))
00654
00655 subns = {'gains1': pid, 'gains2': pid}
00656 param_server.set_param('/ns', subns)
00657 self.assertEquals(pid['P'], param_server.get_param('/ns/gains1/P'))
00658 self.assertEquals(pid['I'], param_server.get_param('/ns/gains1/I'))
00659 self.assertEquals(pid['D'], param_server.get_param('/ns/gains1/D'))
00660 self.assertEquals(pid, param_server.get_param('/ns/gains1'))
00661 self.assertEquals(pid, param_server.get_param('/ns/gains2'))
00662 self.assertEquals(subns, param_server.get_param('/ns/'))
00663
00664
00665 param_server.set_param('/ns', {})
00666
00667 self.assert_(param_server.has_param('/ns/'))
00668
00669 self.assertEquals({}, param_server.get_param('/ns/'))
00670
00671 self.failIf(param_server.has_param('/ns/gains1'))
00672 self.failIf(param_server.has_param('/ns/gains1/P'))
00673
00674
00675 param_server.set_param('/', {})
00676 self.failIf(param_server.has_param('/new_param'))
00677 param_server.set_param('/', {'foo': 1, 'bar': 2, 'baz': {'a': 'a'}})
00678 self.assertEquals(1, param_server.get_param('/foo'))
00679 self.assertEquals(1, param_server.get_param('/foo/'))
00680 self.assertEquals(2, param_server.get_param('/bar'))
00681 self.assertEquals(2, param_server.get_param('/bar/'))
00682 self.assertEquals('a', param_server.get_param('/baz/a'))
00683 self.assertEquals('a', param_server.get_param('/baz/a/'))
00684
00685
00686 def test_param_values(self):
00687 import math
00688 from rosmaster.paramserver import ParamDictionary
00689 param_server = ParamDictionary(None)
00690 test_vals = [
00691 ['int', [0, 1024, 2147483647, -2147483647]],
00692 ['boolean', [True, False]],
00693
00694
00695 ['unicode-string', [u'', u'hello', unicode('Andr\302\202', 'utf-8'), unicode('\377\376A\000n\000d\000r\000\202\000', 'utf-16')]],
00696 ['string-easy-ascii', [chr(n) for n in xrange(32, 128)]],
00697
00698
00699
00700
00701 ['string', ['', 'x', 'hello-there', 'new\nline', 'tab\t']],
00702 ['double', [0.0, math.pi, -math.pi, 3.4028235e+38, -3.4028235e+38]],
00703
00704 ['datetime', [datetime.datetime(2005, 12, 6, 12, 13, 14), datetime.datetime(1492, 12, 6, 12, 13, 14)]],
00705 ['array', [[], [1, 2, 3], ['a', 'b', 'c'], [0.0, 0.1, 0.2, 2.0, 2.1, -4.0],
00706 [1, 'a', True], [[1, 2, 3], ['a', 'b', 'c'], [1.0, 2.1, 3.2]]]
00707 ],
00708 ]
00709
00710 print "Putting parameters onto the server"
00711
00712 contexts = ['', 'scope1', 'scope/subscope1', 'scope/sub1/sub2']
00713 my_state = {}
00714 failures = []
00715 for ctx in contexts:
00716 self._set_param(ctx, my_state, test_vals, param_server)
00717 self._check_param_state(param_server, my_state)
00718
00719 print "Deleting all of our parameters"
00720
00721 param_keys = my_state.keys()
00722 count = 0
00723 for key in param_keys:
00724 count += 1
00725 param_server.delete_param(key)
00726 del my_state[key]
00727
00728 if count % 50 == 0:
00729 self._check_param_state(param_server, my_state)
00730 self._check_param_state(param_server, my_state)
00731
00732 def assertGetParamFail(self, param_server, param):
00733 try:
00734 param_server.get_param(param)
00735 self.fail("get_param[%s] did not raise KeyError"%(param))
00736 except KeyError: pass
00737
00738
if __name__ == '__main__':
    # Hand the test case to rostest, requesting coverage for rosmaster.paramserver.
    rostest.unitrun(
        'test_rosmaster',
        sys.argv[0],
        TestRospyParamServer,
        coverage_packages=['rosmaster.paramserver'],
    )