00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 import os
00036 import sys
00037 import string
00038 import xmlrpclib
00039 
00040 import rospy
00041 from rostest import *
00042 
00043 singletest = None
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 _required_publications  = 'test_string_out', 'test_primitives_out', 'test_arrays_out', 'test_header_out'
00062 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in'
00063 
00064 
00065 class SlaveTestCase(TestRosClient):
00066 
00067     def setUp(self):
00068         super(SlaveTestCase, self).setUp()
00069         
00070         self.caller_id = rospy.get_caller_id()
00071         self.node_api = self.apiSuccess(self.master.lookupNode(self.caller_id, 'test_node'))
00072         self.assert_(self.node_api.startswith('http'))
00073         self.node = xmlrpclib.ServerProxy(self.node_api)
00074         
00075     def testGetPid(self):
00076         pid = self.apiSuccess(self.node.getPid(self.caller_id))
00077         self.assert_(pid > 0)
00078         
00079     def testGetPublications(self):
00080         publications = self.apiSuccess(self.node.getPublications(self.caller_id))
00081         self.assert_(publications is not None)
00082         expected = [rospy.resolve_name(t) for t in _required_publications]
00083         missing = set(expected) - set(publications) 
00084         self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00085 
00086     def _subTestSourceRequestFlow(self, testName, protocols, testEval):
00087         master = self.master
00088         tests = [
00089             [['testSourceRequestFlow-%s-nodeA','testSourceRequestFlow-%s-nodeB',],
00090              ['node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00091             [['g1.testSourceRequestFlow-%s-nodeA','g1.testSourceRequestFlow-%s-nodeB',],
00092              ['g1.node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00093             [['g1.g2.g3.testSourceRequestFlow-%s-nodeA','g1.g2.testSourceRequestFlow-%s-nodeB',],
00094              ['g1.g2.node', 'g3.testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00095             [['g1.g2.testSourceRequestFlow-%s-nodeA','g1.g2.g3.testSourceRequestFlow-%s-nodeB',],
00096              ['g1.g2.node', 'testSourceRequestFlow-%s-nodeA.out', 'g3.testSourceRequestFlow-%s-nodeB.in']],
00097             ]
00098         sources = {}
00099         
00100         
00101         pkg, node = testNode
00102         for test in tests:
00103             sourceName, sinkName = [val%testName for val in test[0]]
00104             port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00105             apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00106             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00107             sources[sourceName] = xmlrpclib.ServerProxy(sourceUri)
00108 
00109         for test in tests:
00110             sourceName, sinkName = [val%testName for val in test[0]]
00111             source = sources[sourceName]
00112             callerId = test[1][0]
00113             sourceLocator, sinkLocator = [val%testName for val in test[1][1:]]
00114             args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols)
00115             testEval(args)
00116         
00117     
00118     def testSourceRequestFlow_TCPROS1(self):
00119         def testEval(args):
00120             protocol = apiSuccess(args)
00121             assert type(protocol) == list
00122             assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00123             assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00124         protocols = [['TCPROS']]
00125         self._subTestSourceRequestFlow('TCPROS1', protocols, testEval)
00126 
00127     def testSourceRequestFlow_TCPROS2(self):
00128         def testEval(args):
00129             protocol = apiSuccess(args)
00130             assert type(protocol) == list            
00131             assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00132             assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00133         protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['TCPROS'], ['Fakelast', 'fake'] ]
00134         self._subTestSourceRequestFlow('TCPROS2', protocols, testEval)
00135 
00136     def testSourceRequestFlow_Fail(self):
00137         protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['Fakelast', 'fake'] ]
00138         self._subTestSourceRequestFlow('Fail', protocols, apiFail)
00139 
00140     def testSourceRequestFlow_Errors(self):
00141         slave = self.slave
00142         master = self.master
00143         
00144         apiError(slave.sourceRequestFlow('', '', ''))
00145         apiError(slave.sourceRequestFlow('', 'good.locator', 'badlocator'))
00146         apiError(slave.sourceRequestFlow('', 'badlocator', 'good.locator'))
00147                 
00148 
00149 
00150 
00151 
00152 
00153     def testSourceKillFlow(self):
00154         slave = self.slave
00155         master = self.master
00156         
00157         apiError(slave.sourceKillFlow('', '', ''))
00158         apiError(slave.sourceKillFlow('', 'good.locator', 'badlocator'))
00159         apiError(slave.sourceKillFlow('', 'badlocator', 'good.locator'))
00160         
00161         tests = [
00162             [['testSourceKillFlow-nodeA','testSourceKillFlow-nodeB',],
00163              ['node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00164             [['g1.testSourceKillFlow-nodeA','g1.testSourceKillFlow-nodeB',],
00165              ['g1.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00166             [['g1.g2.g3.testSourceKillFlow-nodeA','g1.g2.g3.testSourceKillFlow-nodeB',],
00167              ['g1.g2.node', 'g3.testSourceKillFlow-nodeA.out', 'g3.testSourceKillFlow-nodeB.in']],
00168             [['g1.g2.testSourceKillFlow-nodeA','g1.g2.testSourceKillFlow-nodeB',],
00169              ['g1.g2.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00170             [['a1.g2.g3.testSourceKillFlow-nodeA','a1.g2.testSourceKillFlow-nodeB',],
00171              ['a1.node', 'g2.g3.testSourceKillFlow-nodeA.out', 'g2.testSourceKillFlow-nodeB.in']],
00172             [['a1.g2.testSourceKillFlow-nodeA','a1.g2.g3.testSourceKillFlow-nodeB',],
00173              ['a1.node', 'g2.testSourceKillFlow-nodeA.out', 'g2.g3.testSourceKillFlow-nodeB.in']],
00174 
00175             ]
00176         sources = {}
00177         
00178         
00179         pkg, node = testNode
00180         for test in tests:
00181             sourceName, sinkName = test[0]
00182             
00183             port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00184             apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00185             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00186             sources[sourceName] = xmlrpclib.ServerProxy(sourceUri)
00187             
00188             callerId, sourceLocator, sinkLocator = test[1]
00189             apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1))
00190             
00191         
00192         for test in tests:
00193             source = sources[test[0][0]]
00194             callerId, sourceLocator, sinkLocator = test[1]
00195             
00196             
00197             val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, 'fake.sink'))
00198             assert val == 0, "flowsKilled should be 0 for non-existent flow [fakesink]"
00199             
00200             apiError(source.sourceKillFlow(callerId, 'fake.source', sinkLocator))
00201             
00202         
00203         for test in tests:
00204             source = sources[test[0][0]]
00205             callerId, sourceL, sinkL = test[1]
00206             for sub_test in tests:
00207                 sub_callerId, sub_source, sub_sink = test[1]
00208                 if sub_callerId == callerId and sub_source == sourceL and sub_sink == sinkL:
00209                     continue
00210                 val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink))
00211                 assert val == 0, "flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL)
00212                 
00213         
00214         for test in tests:
00215             source = sources[test[0][0]]
00216             callerId, sourceLocator, sinkLocator = test[1]
00217             val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator))
00218             assert type(val) == int
00219             assert val == 1, "Source did not report 1 flow killed: %s, %s"%(val, getLastMsg())
00220         
00221     
00222     sinkTests = [
00223         [['testSinkConnectFlow-nodeA','testSinkConnectFlow-nodeB',],
00224          ['node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00225         [['g1.testSinkConnectFlow-nodeA','g1.testSinkConnectFlow-nodeB',],
00226          ['g1.node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00227         [['g1.g2.g3.testSinkConnectFlow-nodeA','g1.g2.testSinkConnectFlow-nodeB',],
00228          ['g1.g2.node', 'g3.testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00229         [['g1.g2.testSinkConnectFlow-nodeA','g1.g2.g3.testSinkConnectFlow-nodeB',],
00230          ['g1.g2.node', 'testSinkConnectFlow-nodeA.out', 'g3.testSinkConnectFlow-nodeB.in']],
00231         
00232         [['a1.a2.testSinkConnectFlow-nodeA','b1.b2.testSinkConnectFlow-nodeB',],
00233          ['node', 'a1.a2.testSinkConnectFlow-nodeA.out', 'b1.b2.testSinkConnectFlow-nodeB.in']],
00234         
00235         [['c1.node.testSinkConnectFlow-nodeA','c1.node2.testSinkConnectFlow-nodeB',],
00236          ['c1.node2', 'node.testSinkConnectFlow-nodeA.out', '.testSinkConnectFlow-nodeB.in']],
00237 
00238         ]
00239 
00240     def _sink_StartNodes(self, tests):
00241         """
00242         Test subroutine to startup all the nodes specified in the tests
00243         """
00244         master = self.master
00245         sourceUris = {}
00246         sinks = {}
00247         
00248         
00249         pkg, node = testNode
00250         for test in tests:
00251             sourceName, sinkName = test[0]
00252             sourcePort = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00253             sinkPort = apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00254             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], sourcePort)
00255             sinkUri = 'http://%s:%s/'%(testNodeAddr[0], sinkPort)
00256             sourceUris[sourceName] = sourceUri
00257             sinks[sinkName] = xmlrpclib.ServerProxy(sinkUri)
00258         return sourceUris, sinks
00259 
00260     def _sink_StartFlows(self, tests, sourceUris, sinks):
00261         """
00262         subroutine to connect the flows specified in the tests, purely
00263         using the sink API.
00264 
00265         In the future it would be nice to verify that the flow truly
00266         exists, but for now trust what the sink says
00267         """
00268         reliable = 1 
00269         for test in tests:
00270             sourceName, sinkName = test[0]
00271             sourceUri = sourceUris[sourceName]
00272             sink = sinks[sinkName]
00273             callerId, sourceLocator, sinkLocator = test[1]
00274             print "Testing", test
00275             val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable))
00276             assert type(val) == int
00277 
00278     def _sink_KillFlows(self, tests, sinks):
00279         """
00280         subroutine to kill the flows specified in the tests, purely
00281         using the sink API (i.e. source will still be running)
00282 
00283         In the future it would be nice to verify that the flow was
00284         killed, but for now trust what the sink says"""
00285         reliable = 1 
00286         for test in tests:
00287             sourceName, sinkName = test[0]
00288             sink = sinks[sinkName]
00289             callerId, sourceLocator, sinkLocator = test[1]
00290             print "Testing", test
00291             assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\
00292                    "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg())
00293     
00294     
00295     
00296     
00297     
00298     
00299 
00300     def testSinkConnectFlow(self):
00301         master = self.master
00302         sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00303         self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00304 
00305 
00306 
00307     def testSinkKillFlow(self):
00308         slave = self.slave
00309         master = self.master
00310         apiError(slave.sinkKillFlow('', '', ''))
00311         
00312         
00313         sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00314         self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00315         self._sink_KillFlows(self.sinkTests, sinks)        
00316     
00317     def testMisc(self):
00318         slave = self.slave
00319         assert slave is not None, "slave is None"
00320         
00321         masterUri = apiSuccess(slave.getMasterUri(''))
00322         assert getMasterUri() == masterUri or getMasterUriAlt() == masterUri, masterUri
00323         assert masterUri == apiSuccess(slave.getMasterUri('a.different.id'))
00324         
00325         pid = apiSuccess(slave.getPid(''))
00326         assert pid == apiSuccess(slave.getPid('a.different.id'))
00327         
00328         apiError(slave.getPid(0))
00329         apiError(slave.getMasterUri(0))        
00330         try:
00331             slave.shutdown('some.id')
00332         except:
00333             pass
00334         time.sleep(0.1)
00335         try:
00336             code, status, val = slave.getPid('')
00337             assert code < 1, "Slave is still running after shutdown"
00338         except:
00339             pass
00340