testMaster.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2008, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 #
33 # Revision $Id$
34 """
35 testMaster: ROS integration test cases for master XML-RPC API
36 
37 To run, invoke nodes/testMaster
38 """
39 
40 import os, sys, getopt, traceback, logging, socket
41 import datetime, math, random
42 try:
43  from xmlrpc.client import DateTime, ServerProxy
44 except ImportError:
45  from xmlrpclib import DateTime, ServerProxy
46 import unittest
47 import rospy
48 from rostest import *
49 from testSlave import msMain
50 
51 MYPKG = 'test_rosmaster'
52 
53 HAS_PARAM = True
54 
55 singletest = 'testGetFlowNames'
56 singletest = None
57 #singletest = 'testDotLocalNames'
58 
59 def verifyNodeAddress(master, callerId, name, machine, addr, port):
60  if not name:
61  raise Exception("name is None")
62  rmachine, raddr, rport = apiSuccess(master.getNodeAddress(callerId, name))
63  if machine:
64  assert rmachine == machine, "Node [%s] is running on '%s' instead of '%s'"%(name, rmachine, machine)
65  if port:
66  assert rport == port, "Node [%s] is running on '%s' instead of '%s'"%(name, rport, port)
67  else:
68  assert rport, "Node [%s] does not have a registered port"%name
69  if addr:
70  if addr == 'localhost':
71  addr = socket.gethostname()
72  if raddr == 'localhost':
73  raddr = socket.gethostname()
74  addrinfo = socket.getaddrinfo(addr, 0, 0, 0, socket.SOL_TCP)
75  raddrinfo = socket.getaddrinfo(raddr, 0, 0, 0, socket.SOL_TCP)
76  assert raddrinfo == addrinfo, "%s!=%s" % (raddrinfo, addrinfo)
77  #ping the node
78  apiSuccess(ServerProxy("http://%s:%s/"%(raddr, rport)).getPid(''))
79 
80 def testGraphState(master, graphNodes, graphFlows):
81  graph = apiSuccess(master.getGraph(''))
82  diff = set(graph[0]) ^ set(graphNodes)
83  assert not diff, "Graph nodes %s does not match expected %s: %s"%(graph[0], graphNodes, diff)
84  # stringify for comparison
85  expectedFlows = ["%s:%s:1"%f for f in graphFlows] # :1 = connected
86  print graph[1]
87  remoteFlows = ["%s:%s:%s"%(src,snk,c) for (src,snk,c) in graph[1]]
88  if expectedFlows or remoteFlows:
89  #assert set(expectedFlows) ^ set(remoteFlows), "Graph flows [%s] does not match expected [%s]"%(graph[1], graphFlows)
90  diff = set(expectedFlows) ^ set(remoteFlows)
91  assert not diff, "Graph flows %s does not match expected %s: %s"%(expectedFlows, remoteFlows, diff)
92 
93 def testParamState(master, myState):
94  callerId = 'master' #validate from root
95  for (k, v) in myState.items():
96  if HAS_PARAM:
97  assert apiSuccess(master.hasParam(callerId, k))
98  print "verifying parameter %s"%k
99  v2 = apiSuccess(master.getParam(callerId, k))
100  if isinstance(v2, DateTime):
101  assert DateTime(v) == v2, "[%s]: %s != %s, %s"%(k, v, v2, v2.__class__)
102  else:
103  assert v == v2, "[%s]: %s != %s, %s"%(k, v, v2, v2.__class__)
104  paramNames = myState.keys()
105  remoteParamNames = apiSuccess(master.getParamNames(callerId))
106  assert not set(paramNames) ^ set(remoteParamNames), "parameter server keys do not match local"
107 
108 class ParamServerTestCase(ROSGraphTestCase):
109  """Parameter Server API Test Cases"""
110 
111  def setUp(self):
112  super(ParamServerTestCase, self).setUp()
113 
114  def tearDown(self):
115  super(ParamServerTestCase, self).tearDown()
116 
117  def _testSetParam(self, ctx, myState, testVals, master):
118  for type, vals in testVals:
119  try:
120  if ctx:
121  callerId = "%s.node"%ctx
122  else:
123  callerId = "node"
124  count = 0
125  for val in vals:
126  key = "%s-%s"%(type,count)
127  print "master.setParam(%s,%s,%s)"%(callerId, key, val)
128  master.setParam(callerId, key, val)
129  if HAS_PARAM:
130  assert apiSuccess(master.hasParam(callerId, key))
131  if ctx:
132  trueKey = "%s.%s"%(ctx, key)
133  else:
134  trueKey = key
135  myState[trueKey] = val
136  count += 1
137  except:
138  assert "getParam failed on type[%s], val[%s]"%(type,val)
139  testParamState(master, myState)
140 
141  def testParamValues(self):
142  """testParamValues: test storage of all XML-RPC compatible types"""
143  try:
144  from xmlrpc.client import Binary
145  except ImportError:
146  from xmlrpclib import Binary
147  testVals = [
148  ['int', [0, 1024, 2147483647, -2147483647]],
149  ['boolean', [True, False]],
150  ['string', ['', '\0', 'x', 'hello', ''.join([chr(n) for n in range(0, 255)])]],
151  ['double', [0.0, math.pi, -math.pi, 3.4028235e+38, -3.4028235e+38]],
152  #TODO: microseconds?
153  ['datetime', [datetime.datetime(2005, 12, 6, 12, 13, 14), datetime.datetime(1492, 12, 6, 12, 13, 14)]],
154  ['base64', [Binary(''), Binary('\0'), Binary(''.join([chr(n) for n in range(0, 255)]))]],
155  ['struct', [{ "a": 2, "b": 4},
156  {"a" : "b", "c" : "d"},
157  {"a" : {"b" : { "c" : "d"}}}]],
158  ['array', [[], [1, 2, 3], ['a', 'b', 'c'], [0.0, 0.1, 0.2, 2.0, 2.1, -4.0],
159  [1, 'a', math.pi], [[1, 2, 3], ['a', 'b', 'c'], [1.0, 2.1, 3.2]]]
160  ],
161  ]
162  master = self.master
163 
164  print "Putting parameters onto the server"
165  # put our params into the parameter server
166  contexts = ['', 'scope1', 'scope2', 'scope.subscope1', 'scope.sub1.sub2']
167  myState = {}
168  for ctx in contexts:
169  self._testSetParam(ctx, myState, testVals, master)
170 
171  print "Deleting all of our parameters"
172  # delete all of our parameters
173  paramKeys = myState.keys()
174  for key in paramKeys:
175  apiSuccess(master.deleteParam('', key))
176  del myState[key]
177  testParamState(master, myState)
178 
179  def testEncapsulation(self):
180  """testEncapsulation: test encapsulation: setting same parameter at different levels"""
181  master = self.master
182  myState = {}
183  testParamState(master, myState)
184 
185  testContexts = ['', 'en', 'en.sub1', 'en.sub2', 'en.sub1.sub2']
186  for c in testContexts:
187  testKey = 'param1'
188  testVal = random.randint(-1000000, 100000)
189  if c:
190  callerId = "%s.node"%c
191  trueKey = "%s.%s"%(c,testKey)
192  else:
193  callerId ="node"
194  trueKey = testKey
195  master.setParam(callerId, testKey, testVal)
196  myState[trueKey] = testVal
197  # make sure the master has the parameter under both keys and that they are equal
198  v1 = apiSuccess(master.getParam('', trueKey))
199  v2 = apiSuccess(master.getParam(callerId, testKey))
200  assert v1 == v2, "[%s]: %s vs. [%s,%s]: %s"%(trueKey, v1, callerId, testKey, v2)
201  if HAS_PARAM:
202  assert apiSuccess(master.hasParam(callerId, testKey)), testKey
203  assert apiSuccess(master.hasParam('node', trueKey)), trueKey
204 
205  testParamState(master, myState)
206 
207  def testDotLocalNames(self):
208  master = self.master
209  myState = {}
210  testParamState(master, myState)
211 
212  testContexts = ['', 'sub1', 'sub1.sub2', 'sub1.sub2.sub3']
213  for c in testContexts:
214  if c:
215  callerId = "%s.node"%c
216  else:
217  callerId = "node"
218  testKey = ".param1"
219  testVal = random.randint(-1000000, 100000)
220  master.setParam(callerId, testKey, testVal)
221  trueKey = callerId+testKey
222  myState[trueKey] = testVal
223 
224  v1 = apiSuccess(master.getParam('node', trueKey))
225  v2 = apiSuccess(master.getParam(callerId, testKey))
226  assert v1 == v2, "[%s]: %s vs. [%s,%s]: %s"%(trueKey, v1, callerId, testKey, v2)
227  if HAS_PARAM:
228  assert apiSuccess(master.hasParam(callerId, testKey)), testKey
229  assert apiSuccess(master.hasParam('node', trueKey)), trueKey
230 
231  #test setting a local param on a different node
232  testKey = "altnode.param2"
233  testVal = random.randint(-1000000, 100000)
234  master.setParam(callerId, testKey, testVal)
235  if c:
236  trueKey = "%s.%s"%(c,testKey)
237  altCallerId = "%s.altnode"%c
238  else:
239  trueKey = testKey
240  altCallerId = "altnode"
241  myState[trueKey] = testVal
242 
243  v1 = apiSuccess(master.getParam(altCallerId, ".param2"))
244  v2 = apiSuccess(master.getParam(callerId, testKey))
245  assert v1 == v2
246  if HAS_PARAM:
247  assert apiSuccess(master.hasParam(callerId, testKey)), testKey
248  assert apiSuccess(master.hasParam(altCallerId, ".param2"))
249 
250  testParamState(master, myState)
251 
252  def testScopeUp(self):
253  """testScopeUp: test that parameter server can chain up scopes to find/delete parameters"""
254  master = self.master
255  myState = {}
256  testParamState(master, myState)
257 
258  testVal = random.randint(-1000000, 100000)
259  master.setParam('', 'uparam1', testVal)
260  myState['uparam1'] = testVal
261  assert testVal == apiSuccess(master.getParam('node', 'uparam1'))
262  assert testVal == apiSuccess(master.getParam('uptest.node', 'uparam1'))
263  assert testVal == apiSuccess(master.getParam('uptest.sub1.node', 'uparam1'))
264  assert testVal == apiSuccess(master.getParam('uptest.sub1.sub2.node', 'uparam1'))
265 
266  testVal = random.randint(-1000000, 100000)
267  master.setParam('uptest2.sub1.node', 'uparam2', testVal)
268  myState['uptest2.sub1.uparam2'] = testVal
269  assert testVal == apiSuccess(master.getParam('uptest2.sub1.node', 'uparam2'))
270  assert testVal == apiSuccess(master.getParam('uptest2.sub1.sub2.node', 'uparam2'))
271  assert testVal == apiSuccess(master.getParam('uptest2.sub1.sub2.sub3.node', 'uparam2'))
272  testParamState(master, myState)
273 
274  #verify upwards deletion
275  apiSuccess(master.deleteParam('uptest.sub1.sub2.node', 'uparam1'))
276  del myState['uparam1']
277  testParamState(master, myState)
278  apiSuccess(master.deleteParam('uptest2.sub1.sub2.sub3.node', 'uparam2'))
279  del myState['uptest2.sub1.uparam2']
280  testParamState(master, myState)
281 
282  def testScopeDown(self):
283  """testScopeDown: test scoping rules for sub contexts"""
284  master = self.master
285  myState = {}
286  testParamState(master, myState)
287 
288  # test that parameter server down not chain down scopes
289  testVal = random.randint(-1000000, 100000)
290  master.setParam('down.one.two.three.node', 'dparam1', testVal)
291  myState['down.one.two.three.dparam1'] = testVal
292  if HAS_PARAM:
293  assert not apiSuccess(master.hasParam('down.one', 'dparam1'))
294  assert not apiSuccess(master.hasParam('down.one.two', 'dparam1'))
295  apiError(master.getParam('down.one.node', 'dparam1'))
296  apiError(master.getParam('down.one.two.node', 'dparam1'))
297 
298  # test that parameter server allows setting of parameters further down (1)
299  testVal = random.randint(-1000000, 100000)
300  master.setParam('node', 'down2.dparam2', testVal)
301  myState['down2.dparam2'] = testVal
302  assert testVal == apiSuccess(master.getParam('down2.node', 'dparam2'))
303  assert testVal == apiSuccess(master.getParam('', 'down2.dparam2'))
304  if HAS_PARAM:
305  assert not apiSuccess(master.hasParam('down2.node', 'down2.dparam2'))
306  apiError(master.getParam('down2.node', 'down2.dparam2'))
307  testParamState(master, myState)
308 
309  # test that parameter server allows setting of parameters further down (2)
310  testVal = random.randint(-1000000, 100000)
311  master.setParam('node', 'down3.sub.dparam3', testVal)
312  myState['down3.sub.dparam3'] = testVal
313  assert testVal == apiSuccess(master.getParam('down3.sub.node', 'dparam3'))
314  assert testVal == apiSuccess(master.getParam('down3.node', 'sub.dparam3'))
315  assert testVal == apiSuccess(master.getParam('', 'down3.sub.dparam3'))
316  assert testVal == apiSuccess(master.getParam('down3.sub.sub2.node', 'dparam3'))
317  if HAS_PARAM:
318  assert not apiSuccess(master.hasParam('down3.sub.node', 'sub.dparam3'))
319  assert not apiSuccess(master.hasParam('down3.sub.node', 'down3.sub.dparam3'))
320  apiError(master.getParam('down3.sub.node', 'sub.dparam3'))
321  apiError(master.getParam('down3.sub.node', 'down3.sub.dparam3'))
322  testParamState(master, myState)
323 
324  # test downwards deletion
325  master.setParam('node', 'down4.sub.dparam4A', testVal)
326  apiSuccess(master.deleteParam('down4.sub.node', 'dparam4A'))
327  if HAS_PARAM:
328  assert not apiSuccess(master.hasParam('down4.sub', 'dparam4A'))
329  master.setParam('node', 'down4.sub.dparam4B', testVal)
330  apiSuccess(master.deleteParam('down4.node', 'sub.dparam4B'))
331  if HAS_PARAM:
332  assert not apiSuccess(master.hasParam('down4.sub', 'dparam4B'))
333  master.setParam('node', 'down4.sub.dparam4C', testVal)
334  apiSuccess(master.deleteParam('', 'down4.sub.dparam4C'))
335  if HAS_PARAM:
336  assert not apiSuccess(master.hasParam('down4.sub.node', 'dparam4C'))
337  testParamState(master, myState)
338 
339 class MasterTestCase(ROSGraphTestCase):
340  """Master API Test Cases -- those not covered by ParamServer and AddKillNode"""
341 
342  def setUp(self):
343  super(MasterTestCase, self).setUp()
344 
345  def tearDown(self):
346  super(MasterTestCase, self).tearDown()
347 
348  def _verifyFlowNameState(self, master, state):
349  flows = apiSuccess(master.getFlowNames('node1', ''))
350  assert len(flows) == len(state.values()), "Master reported a different number of flows"
351  for val in state.values():
352  assert val in flows, "flows does not contain %s : %s"%(val, flows)
353 
354  def testPromoteFlow(self):
355  master = self.master
356  state = {}
357  callerId = 'node1'
358  type = 'common_flows/String'
359  #setup node1 with outflows outflow1..3 and inflow1..3
360  apiSuccess(master.registerNode(callerId, 'node1', 'localhost', 1234, []))
361  for i in range(1, 4):
362  for dir in ['inflow', 'outflow']:
363  locator = 'node1:%s%s'%(dir, i)
364  apiSuccess(master.registerFlow(callerId, 'node1', locator, dir, type))
365  state[locator] = [locator, dir, type, 0]
366 
367  #test promote of a non-existent flow
368  apiError(master.promoteFlow(callerId, 'node1:notAFlow', 'node1:newNotAFlow'))
369  # - :outflow1 should resolve to parent, which is a non-existent flow
370  apiError(master.promoteFlow(callerId, ':outflow1', 'node1:newNotAFlow'))
371 
372  ## Play with Outflows
373  #successfully promote outflow1 to global namespace
374  dir = 'outflow'
375  apiSuccess(master.promoteFlow(callerId, 'node1:outflow1', ':outflow1'))
376  state[':outflow1'] = [':outflow1', dir, type, 1]
377  # - verify with .local syntax
378  apiSuccess(master.promoteFlow(callerId, '.:outflow2', ':outflow2'))
379  state[':outflow2'] = [':outflow2', dir, type, 1]
380 
381  ## Play with Inflows
382  #successfully promote inflow1 to the same namespace
383  dir = 'inflow'
384  tests = [
385  ['node1:inflow1', 'node1:newInflow1'],
386  ['node1:inflow2', 'sibling:inflow2'],
387  ['node1:inflow3', 'node1:subgraph1:inflow3']
388  ]
389  for source, target in tests:
390  apiSuccess(master.promoteFlow(callerId, source, target))
391  state[target] = [target, dir, type, 1]
392 
393  self._verifyFlowNameState(master, state)
394 
395  #TODO: test promotion to an already promoted/registered flow name
396  apiError(master.promoteFlow(callerId, 'node1:outflow1', 'node1:outflow2'))
397  apiError(master.promoteFlow(callerId, 'node1:outflow2', ':outflow1'))
398  apiError(master.promoteFlow(callerId, 'node1:inflow1', 'sibling:inflow2'))
399 
400  #TODO: test promote of a promoted flow
401  #TODO: test promote with both :flow and .:flow
402  #TODO: test name resolution with subgraphs and callerIds better
403 
404  def testRegisterFlow(self):
405  master = self.master
406  state = {}
407  type = 'common_flows/String'
408  for callerId in ['node1', 'subgraph.node1', 'grandparent.parent.node1']:
409  #setup node1 with outflows outflow1..3 and inflow1..3
410  apiSuccess(master.registerNode(callerId, 'node1', 'localhost', 1234, []))
411  for i in range(1, 4):
412  for dir in ['inflow', 'outflow']:
413  if dir == 'inflow':
414  locator = 'node1:%s%s'%(dir, i)
415  else:
416  locator = '.:%s%s'%(dir, i) #test .local resolution
417  apiSuccess(master.registerFlow(callerId, 'node1', locator, dir, type))
418  realLocator = '%s:%s%s'%(callerId, dir, i)
419  state[realLocator] = [realLocator, dir, type, 0]
420  #test register on a non-existent node
421  apiError(master.registerFlow(callerId, 'notNode', 'notANode:outflow', 'outflow', type))
422  #test register on bad locators
423  apiError(master.registerFlow(callerId, 'node1', '', 'outflow', type))
424  apiError(master.registerFlow(callerId, 'node1', 'badflow1', 'outflow', type))
425  #test register on a bad direction
426  apiError(master.registerFlow(callerId, 'node1', 'node1:badflow2', 'spiralflow', type))
427  #test register on invalid types
428  apiError(master.registerFlow(callerId, 'node1', 'node1:badflow3', 'outflow', ''))
429  apiError(master.registerFlow(callerId, 'node1', 'node1:badflow4', 'outflow', 'faketype'))
430 
431  #test register to an already promoted/registered flow name
432  apiError(master.registerFlow(callerId, 'node1', 'node1:outflow1', 'outflow', type))
433  apiSuccess(master.promoteFlow(callerId, 'node1:outflow1', 'node1:newOutflow1'))
434  state['%s:newOutflow1'%callerId] = ['node1:newOutflow1', dir, type, 1]
435  apiError(master.registerFlow(callerId, 'node1', 'node1:newOutflow1', 'outflow', type))
436 
437  self._verifyFlowNameState(master, state)
438 
440  master = self.master
441  state = {}
442  type = 'common_flows/String'
443  for callerId in ['node1', 'subgraph.node1', 'grandparent.parent.node1']:
444  #setup node1 with outflows outflow1..3 and inflow1..3
445  apiSuccess(master.registerNode(callerId, 'node1', 'localhost', 1234, []))
446  for i in range(1, 4):
447  for dir in ['inflow', 'outflow']:
448  if dir == 'inflow':
449  locator = rlocator = 'node1:%s%s'%(dir, i)
450  else:
451  locator = '.:%s%s'%(dir, i) #test .local resolution
452  rlocator = 'node1:%s%s'%(dir, i) #test .local resolution
453  apiSuccess(master.registerFlow(callerId, 'node1', locator, dir, type))
454  realLocator = '%s:%s%s'%(callerId, dir, i)
455  state[realLocator] = [realLocator, dir, type, 0]
456 
457  self._verifyFlowNameState(master, state)
458 
459  #test bad unregisters
460  apiError(master.unregisterFlow(callerId, 'notANode:outflow'))
461  apiError(master.unregisterFlow(callerId, ''))
462  apiError(master.unregisterFlow(callerId.replace('node1', 'node2'), 'outflow1'))
463 
464  apiSuccess(master.unregisterFlow(callerId, 'node1:outflow1'))
465  del state['%s:outflow1'%callerId]
466  apiSuccess(master.unregisterFlow(callerId, '.:outflow2'))
467  del state['%s:outflow2'%callerId]
468  apiSuccess(master.unregisterFlow(callerId.replace('node1', 'node2'), 'node1:outflow3'))
469  del state['%s:outflow3'%callerId]
470  apiSuccess(master.unregisterFlow('master', '%s:inflow1'%callerId))
471  del state['%s:inflow1'%callerId]
472 
473  self._verifyFlowNameState(master, state)
474 
475  #test register to an already unregistered flow
476  apiError(master.unregisterFlow(callerId, 'node1:outflow1'))
477 
478  #test transitive unregisters
479  apiSuccess(master.promoteFlow(callerId, 'node1:inflow3', ':inflow3A'))
480  apiSuccess(master.promoteFlow(callerId, ':inflow3A', ':inflow3B'))
481  # - should unregister both 3A and 3B
482  apiSuccess(master.unregisterFlow(callerId, 'node1:inflow3'))
483  del state['%s:inflow3'%callerId]
484 
485  self._verifyFlowNameState(master, state)
486 
487  def testGetFlowNames(self):
488  master = self.master
489  pkg,node = testNode
490  subgraph = 'sub1.sub2'
491  name = 'testGFN_A'
492  port = apiSuccess(master.addNode('caller', subgraph, name, pkg, node, TEST_MACHINE, 0))
493  #testNode must have :in and :out
494  #:in and :out
495  testFlowNames = ["%s:%s"%(name, v) for v in ["in", "out"]]
496  print "FLOW NAMES", apiSuccess(master.getFlowNames('master', ''))
497  flows = apiSuccess(master.getFlowNames('%s.caller'%subgraph, ''))
498  assert not set([x[0] for x in flows]) ^ set(testFlowNames), "%s vs. %s"%([x[0] for x in flows], testFlowNames)
499 
500  inDirs = [x[1] for x in flows if x[0].endswith(':in')]
501  outDirs = [x[1] for x in flows if x[0].endswith(':out')]
502  assert not filter(lambda x: x != "inflow", inDirs), inDirs
503  assert not filter(lambda x: x != "outflow", outDirs), outDirs
504  assert not filter(lambda x: x != "common_flows/String", [x[2] for x in flows]) #all flow types should be String
505  assert not filter(lambda x: x, [x[3] for x in flows]) #promoted flag should all be zero
506 
507  #test callerId scoping and subgraph parameter
508  testFlowNames = ["%s.%s:%s"%(subgraph, name, v) for v in ["in", "out"]]
509  flows = apiSuccess(master.getFlowNames('caller', subgraph))
510  assert not set([x[0] for x in flows]) ^ set(testFlowNames), "%s vs. %s"%([x[0] for x in flows], testFlowNames)
511  flows = apiSuccess(master.getFlowNames('caller', "%s.%s"%(subgraph, name)))
512  assert not set([x[0] for x in flows]) ^ set(testFlowNames), "%s vs. %s"%([x[0] for x in flows], testFlowNames)
513 
514  testFlowNames = ["sub2.%s:%s"%(name, v) for v in ["in", "out"]]
515  flows = apiSuccess(master.getFlowNames('sub1.caller', 'sub2'))
516  assert not set([x[0] for x in flows]) ^ set(testFlowNames), "%s vs. %s"%([x[0] for x in flows], testFlowNames)
517 
518 
519  testFlowNames = ["%s.%s:%s"%(subgraph, name, v) for v in ["in", "out"]]
520  flows = apiSuccess(master.getFlowNames('caller', ''))
521  flowNames = [x[0] for x in flows]
522  assert not set(flowNames) ^ set(testFlowNames)
523 
524 
526  def validate(val, callerId, name, port):
527  assert len(val) == 3, "getNodeAddress did not return 3-element list for value field"
528  assert type(val[0]) == str and type(val[1]) == str and type(val[2]) == int,\
529  "getNodeAddress did not return (str, str, int) for value field"
530  verifyNodeAddress(master, callerId, name, TEST_MACHINE, 'localhost', port)
531 
532  #NOTE: this does not do full coverage on rospy, as many of the branch
533  #conditions in rospy are inaccessible via external call (they required
534  #corrupted data to be inserted into the master, which registerNode prevents)
535 
536  master = self.master
537  #test that invalid case still meets type spec
538  code, msg, val = master.getNodeAddress('', 'testGetNodeAddress-fake')
539  assert code == -1, "getNodeAddress did not return failure code 0 for non-existent node"
540  assert len(val) == 3, "getNodeAddress did not return 3-element list for value field in error case"
541  assert type(val[0]) == str and type(val[1]) == str and type(val[2]) == int,\
542  "getNodeAddress did not return (str, str, int) for value field in error case"
543 
544  #start a node to test valid case against
545  name = 'testGetNodeAddress-1'
546  port = 7981
547  pkg, node = testNode
548  apiSuccess(master.addNode('', '', name, pkg, node, TEST_MACHINE, port))
549  val = apiSuccess(master.getNodeAddress('', name))
550  validate(val, '', name, port)
551 
552  #verify Graph Resource Name scoping rules
553  name = 'testGetNodeAddress-2'
554  port = 7982
555  apiSuccess(master.addNode('', 'gna1.gna2', name, pkg, node, TEST_MACHINE, port))
556  #make sure we have a node
557  tests = [
558  #test exact name matches
559  ['gna1.gna2.node', name],
560  ['gna1.node', 'gna2.%s'%name],
561  ['', 'gna1.gna2.%s'%name],
562  #make sure that gNA searches upwards
563  ['gna1.gna2.gna3.node', name],
564  ]
565  for test in tests:
566  callerId, name = test
567  validate(apiSuccess(master.getNodeAddress(callerId, name)), callerId, name, port)
568 
569  #make sure that it gNA doesn't search upwards when subcontext is specified
570  val = apiError(master.getNodeAddress('gna1.gna2', 'gna3.%s'%name))
571 
572  def _verifyNodeDead(self, port):
573  testUri = "http://localhost:%s/"%port
574  try:
575  ServerProxy(testUri).getPid('node')
576  self.fail("test node is still running")
577  except:
578  pass
579 
580  def testAddKillNode(self):
581  """testAddKillNode: test adding then killing nodes"""
582  #TODO: more test cases
583  master = self.master
584  pkg,node = testNode
585  apiError(master.killNode('node','nonexistentNode'))
586  name = 'testAddKillA'
587  port = apiSuccess(master.addNode('node', 'subgraph', name, pkg, node, TEST_MACHINE, 0))
588  # - doesn't traverse across
589  apiError(master.killNode('different.subgraph.node', name))
590  # - doesn't traverse down
591  apiError(master.killNode('node', name))
592  # - doesn't traverse up
593  apiError(master.killNode('subgraph.sub2.node', name))
594  # - kill it
595  apiSuccess(master.killNode('subgraph.node', name))
596 
597  # - push on the name resolution a bit
598  tests = [['node', '', 'testAddKillB'],
599  ['node', 'g1.g2', 'testAddKillB'],
600  ['node', 'g1', 'testAddKillB'],
601  ['g1.g2.node', 'g3', 'testAddKillB'],
602  ]
603  for callerId, subcontext, nodeName in tests:
604  port = apiSuccess(master.addNode(callerId, subcontext, nodeName, pkg, node, TEST_MACHINE, 0))
605  if subcontext:
606  name = "%s.%s"%(subcontext, nodeName)
607  else:
608  name = nodeName
609  apiSuccess(master.killNode(callerId, name))
610  self._verifyNodeDead(port)
611 
612  def testAddNode(self):
613  """testAddNode: test master.addNode(callerId, subcontext, name, pkg, pkgNode, machine, port)"""
614  master = self.master
615  graphNodes = ['master']
616  graphFlows = []
617  # Failure cases
618  pkg, node = testNode
619  errors = [
620  # - subcontext
621  ['', 12, 'testAddNodeError1', pkg, node, TEST_MACHINE, 0],
622  #name
623  ['', '', 123, pkg, node, TEST_MACHINE, 0],
624  # - invalid package implementation type
625  ['', '', 'testAddNodeError2', 123, node, TEST_MACHINE, 0],
626  # - node impl name
627  ['', '', 'testAddNodeError3', pkg, '', TEST_MACHINE, 0],
628  ['', '', 'testAddNodeError4', pkg, 123, TEST_MACHINE, 0],
629  # - machine parameter
630  ['', '', 'testAddNodeError6', pkg, node, 'unknown', 0],
631  ['', '', 'testAddNodeError7', pkg, 'noNode', 123, 0],
632  # - port
633  ['', '', 'testAddNodeError8', pkg, node, TEST_MACHINE, -80],
634  ['', '', 'testAddNodeError9', pkg, node, TEST_MACHINE, "80"],
635  ]
636  for args in errors:
637  try:
638  apiError(master.addNode(*args))
639  except Exception, e:
640  self.fail("addNodeError case failed with args[%s] and exception [%s]"%(args, e))
641  # - non-existent node implementation (this takes awhile)
642  apiFail(master.addNode('', '', 'testAddNodeFail1', pkg, 'notANode', TEST_MACHINE, 0))
643 
644  testGraphState(master, graphNodes, graphFlows)
645  tests = [[['','testAddNode1'], [TEST_MACHINE, 7980]],
646  [['','testAddNode2'], [TEST_MACHINE, 0]],
647  [['','testAddNode3'], ['', 0]],
648  [['','testAddNode4'], [TEST_MACHINE, 0]],
649  [['','testAddNode5'], [TEST_MACHINE, 0]],
650  [['','testAddNode6'], [TEST_MACHINE, 0]],
651  [['','testAddNode7'], [TEST_MACHINE, 0]],
652  # subcontext
653  [['push', 'testAddNode8'], [TEST_MACHINE, 0]],
654  [['push.one.two', 'testAddNode9'], [TEST_MACHINE, 0]],
655  [['stanford.addNodeTest','testAddNodeA'], [TEST_MACHINE, 0]],
656  [['wg.addNodeTest', 'testAddNodeA'], [TEST_MACHINE, 0]],
657  ]
658  for fullname, args in tests:
659  print "testAddNode: testing", fullname
660  subcontext, name = fullname
661  if subcontext:
662  fullname = '%s.%s'%(subcontext, name)
663  else:
664  fullname = name
665  machine, port = args
666  apiSuccess(master.addNode('', subcontext, name, pkg, node, machine, port))
667  verifyNodeAddress(master, '', fullname, machine, 'localhost', port)
668  graphNodes.append(fullname)
669  testGraphState(master, graphNodes, graphFlows)
670  # duplicate call should succeed
671  apiSuccess(master.addNode('', subcontext, name, pkg, node, machine, port))
672 
673  #TODO: more stress testing of name resolution with non-root callerIds
674 
675  # duplicate call with different params should fail
676  port = apiSuccess(master.addNode('node', 'duplicate.test', 'nodeA', pkg, node, TEST_MACHINE, 0))
677  apiError(master.addNode('node', 'duplicate.test', 'nodeA', pkg, node, TEST_MACHINE, port + 1))
678  apiError(master.addNode('node', 'duplicate.test', 'nodeA', pkg+'foo', node, TEST_MACHINE, port))
679  apiError(master.addNode('node', 'duplicate.test', 'nodeA', pkg, node+'foo', TEST_MACHINE, port))
680  #TODO: different machine
681 
682  def testGetGraph(self):
683  #TODO: in ROS 2.0, graph will be scoped by callerId. For now its safe to assume
684  #that we are implicitly testing getGraph()
685  pass
686 
687  def testAddMachine(self):
688  master = self.master
689  host, port = masterAddr
690  #test invalid calls on addMachine
691  # - name
692  apiError(master.addMachine('node', '', self.rosRoot, host, 22, '', ''))
693  apiError(master.addMachine('node', 123, self.rosRoot, host, 22, '', ''))
694  # - ros root
695  apiError(master.addMachine('node', 'name', '', host, 22, '', ''))
696  apiError(master.addMachine('node', 'name', 123, host, 22, '', ''))
697  # - address
698  apiError(master.addMachine('node', 'name', '', '', 22, '', ''))
699  apiError(master.addMachine('node', 'name', '', 123, 22, '', ''))
700  # - ssh port
701  apiError(master.addMachine('node', 'name', '', host, -22, '', ''))
702  apiError(master.addMachine('node', 'name', '', host, "22", '', ''))
703 
704  tests = [
705  ['node', 'testAddMachine-1'],
706  ['g1.node', 'testAddMachine-2'],
707  ['g1.g2.g3.node', 'testAddMachine-3'],
708  ['g1.g2.node', 'testAddMachine-4'],
709  ]
710  rosRoot = self.rosRoot
711  for callerId, m in tests:
712  apiSuccess(master.addMachine(callerId, m, rosRoot, host, 22, '', ''))
713  # - duplicate safe
714  apiSuccess(master.addMachine(callerId, m, rosRoot, host, 22, '', ''))
715  # - test error for each parameter being changed
716  apiError(master.addMachine(callerId, m, rosRoot+'/foo/', host, 22, '', ''))
717  apiError(master.addMachine(callerId, m, rosRoot, 'www.google.com', 22, '', ''))
718  apiError(master.addMachine(callerId, m, rosRoot, host, 21, '', ''))
719  apiError(master.addMachine(callerId, m, rosRoot, host, 22, 'fake-user', ''))
720  apiError(master.addMachine(callerId, m, rosRoot, host, 22, '', 'fake-password'))
721 
722  #TODO: rewrite once master has a method for interrogating machines
723 
725  #TODO: test flows param
726  pass
727 
728  def testRegisterNode(self):
729  master = self.master
730  flows = []
731  # - invalid name
732  apiError(master.registerNode('', '', 'localhost', 80, flows))
733  # - invalid address
734  apiError(master.registerNode('', 'registerNodeFail2', '', 80, flows))
735  # - invalid ports
736  apiError(master.registerNode('', 'registerNodeFail3', 'localhost', -80, flows))
737  apiError(master.registerNode('', 'registerNodeFail4', 'localhost', 0, flows))
738 
739  #implicitly test registerNode via local exec of test node
740  # - this actually tests the slave as much as the master, but I don't want
741  # to call registerNode with correct parameters as it is ambiguous whether
742  # or not the master should verify that the slave node actually exists
743  testCases = [
744  ['', 'registerNodeExternal-1'],
745  ['', 'rne.registerNodeExternal-2'],
746  ['rne', 'registerNodeExternal-3'],
747  ['', 'rne.rne2.rne3.registerNodeExternal-4'],
748  ['rne', 'rne2.rne3.registerNodeExternal-5'],
749  ['rne.rne2.rne3', 'registerNodeExternal-6'],
750  ]
751  for context, name in testCases:
752  try:
753  if context:
754  fullName = "%s.%s"%(context, name)
755  callerId = "%s.node"%context
756  else:
757  fullName = name
758  callerId = "node"
759  startTestNode(fullName)
760  # Block until node is responsive before continuing
761  # - give test node 4 seconds to start
762  timeoutT = time.time() + 4.0
763  node = getTestNode()
764  val = None
765  while time.time() < timeoutT and not val:
766  try:
767  _, _, val = node.getPid('')
768  except:
769  pass
770  assert val, "unable to start test node for registerNode test case"
771  # - we don't know the machine in this case
772  verifyNodeAddress(master, callerId, name, None, testNodeAddr[0], testNodeAddr[1])
773  finally:
774  stopTestNode()
775 
776  def testConnectFlow(self):
777  master = self.master
778  pkg, node = testNode
779  graphNodes = ['master']
780  graphFlows = []
781  machine = TEST_MACHINE
782  for i in range(1, 5):
783  apiSuccess(master.addNode('m', '', 'baseTcfNode-%s'%i, pkg, node, machine, 0))
784  graphNodes.append('baseTcfNode-%s'%i) #for testing up leveling
785  apiSuccess(master.addNode('m', '', 'tcfNode-%s'%i, pkg, node, machine, 0))
786  graphNodes.append('tcfNode-%s'%i)
787  apiSuccess(master.addNode('m', 'tcf1', 'tcfNode-%s'%i, pkg, node, machine, 0))
788  graphNodes.append('tcf1.tcfNode-%s'%i)
789  apiSuccess(master.addNode('m', 'tcf2', 'tcfNode-%s'%i, pkg, node, machine, 0))
790  graphNodes.append('tcf2.tcfNode-%s'%i)
791  apiSuccess(master.addNode('m', 'tcf1.sub1', 'tcfNode-%s'%i, pkg, node, machine, 0))
792  graphNodes.append('tcf1.sub1.tcfNode-%s'%i)
793  apiSuccess(master.addNode('m', 'tcf2.sub1', 'tcfNode-%s'%i, pkg, node, machine, 0))
794  graphNodes.append('tcf2.sub1.tcfNode-%s'%i)
795  apiSuccess(master.addNode('m', 'tcf3', 'tcfNode3-%s'%i, pkg, node, machine, 0))
796  graphNodes.append('tcf3.tcfNode3-%s'%i)
797  testGraphState(master, graphNodes, graphFlows)
798 
799  reliable = 1
800 
801  # illegal cases
802  illegal = [
803  # - name resolution scope
804  ['node.tcf1', 'baseTcfNode-1:out', 'tcfNode-1:in'],
805  ['tcf1.node', 'tcfNode-1:out', 'baseTcfNode-1:in'],
806  ['tcf1.tcfNode-1', 'tcfNode-1:out', 'tcf1.tcfNode-1:in'],
807  ['tcf1.tcfNode-1', ':out', 'tcf1.tcfNode-1:in'],
808  ['tcf1.tcfNode-1', 'tcf1.tcfNode-2:out', ':in'],
809  ['tcf1.node', 'tcf1.tcfNode-1:out', 'tcfNode-1:in'],
810  ['tcf1.node', 'tcf2.tcfNode-1:out', 'tcfNode-1:in'],
811  ['tcf1.node', 'tcfNode-1:out', 'tcf2.tcfNode-1:in'],
812  ['tcf1.node', 'sub1.tcfNode-1:out', 'tcf2.sub1.tcfNode-1:in'],
813  ['tcf1.sub1.node','baseTcfNode-1:out', 'tcfNode-1:in'],
814  ['tcf1.sub1.node','sub1.tcfNode-1:out', 'tcfNode-1:in'],
815  ['tcf1.sub1.node','tcf2.tcfNode-1:out', 'tcfNode-1:in'],
816  ]
817  for callerId, source, sink in illegal:
818  apiError(master.connectFlow(callerId, source, sink, reliable))
819 
820  # single source to sink sink
821  singleCase = [
822  #straight forward cases
823  ['node', 'tcfNode-1:out', 'tcfNode-2:in', 'tcfNode-1:out', 'tcfNode-2:in',],
824  ['tcfNode-1','tcf1.tcfNode-1:out', 'tcf1.tcfNode-2:in', 'tcf1.tcfNode-1:out', 'tcf1.tcfNode-2:in'],
825  ['tcf2.tcfNode-1', 'tcfNode-1:out', 'tcfNode-2:in', 'tcf2.tcfNode-1:out', 'tcf2.tcfNode-2:in'],
826  ['tcf1.tcfNode-2', 'sub1.tcfNode-1:out', 'sub1.tcfNode-2:in', 'tcf1.sub1.tcfNode-1:out', 'tcf1.sub1.tcfNode-2:in'],
827  ['tcf2.tcfNode-1', 'tcfNode-1:out', 'sub1.tcfNode-1:in', 'tcf2.tcfNode-1:out', 'tcf2.sub1.tcfNode-1:in'],
828 
829  # '.locator' naming test
830  ['tcf2.sub1.tcfNode-1', '.:out', 'tcfNode-2:in', 'tcf2.sub1.tcfNode-1:out', 'tcf2.sub1.tcfNode-2:in'],
831  ['tcf3.tcfNode3-2', 'tcfNode3-1:out', '.:in', 'tcf3.tcfNode3-1:out', 'tcf3.tcfNode3-2:in'],
832  ]
833  for callerId, source, sink, sourceFull, sinkFull in singleCase:
834  apiSuccess(master.connectFlow(callerId, source, sink, reliable))
835  graphFlows.append((sourceFull, sinkFull))
836  testGraphState(master, graphNodes, graphFlows)
837 
838  # - already connected
839  # Spec is fuzzy here. It's possible that this should be a succeed.
840  apiError(master.connectFlow('tcf2.node', 'tcfNode-1:out', 'tcfNode-2:in', reliable))
841  apiError(master.connectFlow('node', 'tcf1.tcfNode-1:out', 'tcf1.tcfNode-2:in', reliable))
842 
843  #TODO: test single source to multiple sinks
844  #TODO: test multiple sources to single sink
845 
846 
847  def testKillFlow(self):
848  master = self.master
849  if 0:
850  master.killFlow(callerId, source, sink)
851 
852  def testMisc(self):
853  master = self.master
854  assert master is not None, "master is None"
855  masterUri = apiSuccess(master.getMasterUri(''))
856  assert masterUri == apiSuccess(master.getMasterUri('a.different.id')), master.getMasterUri('a.different.id')
857  assert (getMasterUri() == masterUri) or \
858  (getMasterUriAlt() == masterUri), masterUri
859  #getPid
860  pid = apiSuccess(master.getPid(''))
861  assert pid == apiSuccess(master.getPid('a.different.id')), master.getPid('a.different.id')
862  #callerId must be string
863  apiError(master.getPid(0))
864  apiError(master.getMasterUri(0))
865  #shutdown
866  try:
867  master.shutdown('some.id')
868  except:
869  pass
870  time.sleep(0.1)
871  try:
872  code, status, val = master.getPid('')
873  assert code < 1, "Master is still running after shutdown"
874  except:
875  pass
876 
877 def testMasterMain(argv, stdout, env):
878  return msMain(argv, stdout, env, [ParamServerTestCase, MasterTestCase], singletest)
879 
880 if __name__ == '__main__':
881  testMasterMain(sys.argv, sys.stdout, os.environ)
882 
883 
def verifyNodeAddress(master, callerId, name, machine, addr, port)
Definition: testMaster.py:59
def testGraphState(master, graphNodes, graphFlows)
Definition: testMaster.py:80
def _verifyNodeDead(self, port)
Definition: testMaster.py:572
def testMasterMain(argv, stdout, env)
Definition: testMaster.py:877
def _verifyFlowNameState(self, master, state)
Definition: testMaster.py:348
def testParamState(master, myState)
Definition: testMaster.py:93
def _testSetParam(self, ctx, myState, testVals, master)
Definition: testMaster.py:117


test_rosmaster
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:20