00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import os
00036 import sys
00037 import string
00038 import xmlrpclib
00039
00040 import rospy
00041 from test_ros.rostest import *
00042
00043 singletest = None
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061 _required_publications = 'test_string_out', 'test_primitives_out', 'test_arrays_out', 'test_header_out'
00062 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in'
00063
00064
00065 class SlaveTestCase(TestRosClient):
00066
00067 def setUp(self):
00068 super(SlaveTestCase, self).setUp()
00069
00070 self.caller_id = rospy.get_caller_id()
00071 self.node_api = self.apiSuccess(self.master.lookupNode(self.caller_id, 'test_node'))
00072 self.assert_(self.node_api.startswith('http'))
00073 self.node = xmlrpclib.ServerProxy(self.node_api)
00074
00075 def testGetPid(self):
00076 pid = self.apiSuccess(self.node.getPid(self.caller_id))
00077 self.assert_(pid > 0)
00078
00079 def testGetPublications(self):
00080 publications = self.apiSuccess(self.node.getPublications(self.caller_id))
00081 self.assert_(publications is not None)
00082 expected = [rospy.resolve_name(t) for t in _required_publications]
00083 missing = set(expected) - set(publications)
00084 self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00085
00086 def _subTestSourceRequestFlow(self, testName, protocols, testEval):
00087 master = self.master
00088 tests = [
00089 [['testSourceRequestFlow-%s-nodeA','testSourceRequestFlow-%s-nodeB',],
00090 ['node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00091 [['g1.testSourceRequestFlow-%s-nodeA','g1.testSourceRequestFlow-%s-nodeB',],
00092 ['g1.node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00093 [['g1.g2.g3.testSourceRequestFlow-%s-nodeA','g1.g2.testSourceRequestFlow-%s-nodeB',],
00094 ['g1.g2.node', 'g3.testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00095 [['g1.g2.testSourceRequestFlow-%s-nodeA','g1.g2.g3.testSourceRequestFlow-%s-nodeB',],
00096 ['g1.g2.node', 'testSourceRequestFlow-%s-nodeA.out', 'g3.testSourceRequestFlow-%s-nodeB.in']],
00097 ]
00098 sources = {}
00099
00100
00101 pkg, node = testNode
00102 for test in tests:
00103 sourceName, sinkName = [val%testName for val in test[0]]
00104 port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00105 apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00106 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00107 sources[sourceName] = xmlrpclib.ServerProxy(sourceUri)
00108
00109 for test in tests:
00110 sourceName, sinkName = [val%testName for val in test[0]]
00111 source = sources[sourceName]
00112 callerId = test[1][0]
00113 sourceLocator, sinkLocator = [val%testName for val in test[1][1:]]
00114 args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols)
00115 testEval(args)
00116
00117
00118 def testSourceRequestFlow_TCPROS1(self):
00119 def testEval(args):
00120 protocol = apiSuccess(args)
00121 assert type(protocol) == list
00122 assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00123 assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00124 protocols = [['TCPROS']]
00125 self._subTestSourceRequestFlow('TCPROS1', protocols, testEval)
00126
00127 def testSourceRequestFlow_TCPROS2(self):
00128 def testEval(args):
00129 protocol = apiSuccess(args)
00130 assert type(protocol) == list
00131 assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00132 assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00133 protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['TCPROS'], ['Fakelast', 'fake'] ]
00134 self._subTestSourceRequestFlow('TCPROS2', protocols, testEval)
00135
00136 def testSourceRequestFlow_Fail(self):
00137 protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['Fakelast', 'fake'] ]
00138 self._subTestSourceRequestFlow('Fail', protocols, apiFail)
00139
00140 def testSourceRequestFlow_Errors(self):
00141 slave = self.slave
00142 master = self.master
00143
00144 apiError(slave.sourceRequestFlow('', '', ''))
00145 apiError(slave.sourceRequestFlow('', 'good.locator', 'badlocator'))
00146 apiError(slave.sourceRequestFlow('', 'badlocator', 'good.locator'))
00147
00148
00149
00150
00151
00152
00153 def testSourceKillFlow(self):
00154 slave = self.slave
00155 master = self.master
00156
00157 apiError(slave.sourceKillFlow('', '', ''))
00158 apiError(slave.sourceKillFlow('', 'good.locator', 'badlocator'))
00159 apiError(slave.sourceKillFlow('', 'badlocator', 'good.locator'))
00160
00161 tests = [
00162 [['testSourceKillFlow-nodeA','testSourceKillFlow-nodeB',],
00163 ['node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00164 [['g1.testSourceKillFlow-nodeA','g1.testSourceKillFlow-nodeB',],
00165 ['g1.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00166 [['g1.g2.g3.testSourceKillFlow-nodeA','g1.g2.g3.testSourceKillFlow-nodeB',],
00167 ['g1.g2.node', 'g3.testSourceKillFlow-nodeA.out', 'g3.testSourceKillFlow-nodeB.in']],
00168 [['g1.g2.testSourceKillFlow-nodeA','g1.g2.testSourceKillFlow-nodeB',],
00169 ['g1.g2.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00170 [['a1.g2.g3.testSourceKillFlow-nodeA','a1.g2.testSourceKillFlow-nodeB',],
00171 ['a1.node', 'g2.g3.testSourceKillFlow-nodeA.out', 'g2.testSourceKillFlow-nodeB.in']],
00172 [['a1.g2.testSourceKillFlow-nodeA','a1.g2.g3.testSourceKillFlow-nodeB',],
00173 ['a1.node', 'g2.testSourceKillFlow-nodeA.out', 'g2.g3.testSourceKillFlow-nodeB.in']],
00174
00175 ]
00176 sources = {}
00177
00178
00179 pkg, node = testNode
00180 for test in tests:
00181 sourceName, sinkName = test[0]
00182
00183 port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00184 apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00185 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00186 sources[sourceName] = xmlrpclib.ServerProxy(sourceUri)
00187
00188 callerId, sourceLocator, sinkLocator = test[1]
00189 apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1))
00190
00191
00192 for test in tests:
00193 source = sources[test[0][0]]
00194 callerId, sourceLocator, sinkLocator = test[1]
00195
00196
00197 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, 'fake.sink'))
00198 assert val == 0, "flowsKilled should be 0 for non-existent flow [fakesink]"
00199
00200 apiError(source.sourceKillFlow(callerId, 'fake.source', sinkLocator))
00201
00202
00203 for test in tests:
00204 source = sources[test[0][0]]
00205 callerId, sourceL, sinkL = test[1]
00206 for sub_test in tests:
00207 sub_callerId, sub_source, sub_sink = test[1]
00208 if sub_callerId == callerId and sub_source == sourceL and sub_sink == sinkL:
00209 continue
00210 val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink))
00211 assert val == 0, "flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL)
00212
00213
00214 for test in tests:
00215 source = sources[test[0][0]]
00216 callerId, sourceLocator, sinkLocator = test[1]
00217 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator))
00218 assert type(val) == int
00219 assert val == 1, "Source did not report 1 flow killed: %s, %s"%(val, getLastMsg())
00220
00221
00222 sinkTests = [
00223 [['testSinkConnectFlow-nodeA','testSinkConnectFlow-nodeB',],
00224 ['node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00225 [['g1.testSinkConnectFlow-nodeA','g1.testSinkConnectFlow-nodeB',],
00226 ['g1.node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00227 [['g1.g2.g3.testSinkConnectFlow-nodeA','g1.g2.testSinkConnectFlow-nodeB',],
00228 ['g1.g2.node', 'g3.testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00229 [['g1.g2.testSinkConnectFlow-nodeA','g1.g2.g3.testSinkConnectFlow-nodeB',],
00230 ['g1.g2.node', 'testSinkConnectFlow-nodeA.out', 'g3.testSinkConnectFlow-nodeB.in']],
00231
00232 [['a1.a2.testSinkConnectFlow-nodeA','b1.b2.testSinkConnectFlow-nodeB',],
00233 ['node', 'a1.a2.testSinkConnectFlow-nodeA.out', 'b1.b2.testSinkConnectFlow-nodeB.in']],
00234
00235 [['c1.node.testSinkConnectFlow-nodeA','c1.node2.testSinkConnectFlow-nodeB',],
00236 ['c1.node2', 'node.testSinkConnectFlow-nodeA.out', '.testSinkConnectFlow-nodeB.in']],
00237
00238 ]
00239
00240 def _sink_StartNodes(self, tests):
00241 """
00242 Test subroutine to startup all the nodes specified in the tests
00243 """
00244 master = self.master
00245 sourceUris = {}
00246 sinks = {}
00247
00248
00249 pkg, node = testNode
00250 for test in tests:
00251 sourceName, sinkName = test[0]
00252 sourcePort = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00253 sinkPort = apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00254 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], sourcePort)
00255 sinkUri = 'http://%s:%s/'%(testNodeAddr[0], sinkPort)
00256 sourceUris[sourceName] = sourceUri
00257 sinks[sinkName] = xmlrpclib.ServerProxy(sinkUri)
00258 return sourceUris, sinks
00259
00260 def _sink_StartFlows(self, tests, sourceUris, sinks):
00261 """
00262 subroutine to connect the flows specified in the tests, purely
00263 using the sink API.
00264
00265 In the future it would be nice to verify that the flow truly
00266 exists, but for now trust what the sink says
00267 """
00268 reliable = 1
00269 for test in tests:
00270 sourceName, sinkName = test[0]
00271 sourceUri = sourceUris[sourceName]
00272 sink = sinks[sinkName]
00273 callerId, sourceLocator, sinkLocator = test[1]
00274 print "Testing", test
00275 val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable))
00276 assert type(val) == int
00277
00278 def _sink_KillFlows(self, tests, sinks):
00279 """
00280 subroutine to kill the flows specified in the tests, purely
00281 using the sink API (i.e. source will still be running)
00282
00283 In the future it would be nice to verify that the flow was
00284 killed, but for now trust what the sink says"""
00285 reliable = 1
00286 for test in tests:
00287 sourceName, sinkName = test[0]
00288 sink = sinks[sinkName]
00289 callerId, sourceLocator, sinkLocator = test[1]
00290 print "Testing", test
00291 assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\
00292 "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg())
00293
00294
00295
00296
00297
00298
00299
00300 def testSinkConnectFlow(self):
00301 master = self.master
00302 sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00303 self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00304
00305
00306
00307 def testSinkKillFlow(self):
00308 slave = self.slave
00309 master = self.master
00310 apiError(slave.sinkKillFlow('', '', ''))
00311
00312
00313 sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00314 self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00315 self._sink_KillFlows(self.sinkTests, sinks)
00316
00317 def testMisc(self):
00318 slave = self.slave
00319 assert slave is not None, "slave is None"
00320
00321 masterUri = apiSuccess(slave.getMasterUri(''))
00322 assert getMasterUri() == masterUri or getMasterUriAlt() == masterUri, masterUri
00323 assert masterUri == apiSuccess(slave.getMasterUri('a.different.id'))
00324
00325 pid = apiSuccess(slave.getPid(''))
00326 assert pid == apiSuccess(slave.getPid('a.different.id'))
00327
00328 apiError(slave.getPid(0))
00329 apiError(slave.getMasterUri(0))
00330 try:
00331 slave.shutdown('some.id')
00332 except:
00333 pass
00334 time.sleep(0.1)
00335 try:
00336 code, status, val = slave.getPid('')
00337 assert code < 1, "Slave is still running after shutdown"
00338 except:
00339 pass
00340