00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import os
00036 import sys
00037 import string
00038 try:
00039 from xmlrpc.client import ServerProxy
00040 except ImportError:
00041 from xmlrpclib import ServerProxy
00042
00043 import rospy
00044 from rostest import *
00045
00046 singletest = None
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064 _required_publications = 'test_string_out', 'test_primitives_out', 'test_arrays_out', 'test_header_out'
00065 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in'
00066
00067
00068 class SlaveTestCase(TestRosClient):
00069
00070 def setUp(self):
00071 super(SlaveTestCase, self).setUp()
00072
00073 self.caller_id = rospy.get_caller_id()
00074 self.node_api = self.apiSuccess(self.master.lookupNode(self.caller_id, 'test_node'))
00075 self.assert_(self.node_api.startswith('http'))
00076 self.node = ServerProxy(self.node_api)
00077
00078 def testGetPid(self):
00079 pid = self.apiSuccess(self.node.getPid(self.caller_id))
00080 self.assert_(pid > 0)
00081
00082 def testGetPublications(self):
00083 publications = self.apiSuccess(self.node.getPublications(self.caller_id))
00084 self.assert_(publications is not None)
00085 expected = [rospy.resolve_name(t) for t in _required_publications]
00086 missing = set(expected) - set(publications)
00087 self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00088
00089 def _subTestSourceRequestFlow(self, testName, protocols, testEval):
00090 master = self.master
00091 tests = [
00092 [['testSourceRequestFlow-%s-nodeA','testSourceRequestFlow-%s-nodeB',],
00093 ['node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00094 [['g1.testSourceRequestFlow-%s-nodeA','g1.testSourceRequestFlow-%s-nodeB',],
00095 ['g1.node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00096 [['g1.g2.g3.testSourceRequestFlow-%s-nodeA','g1.g2.testSourceRequestFlow-%s-nodeB',],
00097 ['g1.g2.node', 'g3.testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00098 [['g1.g2.testSourceRequestFlow-%s-nodeA','g1.g2.g3.testSourceRequestFlow-%s-nodeB',],
00099 ['g1.g2.node', 'testSourceRequestFlow-%s-nodeA.out', 'g3.testSourceRequestFlow-%s-nodeB.in']],
00100 ]
00101 sources = {}
00102
00103
00104 pkg, node = testNode
00105 for test in tests:
00106 sourceName, sinkName = [val%testName for val in test[0]]
00107 port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00108 apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00109 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00110 sources[sourceName] = ServerProxy(sourceUri)
00111
00112 for test in tests:
00113 sourceName, sinkName = [val%testName for val in test[0]]
00114 source = sources[sourceName]
00115 callerId = test[1][0]
00116 sourceLocator, sinkLocator = [val%testName for val in test[1][1:]]
00117 args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols)
00118 testEval(args)
00119
00120
00121 def testSourceRequestFlow_TCPROS1(self):
00122 def testEval(args):
00123 protocol = apiSuccess(args)
00124 assert type(protocol) == list
00125 assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00126 assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00127 protocols = [['TCPROS']]
00128 self._subTestSourceRequestFlow('TCPROS1', protocols, testEval)
00129
00130 def testSourceRequestFlow_TCPROS2(self):
00131 def testEval(args):
00132 protocol = apiSuccess(args)
00133 assert type(protocol) == list
00134 assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00135 assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00136 protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['TCPROS'], ['Fakelast', 'fake'] ]
00137 self._subTestSourceRequestFlow('TCPROS2', protocols, testEval)
00138
00139 def testSourceRequestFlow_Fail(self):
00140 protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['Fakelast', 'fake'] ]
00141 self._subTestSourceRequestFlow('Fail', protocols, apiFail)
00142
00143 def testSourceRequestFlow_Errors(self):
00144 slave = self.slave
00145 master = self.master
00146
00147 apiError(slave.sourceRequestFlow('', '', ''))
00148 apiError(slave.sourceRequestFlow('', 'good.locator', 'badlocator'))
00149 apiError(slave.sourceRequestFlow('', 'badlocator', 'good.locator'))
00150
00151
00152
00153
00154
00155
00156 def testSourceKillFlow(self):
00157 slave = self.slave
00158 master = self.master
00159
00160 apiError(slave.sourceKillFlow('', '', ''))
00161 apiError(slave.sourceKillFlow('', 'good.locator', 'badlocator'))
00162 apiError(slave.sourceKillFlow('', 'badlocator', 'good.locator'))
00163
00164 tests = [
00165 [['testSourceKillFlow-nodeA','testSourceKillFlow-nodeB',],
00166 ['node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00167 [['g1.testSourceKillFlow-nodeA','g1.testSourceKillFlow-nodeB',],
00168 ['g1.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00169 [['g1.g2.g3.testSourceKillFlow-nodeA','g1.g2.g3.testSourceKillFlow-nodeB',],
00170 ['g1.g2.node', 'g3.testSourceKillFlow-nodeA.out', 'g3.testSourceKillFlow-nodeB.in']],
00171 [['g1.g2.testSourceKillFlow-nodeA','g1.g2.testSourceKillFlow-nodeB',],
00172 ['g1.g2.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00173 [['a1.g2.g3.testSourceKillFlow-nodeA','a1.g2.testSourceKillFlow-nodeB',],
00174 ['a1.node', 'g2.g3.testSourceKillFlow-nodeA.out', 'g2.testSourceKillFlow-nodeB.in']],
00175 [['a1.g2.testSourceKillFlow-nodeA','a1.g2.g3.testSourceKillFlow-nodeB',],
00176 ['a1.node', 'g2.testSourceKillFlow-nodeA.out', 'g2.g3.testSourceKillFlow-nodeB.in']],
00177
00178 ]
00179 sources = {}
00180
00181
00182 pkg, node = testNode
00183 for test in tests:
00184 sourceName, sinkName = test[0]
00185
00186 port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00187 apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00188 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00189 sources[sourceName] = ServerProxy(sourceUri)
00190
00191 callerId, sourceLocator, sinkLocator = test[1]
00192 apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1))
00193
00194
00195 for test in tests:
00196 source = sources[test[0][0]]
00197 callerId, sourceLocator, sinkLocator = test[1]
00198
00199
00200 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, 'fake.sink'))
00201 assert val == 0, "flowsKilled should be 0 for non-existent flow [fakesink]"
00202
00203 apiError(source.sourceKillFlow(callerId, 'fake.source', sinkLocator))
00204
00205
00206 for test in tests:
00207 source = sources[test[0][0]]
00208 callerId, sourceL, sinkL = test[1]
00209 for sub_test in tests:
00210 sub_callerId, sub_source, sub_sink = test[1]
00211 if sub_callerId == callerId and sub_source == sourceL and sub_sink == sinkL:
00212 continue
00213 val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink))
00214 assert val == 0, "flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL)
00215
00216
00217 for test in tests:
00218 source = sources[test[0][0]]
00219 callerId, sourceLocator, sinkLocator = test[1]
00220 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator))
00221 assert type(val) == int
00222 assert val == 1, "Source did not report 1 flow killed: %s, %s"%(val, getLastMsg())
00223
00224
00225 sinkTests = [
00226 [['testSinkConnectFlow-nodeA','testSinkConnectFlow-nodeB',],
00227 ['node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00228 [['g1.testSinkConnectFlow-nodeA','g1.testSinkConnectFlow-nodeB',],
00229 ['g1.node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00230 [['g1.g2.g3.testSinkConnectFlow-nodeA','g1.g2.testSinkConnectFlow-nodeB',],
00231 ['g1.g2.node', 'g3.testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00232 [['g1.g2.testSinkConnectFlow-nodeA','g1.g2.g3.testSinkConnectFlow-nodeB',],
00233 ['g1.g2.node', 'testSinkConnectFlow-nodeA.out', 'g3.testSinkConnectFlow-nodeB.in']],
00234
00235 [['a1.a2.testSinkConnectFlow-nodeA','b1.b2.testSinkConnectFlow-nodeB',],
00236 ['node', 'a1.a2.testSinkConnectFlow-nodeA.out', 'b1.b2.testSinkConnectFlow-nodeB.in']],
00237
00238 [['c1.node.testSinkConnectFlow-nodeA','c1.node2.testSinkConnectFlow-nodeB',],
00239 ['c1.node2', 'node.testSinkConnectFlow-nodeA.out', '.testSinkConnectFlow-nodeB.in']],
00240
00241 ]
00242
00243 def _sink_StartNodes(self, tests):
00244 """
00245 Test subroutine to startup all the nodes specified in the tests
00246 """
00247 master = self.master
00248 sourceUris = {}
00249 sinks = {}
00250
00251
00252 pkg, node = testNode
00253 for test in tests:
00254 sourceName, sinkName = test[0]
00255 sourcePort = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00256 sinkPort = apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00257 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], sourcePort)
00258 sinkUri = 'http://%s:%s/'%(testNodeAddr[0], sinkPort)
00259 sourceUris[sourceName] = sourceUri
00260 sinks[sinkName] = ServerProxy(sinkUri)
00261 return sourceUris, sinks
00262
00263 def _sink_StartFlows(self, tests, sourceUris, sinks):
00264 """
00265 subroutine to connect the flows specified in the tests, purely
00266 using the sink API.
00267
00268 In the future it would be nice to verify that the flow truly
00269 exists, but for now trust what the sink says
00270 """
00271 reliable = 1
00272 for test in tests:
00273 sourceName, sinkName = test[0]
00274 sourceUri = sourceUris[sourceName]
00275 sink = sinks[sinkName]
00276 callerId, sourceLocator, sinkLocator = test[1]
00277 print "Testing", test
00278 val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable))
00279 assert type(val) == int
00280
00281 def _sink_KillFlows(self, tests, sinks):
00282 """
00283 subroutine to kill the flows specified in the tests, purely
00284 using the sink API (i.e. source will still be running)
00285
00286 In the future it would be nice to verify that the flow was
00287 killed, but for now trust what the sink says"""
00288 reliable = 1
00289 for test in tests:
00290 sourceName, sinkName = test[0]
00291 sink = sinks[sinkName]
00292 callerId, sourceLocator, sinkLocator = test[1]
00293 print "Testing", test
00294 assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\
00295 "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg())
00296
00297
00298
00299
00300
00301
00302
00303 def testSinkConnectFlow(self):
00304 master = self.master
00305 sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00306 self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00307
00308
00309
00310 def testSinkKillFlow(self):
00311 slave = self.slave
00312 master = self.master
00313 apiError(slave.sinkKillFlow('', '', ''))
00314
00315
00316 sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00317 self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00318 self._sink_KillFlows(self.sinkTests, sinks)
00319
00320 def testMisc(self):
00321 slave = self.slave
00322 assert slave is not None, "slave is None"
00323
00324 masterUri = apiSuccess(slave.getMasterUri(''))
00325 assert getMasterUri() == masterUri or getMasterUriAlt() == masterUri, masterUri
00326 assert masterUri == apiSuccess(slave.getMasterUri('a.different.id'))
00327
00328 pid = apiSuccess(slave.getPid(''))
00329 assert pid == apiSuccess(slave.getPid('a.different.id'))
00330
00331 apiError(slave.getPid(0))
00332 apiError(slave.getMasterUri(0))
00333 try:
00334 slave.shutdown('some.id')
00335 except:
00336 pass
00337 time.sleep(0.1)
00338 try:
00339 code, status, val = slave.getPid('')
00340 assert code < 1, "Slave is still running after shutdown"
00341 except:
00342 pass
00343