param_server_test_case.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 # Revision $Id: test_embed_msg.py 1986 2008-08-26 23:57:56Z sfkwc $
35 
36 ## Integration test for empty services to test serializers
37 ## and transport
38 
39 import sys
40 import unittest
41 try:
42  from xmlrpc.client import DateTime
43 except ImportError:
44  from xmlrpclib import DateTime
45 import math
46 import datetime
47 import random
48 import traceback
49 
50 import rostest
51 from roslib.names import make_global_ns, ns_join
52 
53 from rosclient import TestRosClient
54 
55 HAS_PARAM = True
56 
57 ## Parameter Server API Test Cases. These tests are individually
58 ## enabled by param server test nodes. Each test assumes a fresh param
59 ## server, so we cannot run within the same roslaunch/rostest session.
60 class ParamServerTestCase(TestRosClient):
61 
62  def _setParam(self, ctx, myState, testVals, master):
63  ctx = make_global_ns(ctx)
64  for type, vals in testVals:
65  try:
66  callerId = ns_join(ctx, "node")
67  count = 0
68  for val in vals:
69  key = "%s-%s"%(type,count)
70  #print("master.setParam(%s,%s)"%(callerId, key))
71  master.setParam(callerId, key, val)
72  self.assert_(self.apiSuccess(master.hasParam(callerId, key)))
73  trueKey = ns_join(ctx, key)
74  myState[trueKey] = val
75  count += 1
76  except Exception:
77  assert "getParam failed on type[%s], val[%s]"%(type,val)
78  #self._checkParamState(myState)
79 
80  def _checkParamState(self, myState):
81  master = self.master
82  callerId = 'master' #validate from root
83  for (k, v) in myState.items():
84  assert self.apiSuccess(master.hasParam(callerId, k))
85  #print("verifying parameter %s"%k)
86  try:
87  v2 = self.apiSuccess(master.getParam(callerId, k))
88  except:
89  raise Exception("Exception raised while calling master.getParam(%s,%s): %s"%(callerId, k, traceback.format_exc()))
90  if isinstance(v2, DateTime):
91  self.assertEquals(DateTime(v), v2, "[%s]: %s != %s, %s"%(k, v, v2, v2.__class__))
92  elif type(v2) == float:
93  self.assertAlmostEqual(v, v2, 3, "[%s]: %s != %s, %s"%(k, v, v2, v2.__class__))
94  else:
95  self.assertEquals(v, v2)
96  paramNames = myState.keys()
97  remoteParamNames = self.apiSuccess(master.getParamNames(callerId))
98  # filter out the roslaunch params like run id and roslaunch/, which are always set
99  remoteParamNames = [p for p in remoteParamNames if not p in ['/run_id', '/rosdistro', '/rosversion']]
100  remoteParamNames = [p for p in remoteParamNames if not p.startswith('/roslaunch/')]
101 
102  assert not set(paramNames) ^ set(remoteParamNames), "parameter server keys do not match local: %s"%(set(paramNames)^set(remoteParamNames))
103 
104 
105  # _testHasParam: test hasParam API
106  def _testHasParam(self):
107  master = self.master
108  caller_id = '/node'
109  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/new_param')))
110  self.apiSuccess(master.setParam(caller_id, '/new_param', 1))
111  self.assert_(self.apiSuccess(master.hasParam(caller_id, '/new_param')))
112  # test with relative-name resolution
113  self.assert_(self.apiSuccess(master.hasParam(caller_id, 'new_param')))
114 
115  # test with param in sub-namespace
116  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2/new_param2')))
117  # - verify that parameter tree does not exist yet (#587)
118  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2/')))
119  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2')))
120  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/sub/')))
121  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/sub')))
122  self.apiSuccess(master.setParam(caller_id, '/sub/sub2/new_param2', 1))
123  self.assert_(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2/new_param2')))
124  # - verify that parameter tree now exists (#587)
125  self.assert_(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2/')))
126  self.assert_(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2')))
127  self.assert_(self.apiSuccess(master.hasParam(caller_id, '/sub/')))
128  self.assert_(self.apiSuccess(master.hasParam(caller_id, '/sub')))
129  # test with relative-name resolution
130  self.assert_(self.apiSuccess(master.hasParam(caller_id, 'sub/sub2/new_param2')))
131  self.assert_(self.apiSuccess(master.hasParam('/sub/node', 'sub2/new_param2')))
132  self.assert_(self.apiSuccess(master.hasParam('/sub/sub2/node', 'new_param2')))
133  self.assert_(self.apiSuccess(master.hasParam('/sub/node', 'sub2')))
134  self.assert_(self.apiSuccess(master.hasParam('/node', 'sub')))
135 
136  # testSearchParam: test upwards-looking parameter search
137  def _testSearchParam(self):
138  master = self.master
139  caller_id = '/node'
140  # vals are mostly identical, save some randomness. we want
141  # identical structure in order to stress lookup rules
142  val1 = { 'level1_p1': random.randint(0, 10000),
143  'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
144  val2 = { 'level1_p1': random.randint(0, 10000),
145  'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
146  val3 = { 'level1_p1': random.randint(0, 10000),
147  'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
148  val4 = { 'level1_p1': random.randint(0, 10000),
149  'level1_p2' : { 'level2_p2': random.randint(0, 10000) }}
150  full_dict = {}
151 
152  # set the val parameter at three levels so we can validate search
153  caller_id = '/root'
154  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/param')))
155  self.apiSuccess(master.setParam(caller_id, '/param', val1))
156  # - test param
157  self.assertEquals('/param', self.apiSuccess(master.searchParam(caller_id, 'param')))
158 
159  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/level1/param')))
160  self.apiSuccess(master.setParam(caller_id, '/level1/param', val2))
161  self.assertEquals(val2, self.apiSuccess(master.getParam(caller_id, '/level1/param')))
162  # - test search param
163  self.assertEquals('/param',
164  self.apiSuccess(master.searchParam(caller_id, 'param')))
165  self.assertEquals('/level1/param',
166  self.apiSuccess(master.searchParam('/level1/node', 'param')))
167 
168  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/level1/level2/param')))
169  self.apiSuccess(master.setParam(caller_id, '/level1/level2/param', val3))
170  # - test search param
171  self.assertEquals('/param',
172  self.apiSuccess(master.searchParam(caller_id, 'param')))
173  self.assertEquals('/level1/param',
174  self.apiSuccess(master.searchParam('/level1/node', 'param')))
175  self.assertEquals('/level1/level2/param',
176  self.apiSuccess(master.searchParam('/level1/level2/node', 'param')))
177  self.assertEquals('/level1/level2/param',
178  self.apiSuccess(master.searchParam('/level1/level2/level3/level4/node', 'param')))
179 
180  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/level1/level2/level3/level4/param')))
181  self.apiSuccess(master.setParam(caller_id, '/level1/level2/level3/level4/param', val4))
182  # - test search param
183  self.assertEquals('/param',
184  self.apiSuccess(master.searchParam(caller_id, 'param')))
185  self.assertEquals('/level1/param',
186  self.apiSuccess(master.searchParam('/level1/node', 'param')))
187  self.assertEquals('/level1/level2/param',
188  self.apiSuccess(master.searchParam('/level1/level2/node', 'param')))
189  self.assertEquals('/level1/level2/param',
190  self.apiSuccess(master.searchParam('/level1/level2/level3/node', 'param')))
191  self.assertEquals('/level1/level2/level3/level4/param',
192  self.apiSuccess(master.searchParam('/level1/level2/level3/level4/node', 'param')))
193 
194  # test completely different hierarchy, should go to top
195  self.assertEquals('/param', self.apiSuccess(master.searchParam('/not/level1/level2/level3/level4/node', 'param')))
196  # test looking for param/sub_param
197  tests = [('/param', '/not/level1/level2/level3/level4/node'),
198  ('/level1/param', '/level1/node'),
199  ('/level1/param', '/level1/notlevel2/notlevel3/node'),
200  ('/level1/level2/param', '/level1/level2/node'),
201  ('/level1/level2/param', '/level1/level2/level3/node'),
202  ('/level1/level2/param', '/level1/level2/notlevel3/notlevel3/node'),
203  ('/level1/level2/level3/level4/param', '/level1/level2/level3/level4/node'),
204  ('/level1/level2/level3/level4/param', '/level1/level2/level3/level4/l5/l6/node'),
205  ]
206  for pbase, caller_id in tests:
207  self.assertEquals(pbase + '/level1_p1',
208  self.apiSuccess(master.searchParam(caller_id, 'param/level1_p1')))
209  key = pbase+'/level1_p2/level2_p2'
210  self.assertEquals(key,
211  self.apiSuccess(master.searchParam(caller_id, 'param/level1_p2/level2_p2')))
212  # delete the sub key and repeat, should get the same result as searchParam does partial matches
213  # - we may have already deleted the parameter in a previous iteration, so make sure
214  if self.apiSuccess(master.hasParam(caller_id, key)):
215  self.apiSuccess(master.deleteParam(caller_id, key))
216  self.assertEquals(key,
217  self.apiSuccess(master.searchParam(caller_id, 'param/level1_p2/level2_p2')))
218  # to make sure that it didn't work spuriously, search for non-existent key
219  self.assertEquals(pbase + '/non_existent',
220  self.apiSuccess(master.searchParam(caller_id, 'param/non_existent')))
221  self.assertEquals(pbase + '/level1_p2/non_existent',
222  self.apiSuccess(master.searchParam(caller_id, 'param/level1_p2/non_existent')))
223 
224 
225  ## remove common keys that roslaunch places on param server
226  def _filterDict(self, d):
227  for k in ['run_id', 'roslaunch', 'rosversion', 'rosdistro']:
228  if k in d:
229  del d[k]
230  return d
231 
232  # testGetParam: test basic getParam behavior. Value encoding verified separately by testParamValues
233  def _testGetParam(self):
234  master = self.master
235  caller_id = '/node'
236  val = random.randint(0, 10000)
237 
238  full_dict = {}
239 
240  # very similar to has param sequence
241  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/new_param')))
242  self.apiSuccess(master.setParam(caller_id, '/new_param', val))
243  full_dict['new_param'] = val
244  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, '/new_param')))
245  # test with relative-name resolution
246  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, 'new_param')))
247  # test full get
248  ps_full_dict = self.apiSuccess(master.getParam(caller_id, '/'))
249  self._filterDict(ps_full_dict)
250 
251  self.assertEquals(full_dict, ps_full_dict)
252 
253  # test with param in sub-namespace
254  val = random.randint(0, 10000)
255  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2/new_param2')))
256  self.apiSuccess(master.setParam(caller_id, '/sub/sub2/new_param2', val))
257  full_dict['sub'] = {'sub2': { 'new_param2': val }}
258  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, '/sub/sub2/new_param2')))
259  # test with relative-name resolution
260  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, 'sub/sub2/new_param2')))
261  self.assertEquals(val, self.apiSuccess(master.getParam('/sub/node', 'sub2/new_param2')))
262  self.assertEquals(val, self.apiSuccess(master.getParam('/sub/sub2/node', 'new_param2')))
263  # test that parameter server allows gets across namespaces (#493)
264  self.assertEquals(val, self.apiSuccess(master.getParam('/foo/bar/baz/blah/node', '/sub/sub2/new_param2')))
265  # test full get
266  ps_full_dict = self.apiSuccess(master.getParam(caller_id, '/'))
267  self._filterDict(ps_full_dict)
268  self.assertEquals(full_dict, ps_full_dict)
269 
270 
271  # test that parameter server namespace-get (#587)
272  val1 = random.randint(0, 10000)
273  val2 = random.randint(0, 10000)
274  val3 = random.randint(0, 10000)
275 
276  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains/P')))
277  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains/I')))
278  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains/D')))
279  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains')))
280 
281  self.apiSuccess(master.setParam(caller_id, '/gains/P', val1))
282  self.apiSuccess(master.setParam(caller_id, '/gains/I', val2))
283  self.apiSuccess(master.setParam(caller_id, '/gains/D', val3))
284 
285  pid = {'P': val1, 'I': val2, 'D': val3}
286  full_dict['gains'] = pid
287  self.assertEquals(pid,
288  self.apiSuccess(master.getParam(caller_id, '/gains')))
289  self.assertEquals(pid,
290  self.apiSuccess(master.getParam(caller_id, '/gains/')))
291  ps_full_dict = self.apiSuccess(master.getParam(caller_id, '/'))
292  self._filterDict(ps_full_dict)
293  self.assertEquals(full_dict, ps_full_dict)
294 
295  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/ns/gains/P')))
296  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/ns/gains/I')))
297  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/ns/gains/D')))
298  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/ns/gains')))
299 
300  self.apiSuccess(master.setParam(caller_id, '/ns/gains/P', val1))
301  self.apiSuccess(master.setParam(caller_id, '/ns/gains/I', val2))
302  self.apiSuccess(master.setParam(caller_id, '/ns/gains/D', val3))
303  full_dict['ns'] = {'gains': pid}
304 
305  self.assertEquals(pid,
306  self.apiSuccess(master.getParam(caller_id, '/ns/gains')))
307  self.assertEquals({'gains': pid},
308  self.apiSuccess(master.getParam(caller_id, '/ns/')))
309  self.assertEquals({'gains': pid},
310  self.apiSuccess(master.getParam(caller_id, '/ns')))
311  ps_full_dict = self.apiSuccess(master.getParam(caller_id, '/'))
312  self._filterDict(ps_full_dict)
313  self.assertEquals(full_dict, ps_full_dict)
314 
315 
316  # testSetParam: test basic setParam behavior. Value encoding verified separately by testParamValues
317  def _testSetParam(self):
318  master = self.master
319  caller_id = '/node'
320  val = random.randint(0, 10000)
321 
322  # very similar to has param sequence
323  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/new_param')))
324  self.apiSuccess(master.setParam(caller_id, '/new_param', val))
325  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, '/new_param')))
326  # test with relative-name resolution
327  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, 'new_param')))
328 
329  # test type mutation, including from dictionary to value and back
330  vals = ['a', {'a': 'b'}, 1, 1., 'foo', {'c': 'd'}, 4]
331  for v in vals:
332  self.apiSuccess(master.setParam(caller_id, '/multi/multi_param', v))
333  self.assertEquals(v, self.apiSuccess(master.getParam(caller_id, 'multi/multi_param')))
334 
335  # test with param in sub-namespace
336  val = random.randint(0, 10000)
337  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/sub/sub2/new_param2')))
338  self.apiSuccess(master.setParam(caller_id, '/sub/sub2/new_param2', val))
339  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, '/sub/sub2/new_param2')))
340  # test with relative-name resolution
341  self.assertEquals(val, self.apiSuccess(master.getParam(caller_id, 'sub/sub2/new_param2')))
342  self.assertEquals(val, self.apiSuccess(master.getParam('/sub/node', 'sub2/new_param2')))
343  self.assertEquals(val, self.apiSuccess(master.getParam('/sub/sub2/node', 'new_param2')))
344 
345  # test that parameter server namespace-set (#587)
346  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains/P')))
347  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains/I')))
348  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains/D')))
349  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/gains')))
350 
351  pid = {'P': random.randint(0, 10000), 'I': random.randint(0, 10000), 'D': random.randint(0, 10000)}
352  self.apiSuccess(master.setParam(caller_id, '/gains', pid))
353  self.assertEquals(pid, self.apiSuccess(master.getParam(caller_id, '/gains')))
354  self.assertEquals(pid['P'], self.apiSuccess(master.getParam(caller_id, '/gains/P')))
355  self.assertEquals(pid['I'], self.apiSuccess(master.getParam(caller_id, '/gains/I')))
356  self.assertEquals(pid['D'], self.apiSuccess(master.getParam(caller_id, '/gains/D')))
357 
358  subns = {'gains1': pid, 'gains2': pid}
359  self.apiSuccess(master.setParam(caller_id, '/ns', subns))
360  self.assertEquals(pid['P'], self.apiSuccess(master.getParam(caller_id, '/ns/gains1/P')))
361  self.assertEquals(pid['I'], self.apiSuccess(master.getParam(caller_id, '/ns/gains1/I')))
362  self.assertEquals(pid['D'], self.apiSuccess(master.getParam(caller_id, '/ns/gains1/D')))
363  self.assertEquals(pid, self.apiSuccess(master.getParam(caller_id, '/ns/gains1')))
364  self.assertEquals(pid, self.apiSuccess(master.getParam(caller_id, '/ns/gains2')))
365  self.assertEquals(subns, self.apiSuccess(master.getParam(caller_id, '/ns/')))
366 
367  # test empty dictionary set
368  self.apiSuccess(master.setParam(caller_id, '/ns', {}))
369  # - param should still exist
370  self.assert_(self.apiSuccess(master.hasParam(caller_id, '/ns/')))
371  # - value should remain dictionary
372  self.assertEquals({}, self.apiSuccess(master.getParam(caller_id, '/ns/')))
373  # - value2 below /ns/ should be erased
374  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/ns/gains1')))
375  self.failIf(self.apiSuccess(master.hasParam(caller_id, '/ns/gains1/P')))
376 
377 
378  # testParamValues: test storage of all XML-RPC compatible types"""
379  def _testParamValues(self):
380  try:
381  from xmlrpc.client import Binary
382  except ImportError:
383  from xmlrpclib import Binary
384  str_of_bytes = ''.join([chr(n) for n in range(0, 255)])
385  if not isinstance(str_of_bytes, bytes):
386  str_of_bytes = bytes(range(255))
387  testVals = [
388  ['int', [0, 1024, 2147483647, -2147483647]],
389  ['boolean', [True, False]],
390  #no longer testing null char
391  #['string', ['', '\0', 'x', 'hello', ''.join([chr(n) for n in range(0, 255)])]],
392  ['unicode-string', [u'', u'hello', b'Andr\302\202'.decode('utf-8'), b'\377\376A\000n\000d\000r\000\202\000'.decode('utf-16')]],
393  ['string-easy-ascii', [chr(n) for n in range(32, 128)]],
394 
395  #['string-mean-ascii-low', [chr(n) for n in range(9, 10)]], #separate for easier book-keeping
396  #['string-mean-ascii-low', [chr(n) for n in range(1, 31)]], #separate for easier book-keeping
397  #['string-mean-signed', [chr(n) for n in range(129, 256)]],
398  ['string', ['', 'x', 'hello-there', 'new\nline', 'tab\t']],
399  ['double', [0.0, math.pi, -math.pi, 3.4028235e+38, -3.4028235e+38]],
400  #TODO: microseconds?
401  ['datetime', [datetime.datetime(2005, 12, 6, 12, 13, 14), datetime.datetime(1492, 12, 6, 12, 13, 14)]],
402  ['base64', [Binary(b''), Binary(b'\0'), Binary(str_of_bytes)]],
403  ['array', [[], [1, 2, 3], ['a', 'b', 'c'], [0.0, 0.1, 0.2, 2.0, 2.1, -4.0],
404  [1, 'a', True], [[1, 2, 3], ['a', 'b', 'c'], [1.0, 2.1, 3.2]]]
405  ],
406  ]
407  master = self.master
408 
409  print("Putting parameters onto the server")
410  # put our params into the parameter server
411  contexts = ['', 'scope1', 'scope/sub1/sub2']
412  myState = {}
413  failures = []
414  for ctx in contexts:
415  self._setParam(ctx, myState, testVals, master)
416  self._checkParamState(myState)
417 
418  print("Deleting all of our parameters")
419  # delete all of our parameters
420  ctx = ''
421  count = 0
422  for key in list(myState.keys()):
423  count += 1
424  print("deleting [%s], [%s]" % (ctx, key))
425  self.apiSuccess(master.deleteParam(ctx, key))
426  del myState[key]
427  # far too intensive to check every time
428  if count % 50 == 0:
429  self._checkParamState(myState)
430  self._checkParamState(myState)
431 
433  """testEncapsulation: test encapsulation: setting same parameter at different levels"""
434  master = self.master
435  myState = {}
436  self._checkParamState(myState)
437 
438  testContexts = ['', 'en', 'en/sub1', 'en/sub2', 'en/sub1/sub2']
439  for c in testContexts:
440  c = make_global_ns(c)
441  testKey = 'param1'
442  testVal = random.randint(-1000000, 100000)
443  callerId = ns_join(c, "node")
444  trueKey = ns_join(c, testKey)
445  master.setParam(callerId, testKey, testVal)
446  myState[trueKey] = testVal
447  # make sure the master has the parameter under both keys and that they are equal
448  v1 = self.apiSuccess(master.getParam('/', trueKey))
449  v2 = self.apiSuccess(master.getParam(callerId, testKey))
450  assert v1 == v2, "[%s]: %s vs. [%s,%s]: %s"%(trueKey, v1, callerId, testKey, v2)
451  assert self.apiSuccess(master.hasParam(callerId, testKey)), testKey
452  assert self.apiSuccess(master.hasParam('node', trueKey)), trueKey
453 
454  self._checkParamState(myState)
455 
456  def _testPrivateNames(self):
457  master = self.master
458  myState = {}
459  self._checkParamState(myState)
460 
461  testContexts = ['', 'sub1', 'sub1/sub2', 'sub1/sub2/sub3']
462  for c in testContexts:
463  c = make_global_ns(c)
464  callerId = ns_join(c, "node")
465 
466  keyStub = "param1"
467  testKey = "~%s"%keyStub
468  testVal = random.randint(-1000000, 100000)
469  master.setParam(callerId, testKey, testVal)
470  trueKey = ns_join(callerId, keyStub)
471  myState[trueKey] = testVal
472 
473  print("checking", trueKey)
474  v1 = self.apiSuccess(master.getParam('/', trueKey))
475  v2 = self.apiSuccess(master.getParam(callerId, testKey))
476  assert v1 == v2, "[%s]: %s vs. [%s,%s]: %s"%(trueKey, v1, callerId, testKey, v2)
477  assert self.apiSuccess(master.hasParam(callerId, testKey)), testKey
478  assert self.apiSuccess(master.hasParam('node', trueKey)), trueKey
479 
480  #test setting a local param on a different node
481  testKey = ns_join("altnode","param2")
482  testVal = random.randint(-1000000, 100000)
483  master.setParam(callerId, testKey, testVal)
484  trueKey = ns_join(c, testKey)
485  altCallerId = ns_join(c, "altnode")
486  myState[trueKey] = testVal
487 
488  v1 = self.apiSuccess(master.getParam(altCallerId, "~param2"))
489  v2 = self.apiSuccess(master.getParam(callerId, testKey))
490  assert v1 == v2
491  assert self.apiSuccess(master.hasParam(callerId, testKey)), testKey
492  assert self.apiSuccess(master.hasParam(altCallerId, "~param2"))
493 
494  self._checkParamState(myState)
495 
496  ## testScopeUp: test that parameter server can chain up scopes to find/delete parameters
497  def _testScopeUp(self):
498  master = self.master
499  myState = {}
500  self._checkParamState(myState)
501 
502  testVal = random.randint(-1000000, 100000)
503  master.setParam('/', 'uparam1', testVal)
504  myState['/uparam1'] = testVal
505  assert testVal == self.apiSuccess(master.getParam('/node', 'uparam1'))
506  assert testVal == self.apiSuccess(master.getParam('/uptest/node', 'uparam1'))
507  assert testVal == self.apiSuccess(master.getParam('/uptest/sub1/node', 'uparam1'))
508  assert testVal == self.apiSuccess(master.getParam('/uptest/sub1/sub2/node', 'uparam1'))
509 
510  testVal = random.randint(-1000000, 100000)
511  master.setParam('/uptest2/sub1/node', 'uparam2', testVal)
512  myState['/uptest2/sub1/uparam2'] = testVal
513  assert testVal == self.apiSuccess(master.getParam('/uptest2/sub1/node', 'uparam2'))
514  assert testVal == self.apiSuccess(master.getParam('/uptest2/sub1/sub2/node', 'uparam2'))
515  assert testVal == self.apiSuccess(master.getParam('/uptest2/sub1/sub2/sub3/node', 'uparam2'))
516 
517  # make sure we can ascend then descend
518  if 0:
519  testVal = random.randint(-1000000, 100000)
520  master.setParam('/uptest3/node', 'alt2/alt3/uparam3', testVal)
521  myState['/uptest3/alt2/alt3/uparam3'] = testVal
522  assert testVal == self.apiSuccess(master.getParam('/uptest3/sub1/node', 'alt2/alt3/uparam3'))
523  assert testVal == self.apiSuccess(master.getParam('/uptest3/sub1/sub2/node', 'alt2/alt3/uparam3'))
524  assert testVal == self.apiSuccess(master.getParam('/uptest3/sub1/sub2/sub3/node', 'alt2/alt3/uparam3'))
525  self._checkParamState(myState)
526 
527  #verify upwards deletion
528  self.apiSuccess(master.deleteParam('/uptest/sub1/sub2/node', 'uparam1'))
529  del myState['/uparam1']
530  self._checkParamState(myState)
531  self.apiSuccess(master.deleteParam('/uptest2/sub1/sub2/sub3/node', 'uparam2'))
532  del myState['/uptest2/sub1/uparam2']
533  if 0:
534  self.apiSuccess(master.deleteParam('/uptest3/sub1/sub2/sub3/node', 'alt2/alt3/uparam3'))
535  del myState['/uptest3/alt2/alt3/uparam3']
536  self._checkParamState(myState)
537 
538  ## testScopeDown: test scoping rules for sub contexts
539  def _testScopeDown(self):
540  master = self.master
541  myState = {}
542  self._checkParamState(myState)
543 
544  # test that parameter server down not chain down scopes
545  testVal = random.randint(-1000000, 100000)
546  master.setParam('/down/one/two/three/node', 'dparam1', testVal)
547  myState['/down/one/two/three/dparam1'] = testVal
548  assert not self.apiSuccess(master.hasParam('/down/one/node', 'dparam1'))
549  assert not self.apiSuccess(master.hasParam('/down/one/two/node', 'dparam1'))
550  self.apiError(master.getParam('/down/one/node', 'dparam1'))
551  self.apiError(master.getParam('/down/one/two/node', 'dparam1'))
552 
553  # test that parameter server allows setting of parameters further down (1)
554  testVal = random.randint(-1000000, 100000)
555  master.setParam('/', '/down2/dparam2', testVal)
556  myState['/down2/dparam2'] = testVal
557  assert testVal == self.apiSuccess(master.getParam('/down2/node', 'dparam2'))
558  assert testVal == self.apiSuccess(master.getParam('/', 'down2/dparam2'))
559  assert not self.apiSuccess(master.hasParam('/down2/node', 'down2/dparam2'))
560  self.apiError(master.getParam('/down2/node', 'down2/dparam2'))
561  self._checkParamState(myState)
562 
563  # test that parameter server allows setting of parameters further down (2)
564  testVal = random.randint(-1000000, 100000)
565  master.setParam('/', '/down3/sub/dparam3', testVal)
566  myState['/down3/sub/dparam3'] = testVal
567  assert testVal == self.apiSuccess(master.getParam('/down3/sub/node', 'dparam3'))
568  assert testVal == self.apiSuccess(master.getParam('/down3/node', 'sub/dparam3'))
569  assert testVal == self.apiSuccess(master.getParam('/', 'down3/sub/dparam3'))
570  assert testVal == self.apiSuccess(master.getParam('/down3/sub/sub2/node', 'dparam3'))
571  assert not self.apiSuccess(master.hasParam('/down3/sub/node', 'sub/dparam3'))
572  assert not self.apiSuccess(master.hasParam('/down3/sub/node', 'down3/sub/dparam3'))
573  self.apiError(master.getParam('/down3/sub/node', 'sub/dparam3'))
574  self.apiError(master.getParam('/down3/sub/node', 'down3/sub/dparam3'))
575  self._checkParamState(myState)
576 
577  # test downwards deletion
578  master.setParam('/', '/down4/sub/dparam4A', testVal)
579  self.apiSuccess(master.deleteParam('/down4/sub/node', 'dparam4A'))
580  assert not self.apiSuccess(master.hasParam('/down4/sub', 'dparam4A'))
581  master.setParam('/', '/down4/sub/dparam4B', testVal)
582  self.apiSuccess(master.deleteParam('/down4/node', 'sub/dparam4B'))
583  assert not self.apiSuccess(master.hasParam('/down4/sub', 'dparam4B'))
584  master.setParam('/', '/down4/sub/dparam4C', testVal)
585  self.apiSuccess(master.deleteParam('/', 'down4/sub/dparam4C'))
586  assert not self.apiSuccess(master.hasParam('/down4/sub/node', 'dparam4C'))
587  self._checkParamState(myState)
588 
def _filterDict(self, d)
remove common keys that roslaunch places on param server
def _testScopeUp(self)
testScopeUp: test that parameter server can chain up scopes to find/delete parameters ...
def _testScopeDown(self)
testScopeDown: test scoping rules for sub contexts
def _setParam(self, ctx, myState, testVals, master)


test_rosmaster
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:20