$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 import roslib; roslib.load_manifest('test_rosmaster') 00037 00038 import os 00039 import sys 00040 import struct 00041 import unittest 00042 import time 00043 import random 00044 import datetime 00045 00046 import rostest 00047 00048 from roslib.names import make_global_ns, ns_join 00049 00050 # mock of subscription tests 00051 class ThreadPoolMock(object): 00052 def queue_task(*args): pass 00053 00054 ## Unit tests for rosmaster.paramserver module 00055 class TestRospyParamServer(unittest.TestCase): 00056 00057 def test_compute_param_updates(self): 00058 from rosmaster.paramserver import compute_param_updates 00059 # spec requires that subscriptions always have a trailing slash 00060 tests = [ 00061 # [correct val], (subscribers, param_key, param_value) 00062 ([],({}, '/foo', 1)), 00063 ([],({'/bar': 'barapi'}, '/foo/', 1)), 00064 ([],({'/bar/': 'barapi'}, '/foo/', 1)), 00065 00066 # make sure that it's robust to aliases 00067 ([('fooapi', '/foo/', 1)], ({'/foo/': 'fooapi'}, '/foo', 1)), 00068 ([('fooapi', '/foo/', 1)], ({'/foo/': 'fooapi'}, '/foo/', 1)), 00069 00070 # check namespace subscription 00071 ([('fooapi', '/foo/val/', 1)], ({'/foo/': 'fooapi'}, '/foo/val', 1)), 00072 00073 # check against dictionary param values 00074 ([],({'/bar/': 'barapi'}, '/foo/', {'bar': 2})), 00075 ([('fooapi', '/foo/val/', 1)], ({'/foo/val/': 'fooapi'}, '/foo', {'val' : 1})), 00076 00077 ([('fooapi', '/foo/bar/val/', 1)], ({'/foo/bar/val/': 'fooapi'}, '/foo', {'bar' : {'val' : 1}})), 00078 ([('fooapi', '/foo/bar/', {'val': 1})], ({'/foo/bar/': 'fooapi'}, '/foo', {'bar' : {'val' : 1}})), 00079 ([('fooapi', '/foo/', {'bar':{'val': 1}})], ({'/foo/': 'fooapi'}, '/foo', {'bar' : {'val' : 1}})), 00080 00081 ([('fooapi', '/foo/', {'bar': 1, 'baz': 2}), ('foobazapi', '/foo/baz/', 2)], 00082 ({'/foo/': 'fooapi', '/foo/baz/': 'foobazapi'}, '/foo', {'bar' : 1, 'baz': 2})), 00083 00084 ([('foobarapi', '/foo/bar/', 1), ('foobazapi', '/foo/baz/', 2)], 00085 ({'/foo/bar/': 'foobarapi', '/foo/baz/': 'foobazapi'}, '/foo', {'bar' : 1, 'baz': 2})), 00086 00087 # deletion of higher level tree 00088 ([('delapi', '/del/bar/', {})], 00089 ({'/del/bar/': 'delapi'}, '/del', {})), 00090 00091 ] 00092 for correct, args in tests: 00093 val = compute_param_updates(*args) 00094 self.assertEquals(len(correct), len(val), "Failed: \n%s \nreturned \n%s\nvs correct\n%s"%(str(args), str(val), str(correct))) 00095 for c in correct: 00096 self.assert_(c in val, "Failed: \n%s \ndid not include \n%s. \nIt returned \n%s"%(str(args), c, val)) 00097 00098 00099 def notify_task(self, updates): 00100 self.last_update = updates 00101 00102 def test_subscribe_param_simple(self): 00103 from rosmaster.registrations import RegistrationManager 00104 from rosmaster.paramserver import ParamDictionary 00105 00106 # setup node and subscriber data 00107 reg_manager = RegistrationManager(ThreadPoolMock()) 00108 param_server = ParamDictionary(reg_manager) 00109 00110 # subscribe to parameter that has not been set yet 00111 self.last_update = None 00112 self.assertEquals({}, param_server.subscribe_param('/foo', ('node1', 'http://node1:1'))) 00113 param_server.set_param('/foo', 1, notify_task=self.notify_task) 00114 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 1), ], self.last_update) 00115 00116 # resubscribe 00117 self.assertEquals(1, param_server.subscribe_param('/foo', ('node1', 'http://node1:1'))) 00118 param_server.set_param('/foo', 2, notify_task=self.notify_task) 00119 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 2), ], self.last_update) 00120 00121 # resubscribe (test canonicalization of parameter name) 00122 self.assertEquals(2, param_server.subscribe_param('/foo/', ('node1', 'http://node1:1'))) 00123 param_server.set_param('/foo', 'resub2', notify_task=self.notify_task) 00124 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 'resub2'), ], self.last_update) 00125 00126 # change the URI 00127 self.assertEquals('resub2', param_server.subscribe_param('/foo', ('node1', 'http://node1b:1'))) 00128 self.assertEquals('http://node1b:1', reg_manager.get_node('node1').api) 00129 param_server.set_param('/foo', 3, notify_task=self.notify_task) 00130 self.assertEquals([([('node1', 'http://node1b:1')], '/foo/', 3), ], self.last_update) 00131 00132 # multiple subscriptions to same param 00133 self.assertEquals(3, param_server.subscribe_param('/foo', ('node2', 'http://node2:2'))) 00134 self.assertEquals('http://node2:2', reg_manager.get_node('node2').api) 00135 param_server.set_param('/foo', 4, notify_task=self.notify_task) 00136 self.assertEquals([([('node1', 'http://node1b:1'), ('node2', 'http://node2:2')], '/foo/', 4), ], self.last_update) 00137 00138 def test_subscribe_param_tree(self): 00139 from rosmaster.registrations import RegistrationManager 00140 from rosmaster.paramserver import ParamDictionary 00141 00142 # setup node and subscriber data 00143 reg_manager = RegistrationManager(ThreadPoolMock()) 00144 param_server = ParamDictionary(reg_manager) 00145 00146 # Test Parameter Tree Subscriptions 00147 00148 # simple case - subscribe and set whole tree 00149 gains = {'p': 'P', 'i': 'I', 'd' : 'D'} 00150 self.assertEquals({}, param_server.subscribe_param('/gains', ('ptnode', 'http://ptnode:1'))) 00151 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task) 00152 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), ], self.last_update) 00153 # - test with trailing slash 00154 param_server.set_param('/gains/', gains.copy(), notify_task=self.notify_task) 00155 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), ], self.last_update) 00156 00157 # change params within tree 00158 param_server.set_param('/gains/p', 'P2', notify_task=self.notify_task) 00159 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/p/', 'P2'), ], self.last_update) 00160 param_server.set_param('/gains/i', 'I2', notify_task=self.notify_task) 00161 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/i/', 'I2'), ], self.last_update) 00162 00163 # test overlapping subscriptions 00164 self.assertEquals('P2', param_server.subscribe_param('/gains/p', ('ptnode2', 'http://ptnode2:2'))) 00165 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task) 00166 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), \ 00167 ([('ptnode2', 'http://ptnode2:2')], '/gains/p/', 'P'), \ 00168 ], self.last_update) 00169 # - retest with trailing slash on subscribe 00170 self.last_update = None 00171 self.assertEquals('P', param_server.subscribe_param('/gains/p/', ('ptnode2', 'http://ptnode2:2'))) 00172 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task) 00173 self.assertEquals([([('ptnode', 'http://ptnode:1')], '/gains/', gains), \ 00174 ([('ptnode2', 'http://ptnode2:2')], '/gains/p/', 'P'), \ 00175 ], self.last_update) 00176 # test with overlapping (change to sub param) 00177 param_server.set_param('/gains/p', 'P3', notify_task=self.notify_task) 00178 # - this is a bit overtuned as a more optimal ps could use one update 00179 self.assertEquals([([('ptnode2', 'http://ptnode2:2')], '/gains/p/', 'P3'), \ 00180 ([('ptnode', 'http://ptnode:1')], '/gains/p/', 'P3'), \ 00181 ], self.last_update) 00182 00183 # virtual deletion: subscribe to subparam, parameter tree reset 00184 self.last_update = None 00185 param_server.set_param('/gains2', gains.copy(), notify_task=self.notify_task) 00186 self.assertEquals('P', param_server.subscribe_param('/gains2/p/', ('ptnode3', 'http://ptnode3:3'))) 00187 # - erase the sub parameters 00188 param_server.set_param('/gains2', {}, notify_task=self.notify_task) 00189 self.assertEquals([([('ptnode3', 'http://ptnode3:3')], '/gains2/p/', {}), ], self.last_update) 00190 00191 #Final test: test subscription to entire tree 00192 self.last_update = None 00193 param_server.delete_param('/gains') 00194 param_server.delete_param('/gains2') 00195 self.assertEquals({}, param_server.get_param('/')) 00196 self.assertEquals({}, param_server.subscribe_param('/', ('allnode', 'http://allnode:1'))) 00197 param_server.set_param('/one', 1, notify_task=self.notify_task) 00198 self.assertEquals([([('allnode', 'http://allnode:1')], '/one/', 1), ], self.last_update) 00199 param_server.set_param('/two', 2, notify_task=self.notify_task) 00200 self.assertEquals([([('allnode', 'http://allnode:1')], '/two/', 2), ], self.last_update) 00201 param_server.set_param('/foo/bar', 'bar', notify_task=self.notify_task) 00202 self.assertEquals([([('allnode', 'http://allnode:1')], '/foo/bar/', 'bar'), ], self.last_update) 00203 00204 00205 # verify that subscribe_param works with parameter deletion 00206 def test_subscribe_param_deletion(self): 00207 from rosmaster.registrations import RegistrationManager 00208 from rosmaster.paramserver import ParamDictionary 00209 00210 # setup node and subscriber data 00211 reg_manager = RegistrationManager(ThreadPoolMock()) 00212 param_server = ParamDictionary(reg_manager) 00213 00214 # subscription to then delete parameter 00215 self.assertEquals({}, param_server.subscribe_param('/foo', ('node1', 'http://node1:1'))) 00216 param_server.set_param('/foo', 1, notify_task=self.notify_task) 00217 param_server.delete_param('/foo', notify_task=self.notify_task) 00218 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', {}), ], self.last_update) 00219 00220 # subscribe to and delete whole tree 00221 gains = {'p': 'P', 'i': 'I', 'd' : 'D'} 00222 self.assertEquals({}, param_server.subscribe_param('/gains', ('deltree', 'http://deltree:1'))) 00223 param_server.set_param('/gains', gains.copy(), notify_task=self.notify_task) 00224 param_server.delete_param('/gains', notify_task=self.notify_task) 00225 self.assertEquals([([('deltree', 'http://deltree:1')], '/gains/', {}), ], self.last_update) 00226 00227 # subscribe to and delete params within subtree 00228 self.assertEquals({}, param_server.subscribe_param('/gains2', ('deltree2', 'http://deltree2:2'))) 00229 param_server.set_param('/gains2', gains.copy(), notify_task=self.notify_task) 00230 param_server.delete_param('/gains2/p', notify_task=self.notify_task) 00231 self.assertEquals([([('deltree2', 'http://deltree2:2')], '/gains2/p/', {}), ], self.last_update) 00232 param_server.delete_param('/gains2/i', notify_task=self.notify_task) 00233 self.assertEquals([([('deltree2', 'http://deltree2:2')], '/gains2/i/', {}), ], self.last_update) 00234 param_server.delete_param('/gains2', notify_task=self.notify_task) 00235 self.assertEquals([([('deltree2', 'http://deltree2:2')], '/gains2/', {}), ], self.last_update) 00236 00237 # delete parent tree 00238 k = '/ns1/ns2/ns3/key' 00239 self.assertEquals({}, param_server.subscribe_param(k, ('del_parent', 'http://del_parent:1'))) 00240 param_server.set_param(k, 1, notify_task=self.notify_task) 00241 param_server.delete_param('/ns1/ns2', notify_task=self.notify_task) 00242 self.assertEquals([([('del_parent', 'http://del_parent:1')], '/ns1/ns2/ns3/key/', {}), ], self.last_update) 00243 00244 def test_unsubscribe_param(self): 00245 from rosmaster.registrations import RegistrationManager 00246 from rosmaster.paramserver import ParamDictionary 00247 00248 # setup node and subscriber data 00249 reg_manager = RegistrationManager(ThreadPoolMock()) 00250 param_server = ParamDictionary(reg_manager) 00251 00252 # basic test 00253 self.last_update = None 00254 self.assertEquals({}, param_server.subscribe_param('/foo', ('node1', 'http://node1:1'))) 00255 param_server.set_param('/foo', 1, notify_task=self.notify_task) 00256 self.assertEquals([([('node1', 'http://node1:1')], '/foo/', 1), ], self.last_update) 00257 # - return value is actually generated by Registrations 00258 code, msg, val = param_server.unsubscribe_param('/foo', ('node1', 'http://node1:1')) 00259 self.assertEquals(1, code) 00260 self.assertEquals(1, val) 00261 self.last_update = None 00262 param_server.set_param('/foo', 2, notify_task=self.notify_task) 00263 self.assertEquals(None, self.last_update) 00264 # - repeat the unsubscribe 00265 code, msg, val = param_server.unsubscribe_param('/foo', ('node1', 'http://node1:1')) 00266 self.assertEquals(1, code) 00267 self.assertEquals(0, val) 00268 self.last_update = None 00269 param_server.set_param('/foo', 2, notify_task=self.notify_task) 00270 self.assertEquals(None, self.last_update) 00271 00272 # verify that stale unsubscribe has no effect on active subscription 00273 self.last_update = None 00274 self.assertEquals({}, param_server.subscribe_param('/bar', ('barnode', 'http://barnode:1'))) 00275 param_server.set_param('/bar', 3, notify_task=self.notify_task) 00276 self.assertEquals([([('barnode', 'http://barnode:1')], '/bar/', 3), ], self.last_update) 00277 code, msg, val = param_server.unsubscribe_param('/foo', ('barnode', 'http://notbarnode:1')) 00278 self.assertEquals(1, code) 00279 self.assertEquals(0, val) 00280 param_server.set_param('/bar', 4, notify_task=self.notify_task) 00281 self.assertEquals([([('barnode', 'http://barnode:1')], '/bar/', 4), ], self.last_update) 00282 00283 00284 def _set_param(self, ctx, my_state, test_vals, param_server): 00285 ctx = make_global_ns(ctx) 00286 for type, vals in test_vals: 00287 try: 00288 caller_id = ns_join(ctx, "node") 00289 count = 0 00290 for val in vals: 00291 key = ns_join(caller_id, "%s-%s"%(type,count)) 00292 param_server.set_param(key, val) 00293 self.assert_(param_server.has_param(key)) 00294 true_key = ns_join(ctx, key) 00295 my_state[true_key] = val 00296 count += 1 00297 except Exception, e: 00298 assert "getParam failed on type[%s], val[%s]"%(type,val) 00299 #self._check_param_state(my_state) 00300 00301 def _check_param_state(self, param_server, my_state): 00302 for (k, v) in my_state.iteritems(): 00303 assert param_server.has_param(k) 00304 #print "verifying parameter %s"%k 00305 try: 00306 v2 = param_server.get_param(k) 00307 except: 00308 raise Exception("Exception raised while calling param_server.get_param(%s): %s"%(k, traceback.format_exc())) 00309 00310 self.assertEquals(v, v2) 00311 param_names = my_state.keys() 00312 ps_param_names = param_server.get_param_names() 00313 assert not set(param_names) ^ set(ps_param_names), "parameter server keys do not match local: %s"%(set(param_names)^set(ps_param_names)) 00314 00315 00316 # test_has_param: test has_param API 00317 def test_has_param(self): 00318 from rosmaster.paramserver import ParamDictionary 00319 param_server = ParamDictionary(None) 00320 00321 self.failIf(param_server.has_param('/new_param')) 00322 param_server.set_param('/new_param', 1) 00323 self.assert_(param_server.has_param('/new_param')) 00324 00325 # test with param in sub-namespace 00326 self.failIf(param_server.has_param('/sub/sub2/new_param2')) 00327 # - verify that parameter tree does not exist yet (#587) 00328 for k in ['/sub/sub2/', '/sub/sub2', '/sub/', '/sub']: 00329 self.failIf(param_server.has_param(k)) 00330 param_server.set_param('/sub/sub2/new_param2', 1) 00331 self.assert_(param_server.has_param('/sub/sub2/new_param2')) 00332 # - verify that parameter tree now exists (#587) 00333 for k in ['/sub/sub2/', '/sub/sub2', '/sub/', '/sub']: 00334 self.assert_(param_server.has_param(k)) 00335 00336 00337 ## test ^param naming, i.e. upwards-looking get access 00338 ## @param self 00339 def test_search_param(self): 00340 from rosmaster.paramserver import ParamDictionary 00341 param_server = ParamDictionary(None) 00342 00343 caller_id = '/node' 00344 # vals are mostly identical, save some randomness. we want 00345 # identical structure in order to stress lookup rules 00346 val1 = { 'level1_p1': random.randint(0, 10000), 00347 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }} 00348 val2 = { 'level1_p1': random.randint(0, 10000), 00349 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }} 00350 val3 = { 'level1_p1': random.randint(0, 10000), 00351 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }} 00352 val4 = { 'level1_p1': random.randint(0, 10000), 00353 'level1_p2' : { 'level2_p2': random.randint(0, 10000) }} 00354 full_dict = {} 00355 00356 # test invalid input 00357 for k in ['', None, '~param']: 00358 try: 00359 param_server.search_param('/level1/level2', k) 00360 self.fail("param_server search should have failed on [%s]"%k) 00361 except ValueError: pass 00362 for ns in ['', None, 'relative', '~param']: 00363 try: 00364 param_server.search_param(ns, 'param') 00365 self.fail("param_server search should have failed on %s"%k) 00366 except ValueError: pass 00367 00368 # set the val parameter at four levels so we can validate search 00369 00370 # - set val1 00371 self.failIf(param_server.has_param('/level1/param')) 00372 self.failIf(param_server.search_param('/level1/node', 'param')) 00373 param_server.set_param('/level1/param', val1) 00374 00375 # - test param on val1 00376 for ns in ['/level1/node', '/level1/level2/node', '/level1/level2/level3/node']: 00377 self.assertEquals('/level1/param', param_server.search_param(ns, 'param'), "failed with ns[%s]"%ns) 00378 self.assertEquals('/level1/param/', param_server.search_param(ns, 'param/')) 00379 self.assertEquals('/level1/param/level1_p1', param_server.search_param(ns, 'param/level1_p1')) 00380 self.assertEquals('/level1/param/level1_p2/level2_p2', param_server.search_param(ns, 'param/level1_p2/level2_p2')) 00381 self.assertEquals(None, param_server.search_param('/root', 'param')) 00382 self.assertEquals(None, param_server.search_param('/root', 'param/')) 00383 00384 # - set val2 00385 self.failIf(param_server.has_param('/level1/level2/param')) 00386 param_server.set_param('/level1/level2/param', val2) 00387 00388 # - test param on val2 00389 for ns in ['/level1/level2/node', '/level1/level2/level3/node', '/level1/level2/level3/level4/node']: 00390 self.assertEquals('/level1/level2/param', param_server.search_param(ns, 'param')) 00391 self.assertEquals('/level1/level2/param/', param_server.search_param(ns, 'param/')) 00392 self.assertEquals('/level1/param', param_server.search_param('/level1/node', 'param')) 00393 self.assertEquals('/level1/param/', param_server.search_param('/level1/node', 'param/')) 00394 self.assertEquals(None, param_server.search_param('/root', 'param')) 00395 00396 # - set val3 00397 self.failIf(param_server.has_param('/level1/level2/level3/param')) 00398 param_server.set_param('/level1/level2/level3/param', val3) 00399 00400 # - test param on val3 00401 for ns in ['/level1/level2/level3/node', '/level1/level2/level3/level4/node']: 00402 self.assertEquals('/level1/level2/level3/param', param_server.search_param(ns, 'param')) 00403 self.assertEquals('/level1/level2/param', param_server.search_param('/level1/level2/node', 'param')) 00404 self.assertEquals('/level1/param', param_server.search_param('/level1/node', 'param')) 00405 00406 # test subparams before we set val4 on the root 00407 # - test looking for param/sub_param 00408 00409 self.assertEquals(None, param_server.search_param('/root', 'param')) 00410 self.assertEquals(None, param_server.search_param('/root', 'param/level1_p1')) 00411 self.assertEquals(None, param_server.search_param('/not/level1/level2/level3/level4/node', 'param/level1_p1')) 00412 tests = [ 00413 ('/level1/node', '/level1/param/'), 00414 ('/level1/level2/', '/level1/level2/param/'), 00415 ('/level1/level2', '/level1/level2/param/'), 00416 ('/level1/level2/node', '/level1/level2/param/'), 00417 ('/level1/level2/notlevel3', '/level1/level2/param/'), 00418 ('/level1/level2/notlevel3/node', '/level1/level2/param/'), 00419 ('/level1/level2/level3/level4', '/level1/level2/level3/param/'), 00420 ('/level1/level2/level3/level4/', '/level1/level2/level3/param/'), 00421 ('/level1/level2/level3/level4/node', '/level1/level2/level3/param/'), 00422 00423 ] 00424 for ns, pbase in tests: 00425 self.assertEquals(pbase+'level1_p1', 00426 param_server.search_param(ns, 'param/level1_p1')) 00427 retval = param_server.search_param(ns, 'param/level1_p2/level2_p2') 00428 self.assertEquals(pbase+'level1_p2/level2_p2', retval, 00429 "failed with ns[%s] pbase[%s]: %s"%(ns, pbase, retval)) 00430 00431 # - set val4 on the root 00432 self.failIf(param_server.has_param('/param')) 00433 param_server.set_param('/param', val4) 00434 self.assertEquals('/param', param_server.search_param('/root', 'param')) 00435 self.assertEquals('/param', param_server.search_param('/notlevel1/node', 'param')) 00436 self.assertEquals('/level1/param', param_server.search_param('/level1/node', 'param')) 00437 self.assertEquals('/level1/param', param_server.search_param('/level1', 'param')) 00438 self.assertEquals('/level1/param', param_server.search_param('/level1/', 'param')) 00439 00440 # make sure that partial match works 00441 val5 = { 'level1_p1': random.randint(0, 10000), 00442 'level1_p2' : { }} 00443 00444 self.failIf(param_server.has_param('/partial1/param')) 00445 param_server.set_param('/partial1/param', val5) 00446 self.assertEquals('/partial1/param', param_server.search_param('/partial1', 'param')) 00447 self.assertEquals('/partial1/param/level1_p1', 00448 param_server.search_param('/partial1', 'param/level1_p1')) 00449 # - this is the important check, should return key even if it doesn't exist yet based on stem match 00450 self.assertEquals('/partial1/param/non_existent', 00451 param_server.search_param('/partial1', 'param/non_existent')) 00452 self.assertEquals('/partial1/param/level1_p2/non_existent', 00453 param_server.search_param('/partial1', 'param/level1_p2/non_existent')) 00454 00455 00456 # test_get_param: test basic getParam behavior. Value encoding verified separately by testParamValues 00457 def test_get_param(self): 00458 from rosmaster.paramserver import ParamDictionary 00459 param_server = ParamDictionary(None) 00460 00461 val = random.randint(0, 10000) 00462 00463 full_dict = {} 00464 00465 # very similar to has param sequence 00466 self.failIf(param_server.has_param('/new_param')) 00467 self.failIf(param_server.has_param('/new_param/')) 00468 self.assertGetParamFail(param_server, '/new_param') 00469 param_server.set_param('/new_param', val) 00470 full_dict['new_param'] = val 00471 self.assertEquals(val, param_server.get_param('/new_param')) 00472 self.assertEquals(val, param_server.get_param('/new_param/')) 00473 # - test homonym 00474 self.assertEquals(val, param_server.get_param('/new_param//')) 00475 00476 # test full get 00477 self.assertEquals(full_dict, param_server.get_param('/')) 00478 00479 # test with param in sub-namespace 00480 val = random.randint(0, 10000) 00481 self.failIf(param_server.has_param('/sub/sub2/new_param2')) 00482 self.assertGetParamFail(param_server, '/sub/sub2/new_param2') 00483 param_server.set_param('/sub/sub2/new_param2', val) 00484 full_dict['sub'] = {'sub2': { 'new_param2': val }} 00485 self.assertEquals(val, param_server.get_param('/sub/sub2/new_param2')) 00486 # - test homonym 00487 self.assertEquals(val, param_server.get_param('/sub///sub2/new_param2/')) 00488 00489 # test full get 00490 self.assertEquals(full_dict, param_server.get_param('/')) 00491 00492 # test that parameter server namespace-get (#587) 00493 val1 = random.randint(0, 10000) 00494 val2 = random.randint(0, 10000) 00495 val3 = random.randint(0, 10000) 00496 00497 for k in ['/gains/P', '/gains/I', '/gains/D', '/gains']: 00498 self.assertGetParamFail(param_server, k) 00499 self.failIf(param_server.has_param(k)) 00500 00501 param_server.set_param('/gains/P', val1) 00502 param_server.set_param('/gains/I', val2) 00503 param_server.set_param('/gains/D', val3) 00504 00505 pid = {'P': val1, 'I': val2, 'D': val3} 00506 full_dict['gains'] = pid 00507 self.assertEquals(pid, 00508 param_server.get_param('/gains')) 00509 self.assertEquals(pid, 00510 param_server.get_param('/gains/')) 00511 self.assertEquals(full_dict, 00512 param_server.get_param('/')) 00513 00514 self.failIf(param_server.has_param('/ns/gains/P')) 00515 self.failIf(param_server.has_param('/ns/gains/I')) 00516 self.failIf(param_server.has_param('/ns/gains/D')) 00517 self.failIf(param_server.has_param('/ns/gains')) 00518 00519 param_server.set_param('/ns/gains/P', val1) 00520 param_server.set_param('/ns/gains/I', val2) 00521 param_server.set_param('/ns/gains/D', val3) 00522 full_dict['ns'] = {'gains': pid} 00523 00524 self.assertEquals(pid, 00525 param_server.get_param('/ns/gains')) 00526 self.assertEquals({'gains': pid}, 00527 param_server.get_param('/ns/')) 00528 self.assertEquals({'gains': pid}, 00529 param_server.get_param('/ns')) 00530 self.assertEquals(full_dict, 00531 param_server.get_param('/')) 00532 00533 00534 def test_delete_param(self): 00535 from rosmaster.paramserver import ParamDictionary 00536 param_server = ParamDictionary(None) 00537 try: 00538 param_server.delete_param('/fake') 00539 self.fail("delete_param of non-existent should have failed") 00540 except: pass 00541 try: 00542 param_server.delete_param('/') 00543 self.fail("delete_param of root should have failed") 00544 except: pass 00545 00546 param_server.set_param('/foo', 'foo') 00547 param_server.set_param('/bar', 'bar') 00548 self.assert_(param_server.has_param('/foo')) 00549 self.assert_(param_server.has_param('/bar')) 00550 param_server.delete_param('/foo') 00551 self.failIf(param_server.has_param('/foo')) 00552 # - test with trailing slash 00553 param_server.delete_param('/bar/') 00554 self.failIf(param_server.has_param('/bar')) 00555 00556 # test with namespaces 00557 param_server.set_param("/sub/key/x", 1) 00558 param_server.set_param("/sub/key/y", 2) 00559 try: 00560 param_server.delete_param('/sub/key/z') 00561 self.fail("delete_param of non-existent should have failed") 00562 except: pass 00563 try: 00564 param_server.delete_param('/sub/sub2/z') 00565 self.fail("delete_param of non-existent should have failed") 00566 except: pass 00567 00568 self.assert_(param_server.has_param('/sub/key/x')) 00569 self.assert_(param_server.has_param('/sub/key/y')) 00570 self.assert_(param_server.has_param('/sub/key')) 00571 param_server.delete_param('/sub/key') 00572 self.failIf(param_server.has_param('/sub/key')) 00573 self.failIf(param_server.has_param('/sub/key/x')) 00574 self.failIf(param_server.has_param('/sub/key/y')) 00575 00576 # test with namespaces (dictionary vals) 00577 param_server.set_param('/sub2', {'key': { 'x' : 1, 'y' : 2}}) 00578 self.assert_(param_server.has_param('/sub2/key/x')) 00579 self.assert_(param_server.has_param('/sub2/key/y')) 00580 self.assert_(param_server.has_param('/sub2/key')) 00581 param_server.delete_param('/sub2/key') 00582 self.failIf(param_server.has_param('/sub2/key')) 00583 self.failIf(param_server.has_param('/sub2/key/x')) 00584 self.failIf(param_server.has_param('/sub2/key/y')) 00585 00586 # test with namespaces: treat value as if its a namespace 00587 # - try to get the dictionary-of-dictionary code to fail 00588 # by descending a value key as if it is a namespace 00589 param_server.set_param('/a', 'b') 00590 self.assert_(param_server.has_param('/a')) 00591 try: 00592 param_server.delete_param('/a/b/c') 00593 self.fail_("should have raised key error") 00594 except: pass 00595 00596 00597 # test_set_param: test basic set_param behavior. Value encoding verified separately by testParamValues 00598 def test_set_param(self): 00599 from rosmaster.paramserver import ParamDictionary 00600 param_server = ParamDictionary(None) 00601 caller_id = '/node' 00602 val = random.randint(0, 10000) 00603 00604 # verify error behavior with root 00605 try: 00606 param_server.set_param('/', 1) 00607 self.fail("ParamDictionary allowed root to be set to non-dictionary") 00608 except: pass 00609 00610 # very similar to has param sequence 00611 self.failIf(param_server.has_param('/new_param')) 00612 param_server.set_param('/new_param', val) 00613 self.assertEquals(val, param_server.get_param('/new_param')) 00614 self.assertEquals(val, param_server.get_param('/new_param/')) 00615 self.assert_(param_server.has_param('/new_param')) 00616 00617 # test with param in sub-namespace 00618 val = random.randint(0, 10000) 00619 self.failIf(param_server.has_param('/sub/sub2/new_param2')) 00620 param_server.set_param('/sub/sub2/new_param2', val) 00621 self.assertEquals(val, param_server.get_param('/sub/sub2/new_param2')) 00622 00623 # test with param type mutation 00624 vals = ['a', {'a': 'b'}, 1, 1., 'foo', {'c': 'd'}, 4, {'a': {'b': 'c'}}, 3] 00625 for v in vals: 00626 param_server.set_param('/multi/multi_param', v) 00627 self.assertEquals(v, param_server.get_param('/multi/multi_param')) 00628 00629 # - set value within subtree that mutates higher level value 00630 param_server.set_param('/multi2/multi_param', 1) 00631 self.assertEquals(1, param_server.get_param('/multi2/multi_param')) 00632 00633 param_server.set_param('/multi2/multi_param/a', 2) 00634 self.assertEquals(2, param_server.get_param('/multi2/multi_param/a')) 00635 self.assertEquals({'a': 2}, param_server.get_param('/multi2/multi_param/')) 00636 param_server.set_param('/multi2/multi_param/a/b', 3) 00637 self.assertEquals(3, param_server.get_param('/multi2/multi_param/a/b')) 00638 self.assertEquals({'b': 3}, param_server.get_param('/multi2/multi_param/a/')) 00639 self.assertEquals({'a': {'b': 3}}, param_server.get_param('/multi2/multi_param/')) 00640 00641 00642 # test that parameter server namespace-set (#587) 00643 self.failIf(param_server.has_param('/gains/P')) 00644 self.failIf(param_server.has_param('/gains/I')) 00645 self.failIf(param_server.has_param('/gains/D')) 00646 self.failIf(param_server.has_param('/gains')) 00647 00648 pid = {'P': random.randint(0, 10000), 'I': random.randint(0, 10000), 'D': random.randint(0, 10000)} 00649 param_server.set_param('/gains', pid) 00650 self.assertEquals(pid, param_server.get_param('/gains')) 00651 self.assertEquals(pid['P'], param_server.get_param('/gains/P')) 00652 self.assertEquals(pid['I'], param_server.get_param('/gains/I')) 00653 self.assertEquals(pid['D'], param_server.get_param('/gains/D')) 00654 00655 subns = {'gains1': pid, 'gains2': pid} 00656 param_server.set_param('/ns', subns) 00657 self.assertEquals(pid['P'], param_server.get_param('/ns/gains1/P')) 00658 self.assertEquals(pid['I'], param_server.get_param('/ns/gains1/I')) 00659 self.assertEquals(pid['D'], param_server.get_param('/ns/gains1/D')) 00660 self.assertEquals(pid, param_server.get_param('/ns/gains1')) 00661 self.assertEquals(pid, param_server.get_param('/ns/gains2')) 00662 self.assertEquals(subns, param_server.get_param('/ns/')) 00663 00664 # test empty dictionary set 00665 param_server.set_param('/ns', {}) 00666 # - param should still exist 00667 self.assert_(param_server.has_param('/ns/')) 00668 # - value should remain dictionary 00669 self.assertEquals({}, param_server.get_param('/ns/')) 00670 # - value2 below /ns/ should be erased 00671 self.failIf(param_server.has_param('/ns/gains1')) 00672 self.failIf(param_server.has_param('/ns/gains1/P')) 00673 00674 # verify that root can be set and that it erases all values 00675 param_server.set_param('/', {}) 00676 self.failIf(param_server.has_param('/new_param')) 00677 param_server.set_param('/', {'foo': 1, 'bar': 2, 'baz': {'a': 'a'}}) 00678 self.assertEquals(1, param_server.get_param('/foo')) 00679 self.assertEquals(1, param_server.get_param('/foo/')) 00680 self.assertEquals(2, param_server.get_param('/bar')) 00681 self.assertEquals(2, param_server.get_param('/bar/')) 00682 self.assertEquals('a', param_server.get_param('/baz/a')) 00683 self.assertEquals('a', param_server.get_param('/baz/a/')) 00684 00685 # test_param_values: test storage of all XML-RPC compatible types""" 00686 def test_param_values(self): 00687 import math 00688 from rosmaster.paramserver import ParamDictionary 00689 param_server = ParamDictionary(None) 00690 test_vals = [ 00691 ['int', [0, 1024, 2147483647, -2147483647]], 00692 ['boolean', [True, False]], 00693 #no longer testing null char 00694 #['string', ['', '\0', 'x', 'hello', ''.join([chr(n) for n in xrange(0, 255)])]], 00695 ['unicode-string', [u'', u'hello', unicode('Andr\302\202', 'utf-8'), unicode('\377\376A\000n\000d\000r\000\202\000', 'utf-16')]], 00696 ['string-easy-ascii', [chr(n) for n in xrange(32, 128)]], 00697 00698 #['string-mean-ascii-low', [chr(n) for n in xrange(9, 10)]], #separate for easier book-keeping 00699 #['string-mean-ascii-low', [chr(n) for n in xrange(1, 31)]], #separate for easier book-keeping 00700 #['string-mean-signed', [chr(n) for n in xrange(129, 256)]], 00701 ['string', ['', 'x', 'hello-there', 'new\nline', 'tab\t']], 00702 ['double', [0.0, math.pi, -math.pi, 3.4028235e+38, -3.4028235e+38]], 00703 #TODO: microseconds? 00704 ['datetime', [datetime.datetime(2005, 12, 6, 12, 13, 14), datetime.datetime(1492, 12, 6, 12, 13, 14)]], 00705 ['array', [[], [1, 2, 3], ['a', 'b', 'c'], [0.0, 0.1, 0.2, 2.0, 2.1, -4.0], 00706 [1, 'a', True], [[1, 2, 3], ['a', 'b', 'c'], [1.0, 2.1, 3.2]]] 00707 ], 00708 ] 00709 00710 print "Putting parameters onto the server" 00711 # put our params into the parameter server 00712 contexts = ['', 'scope1', 'scope/subscope1', 'scope/sub1/sub2'] 00713 my_state = {} 00714 failures = [] 00715 for ctx in contexts: 00716 self._set_param(ctx, my_state, test_vals, param_server) 00717 self._check_param_state(param_server, my_state) 00718 00719 print "Deleting all of our parameters" 00720 # delete all of our parameters 00721 param_keys = my_state.keys() 00722 count = 0 00723 for key in param_keys: 00724 count += 1 00725 param_server.delete_param(key) 00726 del my_state[key] 00727 # far too intensive to check every time 00728 if count % 50 == 0: 00729 self._check_param_state(param_server, my_state) 00730 self._check_param_state(param_server, my_state) 00731 00732 def assertGetParamFail(self, param_server, param): 00733 try: 00734 param_server.get_param(param) 00735 self.fail("get_param[%s] did not raise KeyError"%(param)) 00736 except KeyError: pass 00737 00738 00739 if __name__ == '__main__': 00740 rostest.unitrun('test_rosmaster', sys.argv[0], TestRospyParamServer, coverage_packages=['rosmaster.paramserver'])