39 from xmlrpc.client
import ServerProxy
41 from xmlrpclib
import ServerProxy
64 _required_publications =
'test_string_out',
'test_primitives_out',
'test_arrays_out',
'test_header_out' 65 _required_subscriptions =
'test_string_in',
'test_primitives_in',
'test_arrays_in',
'test_header_in' 71 super(SlaveTestCase, self).
setUp()
75 self.assert_(self.node_api.startswith(
'http'))
79 pid = self.apiSuccess(self.node.getPid(self.
caller_id))
83 publications = self.apiSuccess(self.node.getPublications(self.
caller_id))
84 self.assert_(publications
is not None)
85 expected = [rospy.resolve_name(t)
for t
in _required_publications]
86 missing = set(expected) - set(publications)
87 self.failIf(len(missing),
'missing required topics: %s'%(
','.join(missing)))
92 [[
'testSourceRequestFlow-%s-nodeA',
'testSourceRequestFlow-%s-nodeB',],
93 [
'node',
'testSourceRequestFlow-%s-nodeA.out',
'testSourceRequestFlow-%s-nodeB.in']],
94 [[
'g1.testSourceRequestFlow-%s-nodeA',
'g1.testSourceRequestFlow-%s-nodeB',],
95 [
'g1.node',
'testSourceRequestFlow-%s-nodeA.out',
'testSourceRequestFlow-%s-nodeB.in']],
96 [[
'g1.g2.g3.testSourceRequestFlow-%s-nodeA',
'g1.g2.testSourceRequestFlow-%s-nodeB',],
97 [
'g1.g2.node',
'g3.testSourceRequestFlow-%s-nodeA.out',
'testSourceRequestFlow-%s-nodeB.in']],
98 [[
'g1.g2.testSourceRequestFlow-%s-nodeA',
'g1.g2.g3.testSourceRequestFlow-%s-nodeB',],
99 [
'g1.g2.node',
'testSourceRequestFlow-%s-nodeA.out',
'g3.testSourceRequestFlow-%s-nodeB.in']],
106 sourceName, sinkName = [val%testName
for val
in test[0]]
107 port = apiSuccess(master.addNode(
'',
'', sourceName, pkg, node, TEST_MACHINE, 0))
108 apiSuccess(master.addNode(
'',
'', sinkName, pkg, node, TEST_MACHINE, 0))
109 sourceUri =
'http://%s:%s/'%(testNodeAddr[0], port)
110 sources[sourceName] = ServerProxy(sourceUri)
113 sourceName, sinkName = [val%testName
for val
in test[0]]
114 source = sources[sourceName]
115 callerId = test[1][0]
116 sourceLocator, sinkLocator = [val%testName
for val
in test[1][1:]]
117 args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols)
123 protocol = apiSuccess(args)
124 assert type(protocol) == list
125 assert string.upper(protocol[0]) ==
'TCPROS',
"source should have returned TCPROS as the desired protocol" 126 assert len(protocol) == 3,
"TCPROS parameter spec should be length 3" 127 protocols = [[
'TCPROS']]
132 protocol = apiSuccess(args)
133 assert type(protocol) == list
134 assert string.upper(protocol[0]) ==
'TCPROS',
"source should have returned TCPROS as the desired protocol" 135 assert len(protocol) == 3,
"TCPROS parameter spec should be length 3" 136 protocols = [[
'Fake1', 123, 132], [
'Fake2', 1.0], [
'Fake3'], [
'Fake4',
'string'], [
'Fake5', [
'a',
'list'], [
'a',
'nother',
'list']], [
'TCPROS'], [
'Fakelast',
'fake'] ]
140 protocols = [[
'Fake1', 123, 132], [
'Fake2', 1.0], [
'Fake3'], [
'Fake4',
'string'], [
'Fake5', [
'a',
'list'], [
'a',
'nother',
'list']], [
'Fakelast',
'fake'] ]
147 apiError(slave.sourceRequestFlow(
'',
'',
''))
148 apiError(slave.sourceRequestFlow(
'',
'good.locator',
'badlocator'))
149 apiError(slave.sourceRequestFlow(
'',
'badlocator',
'good.locator'))
160 apiError(slave.sourceKillFlow(
'',
'',
''))
161 apiError(slave.sourceKillFlow(
'',
'good.locator',
'badlocator'))
162 apiError(slave.sourceKillFlow(
'',
'badlocator',
'good.locator'))
165 [[
'testSourceKillFlow-nodeA',
'testSourceKillFlow-nodeB',],
166 [
'node',
'testSourceKillFlow-nodeA.out',
'testSourceKillFlow-nodeB.in']],
167 [[
'g1.testSourceKillFlow-nodeA',
'g1.testSourceKillFlow-nodeB',],
168 [
'g1.node',
'testSourceKillFlow-nodeA.out',
'testSourceKillFlow-nodeB.in']],
169 [[
'g1.g2.g3.testSourceKillFlow-nodeA',
'g1.g2.g3.testSourceKillFlow-nodeB',],
170 [
'g1.g2.node',
'g3.testSourceKillFlow-nodeA.out',
'g3.testSourceKillFlow-nodeB.in']],
171 [[
'g1.g2.testSourceKillFlow-nodeA',
'g1.g2.testSourceKillFlow-nodeB',],
172 [
'g1.g2.node',
'testSourceKillFlow-nodeA.out',
'testSourceKillFlow-nodeB.in']],
173 [[
'a1.g2.g3.testSourceKillFlow-nodeA',
'a1.g2.testSourceKillFlow-nodeB',],
174 [
'a1.node',
'g2.g3.testSourceKillFlow-nodeA.out',
'g2.testSourceKillFlow-nodeB.in']],
175 [[
'a1.g2.testSourceKillFlow-nodeA',
'a1.g2.g3.testSourceKillFlow-nodeB',],
176 [
'a1.node',
'g2.testSourceKillFlow-nodeA.out',
'g2.g3.testSourceKillFlow-nodeB.in']],
184 sourceName, sinkName = test[0]
186 port = apiSuccess(master.addNode(
'',
'', sourceName, pkg, node, TEST_MACHINE, 0))
187 apiSuccess(master.addNode(
'',
'', sinkName, pkg, node, TEST_MACHINE, 0))
188 sourceUri =
'http://%s:%s/'%(testNodeAddr[0], port)
189 sources[sourceName] = ServerProxy(sourceUri)
191 callerId, sourceLocator, sinkLocator = test[1]
192 apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1))
196 source = sources[test[0][0]]
197 callerId, sourceLocator, sinkLocator = test[1]
200 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator,
'fake.sink'))
201 assert val == 0,
"flowsKilled should be 0 for non-existent flow [fakesink]" 203 apiError(source.sourceKillFlow(callerId,
'fake.source', sinkLocator))
207 source = sources[test[0][0]]
208 callerId, sourceL, sinkL = test[1]
209 for sub_test
in tests:
210 sub_callerId, sub_source, sub_sink = test[1]
211 if sub_callerId == callerId
and sub_source == sourceL
and sub_sink == sinkL:
213 val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink))
214 assert val == 0,
"flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL)
218 source = sources[test[0][0]]
219 callerId, sourceLocator, sinkLocator = test[1]
220 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator))
221 assert type(val) == int
222 assert val == 1,
"Source did not report 1 flow killed: %s, %s"%(val, getLastMsg())
226 [[
'testSinkConnectFlow-nodeA',
'testSinkConnectFlow-nodeB',],
227 [
'node',
'testSinkConnectFlow-nodeA.out',
'testSinkConnectFlow-nodeB.in']],
228 [[
'g1.testSinkConnectFlow-nodeA',
'g1.testSinkConnectFlow-nodeB',],
229 [
'g1.node',
'testSinkConnectFlow-nodeA.out',
'testSinkConnectFlow-nodeB.in']],
230 [[
'g1.g2.g3.testSinkConnectFlow-nodeA',
'g1.g2.testSinkConnectFlow-nodeB',],
231 [
'g1.g2.node',
'g3.testSinkConnectFlow-nodeA.out',
'testSinkConnectFlow-nodeB.in']],
232 [[
'g1.g2.testSinkConnectFlow-nodeA',
'g1.g2.g3.testSinkConnectFlow-nodeB',],
233 [
'g1.g2.node',
'testSinkConnectFlow-nodeA.out',
'g3.testSinkConnectFlow-nodeB.in']],
235 [[
'a1.a2.testSinkConnectFlow-nodeA',
'b1.b2.testSinkConnectFlow-nodeB',],
236 [
'node',
'a1.a2.testSinkConnectFlow-nodeA.out',
'b1.b2.testSinkConnectFlow-nodeB.in']],
238 [[
'c1.node.testSinkConnectFlow-nodeA',
'c1.node2.testSinkConnectFlow-nodeB',],
239 [
'c1.node2',
'node.testSinkConnectFlow-nodeA.out',
'.testSinkConnectFlow-nodeB.in']],
245 Test subroutine to startup all the nodes specified in the tests 254 sourceName, sinkName = test[0]
255 sourcePort = apiSuccess(master.addNode(
'',
'', sourceName, pkg, node, TEST_MACHINE, 0))
256 sinkPort = apiSuccess(master.addNode(
'',
'', sinkName, pkg, node, TEST_MACHINE, 0))
257 sourceUri =
'http://%s:%s/'%(testNodeAddr[0], sourcePort)
258 sinkUri =
'http://%s:%s/'%(testNodeAddr[0], sinkPort)
259 sourceUris[sourceName] = sourceUri
260 sinks[sinkName] = ServerProxy(sinkUri)
261 return sourceUris, sinks
265 subroutine to connect the flows specified in the tests, purely 268 In the future it would be nice to verify that the flow truly 269 exists, but for now trust what the sink says 273 sourceName, sinkName = test[0]
274 sourceUri = sourceUris[sourceName]
275 sink = sinks[sinkName]
276 callerId, sourceLocator, sinkLocator = test[1]
277 print "Testing", test
278 val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable))
279 assert type(val) == int
283 subroutine to kill the flows specified in the tests, purely 284 using the sink API (i.e. source will still be running) 286 In the future it would be nice to verify that the flow was 287 killed, but for now trust what the sink says""" 290 sourceName, sinkName = test[0]
291 sink = sinks[sinkName]
292 callerId, sourceLocator, sinkLocator = test[1]
293 print "Testing", test
294 assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\
295 "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg())
313 apiError(slave.sinkKillFlow(
'',
'',
''))
322 assert slave
is not None,
"slave is None" 324 masterUri = apiSuccess(slave.getMasterUri(
''))
325 assert getMasterUri() == masterUri
or getMasterUriAlt() == masterUri, masterUri
326 assert masterUri == apiSuccess(slave.getMasterUri(
'a.different.id'))
328 pid = apiSuccess(slave.getPid(
''))
329 assert pid == apiSuccess(slave.getPid(
'a.different.id'))
331 apiError(slave.getPid(0))
332 apiError(slave.getMasterUri(0))
334 slave.shutdown(
'some.id')
339 code, status, val = slave.getPid(
'')
340 assert code < 1,
"Slave is still running after shutdown" def testSourceKillFlow(self)
def testSourceRequestFlow_Errors(self)
def testSinkConnectFlow(self)
def _sink_StartFlows(self, tests, sourceUris, sinks)
def testSourceRequestFlow_Fail(self)
def testSourceRequestFlow_TCPROS1(self)
def _sink_StartNodes(self, tests)
Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'.
def testSourceRequestFlow_TCPROS2(self)
def testGetPublications(self)
def _sink_KillFlows(self, tests, sinks)
def _subTestSourceRequestFlow(self, testName, protocols, testEval)
def testSinkKillFlow(self)