testSlave.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2008, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 #
00033 # Revision $Id$
00034 
00035 import os
00036 import sys
00037 import string
00038 import xmlrpclib
00039 
00040 import rospy
00041 from rostest import *
00042 
00043 singletest = None
00044 #singletest = 'testSourceKillFlow'
00045 
00046 #TODO: test single source to multiple sinks
00047 #TODO: test multiple sources to single sink
00048 
00049 # test_string_in
00050 # test_string_out
00051 
00052 # test_primitives_in
00053 # test_primitives_out
00054 
00055 # test_arrays_in
00056 # test_arrays_out
00057 
00058 # test_header_in
00059 # test_header_out
00060 
00061 _required_publications  = 'test_string_out', 'test_primitives_out', 'test_arrays_out', 'test_header_out'
00062 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in'
00063 
00064 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'
00065 class SlaveTestCase(TestRosClient):
00066 
00067     def setUp(self):
00068         super(SlaveTestCase, self).setUp()
00069         # retrieve handle on node
00070         self.caller_id = rospy.get_caller_id()
00071         self.node_api = self.apiSuccess(self.master.lookupNode(self.caller_id, 'test_node'))
00072         self.assert_(self.node_api.startswith('http'))
00073         self.node = xmlrpclib.ServerProxy(self.node_api)
00074         
00075     def testGetPid(self):
00076         pid = self.apiSuccess(self.node.getPid(self.caller_id))
00077         self.assert_(pid > 0)
00078         
00079     def testGetPublications(self):
00080         publications = self.apiSuccess(self.node.getPublications(self.caller_id))
00081         self.assert_(publications is not None)
00082         expected = [rospy.resolve_name(t) for t in _required_publications]
00083         missing = set(expected) - set(publications) 
00084         self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00085 
00086     def _subTestSourceRequestFlow(self, testName, protocols, testEval):
00087         master = self.master
00088         tests = [
00089             [['testSourceRequestFlow-%s-nodeA','testSourceRequestFlow-%s-nodeB',],
00090              ['node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00091             [['g1.testSourceRequestFlow-%s-nodeA','g1.testSourceRequestFlow-%s-nodeB',],
00092              ['g1.node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00093             [['g1.g2.g3.testSourceRequestFlow-%s-nodeA','g1.g2.testSourceRequestFlow-%s-nodeB',],
00094              ['g1.g2.node', 'g3.testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00095             [['g1.g2.testSourceRequestFlow-%s-nodeA','g1.g2.g3.testSourceRequestFlow-%s-nodeB',],
00096              ['g1.g2.node', 'testSourceRequestFlow-%s-nodeA.out', 'g3.testSourceRequestFlow-%s-nodeB.in']],
00097             ]
00098         sources = {}
00099         #start the nodes
00100         # - save the source as we will be making calls on it
00101         pkg, node = testNode
00102         for test in tests:
00103             sourceName, sinkName = [val%testName for val in test[0]]
00104             port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00105             apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00106             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00107             sources[sourceName] = xmlrpclib.ServerProxy(sourceUri)
00108 
00109         for test in tests:
00110             sourceName, sinkName = [val%testName for val in test[0]]
00111             source = sources[sourceName]
00112             callerId = test[1][0]
00113             sourceLocator, sinkLocator = [val%testName for val in test[1][1:]]
00114             args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols)
00115             testEval(args)
00116         #TODO: test locator name resolution            
00117     
00118     def testSourceRequestFlow_TCPROS1(self):
00119         def testEval(args):
00120             protocol = apiSuccess(args)
00121             assert type(protocol) == list
00122             assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00123             assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00124         protocols = [['TCPROS']]
00125         self._subTestSourceRequestFlow('TCPROS1', protocols, testEval)
00126 
00127     def testSourceRequestFlow_TCPROS2(self):
00128         def testEval(args):
00129             protocol = apiSuccess(args)
00130             assert type(protocol) == list            
00131             assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00132             assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00133         protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['TCPROS'], ['Fakelast', 'fake'] ]
00134         self._subTestSourceRequestFlow('TCPROS2', protocols, testEval)
00135 
00136     def testSourceRequestFlow_Fail(self):
00137         protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['Fakelast', 'fake'] ]
00138         self._subTestSourceRequestFlow('Fail', protocols, apiFail)
00139 
00140     def testSourceRequestFlow_Errors(self):
00141         slave = self.slave
00142         master = self.master
00143         #test that malformed locators return error codes
00144         apiError(slave.sourceRequestFlow('', '', ''))
00145         apiError(slave.sourceRequestFlow('', 'good.locator', 'badlocator'))
00146         apiError(slave.sourceRequestFlow('', 'badlocator', 'good.locator'))
00147                 
00148 # sourceKillFlow(callerId, sourceLocator, sinkLocator)
00149 #
00150 # * called by master
00151 # * returns int
00152 
00153     def testSourceKillFlow(self):
00154         slave = self.slave
00155         master = self.master
00156         #test that malformed locators return error codes
00157         apiError(slave.sourceKillFlow('', '', ''))
00158         apiError(slave.sourceKillFlow('', 'good.locator', 'badlocator'))
00159         apiError(slave.sourceKillFlow('', 'badlocator', 'good.locator'))
00160         
00161         tests = [
00162             [['testSourceKillFlow-nodeA','testSourceKillFlow-nodeB',],
00163              ['node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00164             [['g1.testSourceKillFlow-nodeA','g1.testSourceKillFlow-nodeB',],
00165              ['g1.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00166             [['g1.g2.g3.testSourceKillFlow-nodeA','g1.g2.g3.testSourceKillFlow-nodeB',],
00167              ['g1.g2.node', 'g3.testSourceKillFlow-nodeA.out', 'g3.testSourceKillFlow-nodeB.in']],
00168             [['g1.g2.testSourceKillFlow-nodeA','g1.g2.testSourceKillFlow-nodeB',],
00169              ['g1.g2.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00170             [['a1.g2.g3.testSourceKillFlow-nodeA','a1.g2.testSourceKillFlow-nodeB',],
00171              ['a1.node', 'g2.g3.testSourceKillFlow-nodeA.out', 'g2.testSourceKillFlow-nodeB.in']],
00172             [['a1.g2.testSourceKillFlow-nodeA','a1.g2.g3.testSourceKillFlow-nodeB',],
00173              ['a1.node', 'g2.testSourceKillFlow-nodeA.out', 'g2.g3.testSourceKillFlow-nodeB.in']],
00174 
00175             ]
00176         sources = {}
00177         #start the flows
00178         # - save the source as we will be making calls on it
00179         pkg, node = testNode
00180         for test in tests:
00181             sourceName, sinkName = test[0]
00182             # - start the source/sink nodes
00183             port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00184             apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00185             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00186             sources[sourceName] = xmlrpclib.ServerProxy(sourceUri)
00187             # - start the flow
00188             callerId, sourceLocator, sinkLocator = test[1]
00189             apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1))
00190             
00191         #attempt to kill flow with 1 wrong endpoint
00192         for test in tests:
00193             source = sources[test[0][0]]
00194             callerId, sourceLocator, sinkLocator = test[1]
00195             #sourceKill flow does a silent succeed if there is no flow to sink, but returns
00196             #0 flows killed
00197             val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, 'fake.sink'))
00198             assert val == 0, "flowsKilled should be 0 for non-existent flow [fakesink]"
00199             #sourceKill flow fails if source param does not refer to it
00200             apiError(source.sourceKillFlow(callerId, 'fake.source', sinkLocator))
00201             
00202         #attempt to kill flow with wrong context
00203         for test in tests:
00204             source = sources[test[0][0]]
00205             callerId, sourceL, sinkL = test[1]
00206             for sub_test in tests:
00207                 sub_callerId, sub_source, sub_sink = test[1]
00208                 if sub_callerId == callerId and sub_source == sourceL and sub_sink == sinkL:
00209                     continue
00210                 val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink))
00211                 assert val == 0, "flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL)
00212                 
00213         # try to kill all flows on the source that we started
00214         for test in tests:
00215             source = sources[test[0][0]]
00216             callerId, sourceLocator, sinkLocator = test[1]
00217             val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator))
00218             assert type(val) == int
00219             assert val == 1, "Source did not report 1 flow killed: %s, %s"%(val, getLastMsg())
00220         #TODO: test locator name resolution            
00221     
00222     sinkTests = [
00223         [['testSinkConnectFlow-nodeA','testSinkConnectFlow-nodeB',],
00224          ['node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00225         [['g1.testSinkConnectFlow-nodeA','g1.testSinkConnectFlow-nodeB',],
00226          ['g1.node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00227         [['g1.g2.g3.testSinkConnectFlow-nodeA','g1.g2.testSinkConnectFlow-nodeB',],
00228          ['g1.g2.node', 'g3.testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00229         [['g1.g2.testSinkConnectFlow-nodeA','g1.g2.g3.testSinkConnectFlow-nodeB',],
00230          ['g1.g2.node', 'testSinkConnectFlow-nodeA.out', 'g3.testSinkConnectFlow-nodeB.in']],
00231         #separate subgraphs
00232         [['a1.a2.testSinkConnectFlow-nodeA','b1.b2.testSinkConnectFlow-nodeB',],
00233          ['node', 'a1.a2.testSinkConnectFlow-nodeA.out', 'b1.b2.testSinkConnectFlow-nodeB.in']],
00234         # '.locator' resolution
00235         [['c1.node.testSinkConnectFlow-nodeA','c1.node2.testSinkConnectFlow-nodeB',],
00236          ['c1.node2', 'node.testSinkConnectFlow-nodeA.out', '.testSinkConnectFlow-nodeB.in']],
00237 
00238         ]
00239 
00240     def _sink_StartNodes(self, tests):
00241         """
00242         Test subroutine to startup all the nodes specified in the tests
00243         """
00244         master = self.master
00245         sourceUris = {}
00246         sinks = {}
00247         #start the nodes
00248         # - save the sink as we will be making calls on it
00249         pkg, node = testNode
00250         for test in tests:
00251             sourceName, sinkName = test[0]
00252             sourcePort = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00253             sinkPort = apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00254             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], sourcePort)
00255             sinkUri = 'http://%s:%s/'%(testNodeAddr[0], sinkPort)
00256             sourceUris[sourceName] = sourceUri
00257             sinks[sinkName] = xmlrpclib.ServerProxy(sinkUri)
00258         return sourceUris, sinks
00259 
00260     def _sink_StartFlows(self, tests, sourceUris, sinks):
00261         """
00262         subroutine to connect the flows specified in the tests, purely
00263         using the sink API.
00264 
00265         In the future it would be nice to verify that the flow truly
00266         exists, but for now trust what the sink says
00267         """
00268         reliable = 1 #don't have UDP yet
00269         for test in tests:
00270             sourceName, sinkName = test[0]
00271             sourceUri = sourceUris[sourceName]
00272             sink = sinks[sinkName]
00273             callerId, sourceLocator, sinkLocator = test[1]
00274             print "Testing", test
00275             val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable))
00276             assert type(val) == int
00277 
00278     def _sink_KillFlows(self, tests, sinks):
00279         """
00280         subroutine to kill the flows specified in the tests, purely
00281         using the sink API (i.e. source will still be running)
00282 
00283         In the future it would be nice to verify that the flow was
00284         killed, but for now trust what the sink says"""
00285         reliable = 1 #don't have UDP yet
00286         for test in tests:
00287             sourceName, sinkName = test[0]
00288             sink = sinks[sinkName]
00289             callerId, sourceLocator, sinkLocator = test[1]
00290             print "Testing", test
00291             assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\
00292                    "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg())
00293     
00294     #sinkConnectFlow(sourceLocator, sinkLocator, sourceXmlRpcURI, reliable)
00295     # * API call on the sink of a flow. Called by master.
00296     # * master requests that sink negotiate with the source to establish the flow
00297     # * returns meaningless int
00298     # * synchronous call, blocks until success/fail
00299 
00300     def testSinkConnectFlow(self):
00301         master = self.master
00302         sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00303         self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00304 
00305 # sinkKillFlow(sourceLocator, sinkLocator)
00306 # * called by master
00307     def testSinkKillFlow(self):
00308         slave = self.slave
00309         master = self.master
00310         apiError(slave.sinkKillFlow('', '', ''))
00311         
00312         #startup the standard sink tests
00313         sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00314         self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00315         self._sink_KillFlows(self.sinkTests, sinks)        
00316     
00317     def testMisc(self):
00318         slave = self.slave
00319         assert slave is not None, "slave is None"
00320         #getMasterUri
00321         masterUri = apiSuccess(slave.getMasterUri(''))
00322         assert getMasterUri() == masterUri or getMasterUriAlt() == masterUri, masterUri
00323         assert masterUri == apiSuccess(slave.getMasterUri('a.different.id'))
00324         #getPid
00325         pid = apiSuccess(slave.getPid(''))
00326         assert pid == apiSuccess(slave.getPid('a.different.id'))
00327         #callerId must be string
00328         apiError(slave.getPid(0))
00329         apiError(slave.getMasterUri(0))        
00330         try:
00331             slave.shutdown('some.id')
00332         except:
00333             pass
00334         time.sleep(0.1)
00335         try:
00336             code, status, val = slave.getPid('')
00337             assert code < 1, "Slave is still running after shutdown"
00338         except:
00339             pass
00340             


test_rosmaster
Author(s): Ken Conley
autogenerated on Fri Aug 28 2015 12:33:45