testSlave.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2008, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 #
33 # Revision $Id$
34 
35 import os
36 import sys
37 import string
38 try:
39  from xmlrpc.client import ServerProxy
40 except ImportError:
41  from xmlrpclib import ServerProxy
42 
43 import rospy
44 from rostest import *
45 
46 singletest = None
47 #singletest = 'testSourceKillFlow'
48 
49 #TODO: test single source to multiple sinks
50 #TODO: test multiple sources to single sink
51 
52 # test_string_in
53 # test_string_out
54 
55 # test_primitives_in
56 # test_primitives_out
57 
58 # test_arrays_in
59 # test_arrays_out
60 
61 # test_header_in
62 # test_header_out
63 
64 _required_publications = 'test_string_out', 'test_primitives_out', 'test_arrays_out', 'test_header_out'
65 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in'
66 
67 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'
68 class SlaveTestCase(TestRosClient):
69 
70  def setUp(self):
71  super(SlaveTestCase, self).setUp()
72  # retrieve handle on node
73  self.caller_id = rospy.get_caller_id()
74  self.node_api = self.apiSuccess(self.master.lookupNode(self.caller_id, 'test_node'))
75  self.assert_(self.node_api.startswith('http'))
76  self.node = ServerProxy(self.node_api)
77 
78  def testGetPid(self):
79  pid = self.apiSuccess(self.node.getPid(self.caller_id))
80  self.assert_(pid > 0)
81 
83  publications = self.apiSuccess(self.node.getPublications(self.caller_id))
84  self.assert_(publications is not None)
85  expected = [rospy.resolve_name(t) for t in _required_publications]
86  missing = set(expected) - set(publications)
87  self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
88 
89  def _subTestSourceRequestFlow(self, testName, protocols, testEval):
90  master = self.master
91  tests = [
92  [['testSourceRequestFlow-%s-nodeA','testSourceRequestFlow-%s-nodeB',],
93  ['node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
94  [['g1.testSourceRequestFlow-%s-nodeA','g1.testSourceRequestFlow-%s-nodeB',],
95  ['g1.node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
96  [['g1.g2.g3.testSourceRequestFlow-%s-nodeA','g1.g2.testSourceRequestFlow-%s-nodeB',],
97  ['g1.g2.node', 'g3.testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
98  [['g1.g2.testSourceRequestFlow-%s-nodeA','g1.g2.g3.testSourceRequestFlow-%s-nodeB',],
99  ['g1.g2.node', 'testSourceRequestFlow-%s-nodeA.out', 'g3.testSourceRequestFlow-%s-nodeB.in']],
100  ]
101  sources = {}
102  #start the nodes
103  # - save the source as we will be making calls on it
104  pkg, node = testNode
105  for test in tests:
106  sourceName, sinkName = [val%testName for val in test[0]]
107  port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
108  apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
109  sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
110  sources[sourceName] = ServerProxy(sourceUri)
111 
112  for test in tests:
113  sourceName, sinkName = [val%testName for val in test[0]]
114  source = sources[sourceName]
115  callerId = test[1][0]
116  sourceLocator, sinkLocator = [val%testName for val in test[1][1:]]
117  args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols)
118  testEval(args)
119  #TODO: test locator name resolution
120 
122  def testEval(args):
123  protocol = apiSuccess(args)
124  assert type(protocol) == list
125  assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
126  assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
127  protocols = [['TCPROS']]
128  self._subTestSourceRequestFlow('TCPROS1', protocols, testEval)
129 
131  def testEval(args):
132  protocol = apiSuccess(args)
133  assert type(protocol) == list
134  assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
135  assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
136  protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['TCPROS'], ['Fakelast', 'fake'] ]
137  self._subTestSourceRequestFlow('TCPROS2', protocols, testEval)
138 
140  protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['Fakelast', 'fake'] ]
141  self._subTestSourceRequestFlow('Fail', protocols, apiFail)
142 
144  slave = self.slave
145  master = self.master
146  #test that malformed locators return error codes
147  apiError(slave.sourceRequestFlow('', '', ''))
148  apiError(slave.sourceRequestFlow('', 'good.locator', 'badlocator'))
149  apiError(slave.sourceRequestFlow('', 'badlocator', 'good.locator'))
150 
151 # sourceKillFlow(callerId, sourceLocator, sinkLocator)
152 #
153 # * called by master
154 # * returns int
155 
157  slave = self.slave
158  master = self.master
159  #test that malformed locators return error codes
160  apiError(slave.sourceKillFlow('', '', ''))
161  apiError(slave.sourceKillFlow('', 'good.locator', 'badlocator'))
162  apiError(slave.sourceKillFlow('', 'badlocator', 'good.locator'))
163 
164  tests = [
165  [['testSourceKillFlow-nodeA','testSourceKillFlow-nodeB',],
166  ['node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
167  [['g1.testSourceKillFlow-nodeA','g1.testSourceKillFlow-nodeB',],
168  ['g1.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
169  [['g1.g2.g3.testSourceKillFlow-nodeA','g1.g2.g3.testSourceKillFlow-nodeB',],
170  ['g1.g2.node', 'g3.testSourceKillFlow-nodeA.out', 'g3.testSourceKillFlow-nodeB.in']],
171  [['g1.g2.testSourceKillFlow-nodeA','g1.g2.testSourceKillFlow-nodeB',],
172  ['g1.g2.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
173  [['a1.g2.g3.testSourceKillFlow-nodeA','a1.g2.testSourceKillFlow-nodeB',],
174  ['a1.node', 'g2.g3.testSourceKillFlow-nodeA.out', 'g2.testSourceKillFlow-nodeB.in']],
175  [['a1.g2.testSourceKillFlow-nodeA','a1.g2.g3.testSourceKillFlow-nodeB',],
176  ['a1.node', 'g2.testSourceKillFlow-nodeA.out', 'g2.g3.testSourceKillFlow-nodeB.in']],
177 
178  ]
179  sources = {}
180  #start the flows
181  # - save the source as we will be making calls on it
182  pkg, node = testNode
183  for test in tests:
184  sourceName, sinkName = test[0]
185  # - start the source/sink nodes
186  port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
187  apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
188  sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
189  sources[sourceName] = ServerProxy(sourceUri)
190  # - start the flow
191  callerId, sourceLocator, sinkLocator = test[1]
192  apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1))
193 
194  #attempt to kill flow with 1 wrong endpoint
195  for test in tests:
196  source = sources[test[0][0]]
197  callerId, sourceLocator, sinkLocator = test[1]
198  #sourceKill flow does a silent succeed if there is no flow to sink, but returns
199  #0 flows killed
200  val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, 'fake.sink'))
201  assert val == 0, "flowsKilled should be 0 for non-existent flow [fakesink]"
202  #sourceKill flow fails if source param does not refer to it
203  apiError(source.sourceKillFlow(callerId, 'fake.source', sinkLocator))
204 
205  #attempt to kill flow with wrong context
206  for test in tests:
207  source = sources[test[0][0]]
208  callerId, sourceL, sinkL = test[1]
209  for sub_test in tests:
210  sub_callerId, sub_source, sub_sink = test[1]
211  if sub_callerId == callerId and sub_source == sourceL and sub_sink == sinkL:
212  continue
213  val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink))
214  assert val == 0, "flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL)
215 
216  # try to kill all flows on the source that we started
217  for test in tests:
218  source = sources[test[0][0]]
219  callerId, sourceLocator, sinkLocator = test[1]
220  val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator))
221  assert type(val) == int
222  assert val == 1, "Source did not report 1 flow killed: %s, %s"%(val, getLastMsg())
223  #TODO: test locator name resolution
224 
225  sinkTests = [
226  [['testSinkConnectFlow-nodeA','testSinkConnectFlow-nodeB',],
227  ['node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
228  [['g1.testSinkConnectFlow-nodeA','g1.testSinkConnectFlow-nodeB',],
229  ['g1.node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
230  [['g1.g2.g3.testSinkConnectFlow-nodeA','g1.g2.testSinkConnectFlow-nodeB',],
231  ['g1.g2.node', 'g3.testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
232  [['g1.g2.testSinkConnectFlow-nodeA','g1.g2.g3.testSinkConnectFlow-nodeB',],
233  ['g1.g2.node', 'testSinkConnectFlow-nodeA.out', 'g3.testSinkConnectFlow-nodeB.in']],
234  #separate subgraphs
235  [['a1.a2.testSinkConnectFlow-nodeA','b1.b2.testSinkConnectFlow-nodeB',],
236  ['node', 'a1.a2.testSinkConnectFlow-nodeA.out', 'b1.b2.testSinkConnectFlow-nodeB.in']],
237  # '.locator' resolution
238  [['c1.node.testSinkConnectFlow-nodeA','c1.node2.testSinkConnectFlow-nodeB',],
239  ['c1.node2', 'node.testSinkConnectFlow-nodeA.out', '.testSinkConnectFlow-nodeB.in']],
240 
241  ]
242 
243  def _sink_StartNodes(self, tests):
244  """
245  Test subroutine to startup all the nodes specified in the tests
246  """
247  master = self.master
248  sourceUris = {}
249  sinks = {}
250  #start the nodes
251  # - save the sink as we will be making calls on it
252  pkg, node = testNode
253  for test in tests:
254  sourceName, sinkName = test[0]
255  sourcePort = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
256  sinkPort = apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
257  sourceUri = 'http://%s:%s/'%(testNodeAddr[0], sourcePort)
258  sinkUri = 'http://%s:%s/'%(testNodeAddr[0], sinkPort)
259  sourceUris[sourceName] = sourceUri
260  sinks[sinkName] = ServerProxy(sinkUri)
261  return sourceUris, sinks
262 
263  def _sink_StartFlows(self, tests, sourceUris, sinks):
264  """
265  subroutine to connect the flows specified in the tests, purely
266  using the sink API.
267 
268  In the future it would be nice to verify that the flow truly
269  exists, but for now trust what the sink says
270  """
271  reliable = 1 #don't have UDP yet
272  for test in tests:
273  sourceName, sinkName = test[0]
274  sourceUri = sourceUris[sourceName]
275  sink = sinks[sinkName]
276  callerId, sourceLocator, sinkLocator = test[1]
277  print "Testing", test
278  val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable))
279  assert type(val) == int
280 
281  def _sink_KillFlows(self, tests, sinks):
282  """
283  subroutine to kill the flows specified in the tests, purely
284  using the sink API (i.e. source will still be running)
285 
286  In the future it would be nice to verify that the flow was
287  killed, but for now trust what the sink says"""
288  reliable = 1 #don't have UDP yet
289  for test in tests:
290  sourceName, sinkName = test[0]
291  sink = sinks[sinkName]
292  callerId, sourceLocator, sinkLocator = test[1]
293  print "Testing", test
294  assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\
295  "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg())
296 
297  #sinkConnectFlow(sourceLocator, sinkLocator, sourceXmlRpcURI, reliable)
298  # * API call on the sink of a flow. Called by master.
299  # * master requests that sink negotiate with the source to establish the flow
300  # * returns meaningless int
301  # * synchronous call, blocks until success/fail
302 
304  master = self.master
305  sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
306  self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
307 
308 # sinkKillFlow(sourceLocator, sinkLocator)
309 # * called by master
310  def testSinkKillFlow(self):
311  slave = self.slave
312  master = self.master
313  apiError(slave.sinkKillFlow('', '', ''))
314 
315  #startup the standard sink tests
316  sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
317  self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
318  self._sink_KillFlows(self.sinkTests, sinks)
319 
320  def testMisc(self):
321  slave = self.slave
322  assert slave is not None, "slave is None"
323  #getMasterUri
324  masterUri = apiSuccess(slave.getMasterUri(''))
325  assert getMasterUri() == masterUri or getMasterUriAlt() == masterUri, masterUri
326  assert masterUri == apiSuccess(slave.getMasterUri('a.different.id'))
327  #getPid
328  pid = apiSuccess(slave.getPid(''))
329  assert pid == apiSuccess(slave.getPid('a.different.id'))
330  #callerId must be string
331  apiError(slave.getPid(0))
332  apiError(slave.getMasterUri(0))
333  try:
334  slave.shutdown('some.id')
335  except:
336  pass
337  time.sleep(0.1)
338  try:
339  code, status, val = slave.getPid('')
340  assert code < 1, "Slave is still running after shutdown"
341  except:
342  pass
343 
def testSourceRequestFlow_Errors(self)
Definition: testSlave.py:143
def _sink_StartFlows(self, tests, sourceUris, sinks)
Definition: testSlave.py:263
def testSourceRequestFlow_Fail(self)
Definition: testSlave.py:139
def testSourceRequestFlow_TCPROS1(self)
Definition: testSlave.py:121
def _sink_StartNodes(self, tests)
Definition: testSlave.py:243
Expects a single test node to be running with name &#39;test_node&#39; and subscribed to &#39;test_string&#39;.
Definition: testSlave.py:68
def testSourceRequestFlow_TCPROS2(self)
Definition: testSlave.py:130
def _sink_KillFlows(self, tests, sinks)
Definition: testSlave.py:281
def _subTestSourceRequestFlow(self, testName, protocols, testEval)
Definition: testSlave.py:89


test_rosmaster
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:54