testSlave.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2008, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 #
00033 # Revision $Id$
00034 
00035 import os
00036 import sys
00037 import string
00038 try:
00039     from xmlrpc.client import ServerProxy
00040 except ImportError:
00041     from xmlrpclib import ServerProxy
00042 
00043 import rospy
00044 from rostest import *
00045 
00046 singletest = None
00047 #singletest = 'testSourceKillFlow'
00048 
00049 #TODO: test single source to multiple sinks
00050 #TODO: test multiple sources to single sink
00051 
00052 # test_string_in
00053 # test_string_out
00054 
00055 # test_primitives_in
00056 # test_primitives_out
00057 
00058 # test_arrays_in
00059 # test_arrays_out
00060 
00061 # test_header_in
00062 # test_header_out
00063 
00064 _required_publications  = 'test_string_out', 'test_primitives_out', 'test_arrays_out', 'test_header_out'
00065 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in'
00066 
00067 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'
00068 class SlaveTestCase(TestRosClient):
00069 
00070     def setUp(self):
00071         super(SlaveTestCase, self).setUp()
00072         # retrieve handle on node
00073         self.caller_id = rospy.get_caller_id()
00074         self.node_api = self.apiSuccess(self.master.lookupNode(self.caller_id, 'test_node'))
00075         self.assert_(self.node_api.startswith('http'))
00076         self.node = ServerProxy(self.node_api)
00077         
00078     def testGetPid(self):
00079         pid = self.apiSuccess(self.node.getPid(self.caller_id))
00080         self.assert_(pid > 0)
00081         
00082     def testGetPublications(self):
00083         publications = self.apiSuccess(self.node.getPublications(self.caller_id))
00084         self.assert_(publications is not None)
00085         expected = [rospy.resolve_name(t) for t in _required_publications]
00086         missing = set(expected) - set(publications) 
00087         self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00088 
00089     def _subTestSourceRequestFlow(self, testName, protocols, testEval):
00090         master = self.master
00091         tests = [
00092             [['testSourceRequestFlow-%s-nodeA','testSourceRequestFlow-%s-nodeB',],
00093              ['node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00094             [['g1.testSourceRequestFlow-%s-nodeA','g1.testSourceRequestFlow-%s-nodeB',],
00095              ['g1.node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00096             [['g1.g2.g3.testSourceRequestFlow-%s-nodeA','g1.g2.testSourceRequestFlow-%s-nodeB',],
00097              ['g1.g2.node', 'g3.testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']],
00098             [['g1.g2.testSourceRequestFlow-%s-nodeA','g1.g2.g3.testSourceRequestFlow-%s-nodeB',],
00099              ['g1.g2.node', 'testSourceRequestFlow-%s-nodeA.out', 'g3.testSourceRequestFlow-%s-nodeB.in']],
00100             ]
00101         sources = {}
00102         #start the nodes
00103         # - save the source as we will be making calls on it
00104         pkg, node = testNode
00105         for test in tests:
00106             sourceName, sinkName = [val%testName for val in test[0]]
00107             port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00108             apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00109             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00110             sources[sourceName] = ServerProxy(sourceUri)
00111 
00112         for test in tests:
00113             sourceName, sinkName = [val%testName for val in test[0]]
00114             source = sources[sourceName]
00115             callerId = test[1][0]
00116             sourceLocator, sinkLocator = [val%testName for val in test[1][1:]]
00117             args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols)
00118             testEval(args)
00119         #TODO: test locator name resolution            
00120     
00121     def testSourceRequestFlow_TCPROS1(self):
00122         def testEval(args):
00123             protocol = apiSuccess(args)
00124             assert type(protocol) == list
00125             assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00126             assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00127         protocols = [['TCPROS']]
00128         self._subTestSourceRequestFlow('TCPROS1', protocols, testEval)
00129 
00130     def testSourceRequestFlow_TCPROS2(self):
00131         def testEval(args):
00132             protocol = apiSuccess(args)
00133             assert type(protocol) == list            
00134             assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol"
00135             assert len(protocol) == 3, "TCPROS parameter spec should be length 3"
00136         protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['TCPROS'], ['Fakelast', 'fake'] ]
00137         self._subTestSourceRequestFlow('TCPROS2', protocols, testEval)
00138 
00139     def testSourceRequestFlow_Fail(self):
00140         protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['Fakelast', 'fake'] ]
00141         self._subTestSourceRequestFlow('Fail', protocols, apiFail)
00142 
00143     def testSourceRequestFlow_Errors(self):
00144         slave = self.slave
00145         master = self.master
00146         #test that malformed locators return error codes
00147         apiError(slave.sourceRequestFlow('', '', ''))
00148         apiError(slave.sourceRequestFlow('', 'good.locator', 'badlocator'))
00149         apiError(slave.sourceRequestFlow('', 'badlocator', 'good.locator'))
00150                 
00151 # sourceKillFlow(callerId, sourceLocator, sinkLocator)
00152 #
00153 # * called by master
00154 # * returns int
00155 
00156     def testSourceKillFlow(self):
00157         slave = self.slave
00158         master = self.master
00159         #test that malformed locators return error codes
00160         apiError(slave.sourceKillFlow('', '', ''))
00161         apiError(slave.sourceKillFlow('', 'good.locator', 'badlocator'))
00162         apiError(slave.sourceKillFlow('', 'badlocator', 'good.locator'))
00163         
00164         tests = [
00165             [['testSourceKillFlow-nodeA','testSourceKillFlow-nodeB',],
00166              ['node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00167             [['g1.testSourceKillFlow-nodeA','g1.testSourceKillFlow-nodeB',],
00168              ['g1.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00169             [['g1.g2.g3.testSourceKillFlow-nodeA','g1.g2.g3.testSourceKillFlow-nodeB',],
00170              ['g1.g2.node', 'g3.testSourceKillFlow-nodeA.out', 'g3.testSourceKillFlow-nodeB.in']],
00171             [['g1.g2.testSourceKillFlow-nodeA','g1.g2.testSourceKillFlow-nodeB',],
00172              ['g1.g2.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']],
00173             [['a1.g2.g3.testSourceKillFlow-nodeA','a1.g2.testSourceKillFlow-nodeB',],
00174              ['a1.node', 'g2.g3.testSourceKillFlow-nodeA.out', 'g2.testSourceKillFlow-nodeB.in']],
00175             [['a1.g2.testSourceKillFlow-nodeA','a1.g2.g3.testSourceKillFlow-nodeB',],
00176              ['a1.node', 'g2.testSourceKillFlow-nodeA.out', 'g2.g3.testSourceKillFlow-nodeB.in']],
00177 
00178             ]
00179         sources = {}
00180         #start the flows
00181         # - save the source as we will be making calls on it
00182         pkg, node = testNode
00183         for test in tests:
00184             sourceName, sinkName = test[0]
00185             # - start the source/sink nodes
00186             port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00187             apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00188             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port)
00189             sources[sourceName] = ServerProxy(sourceUri)
00190             # - start the flow
00191             callerId, sourceLocator, sinkLocator = test[1]
00192             apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1))
00193             
00194         #attempt to kill flow with 1 wrong endpoint
00195         for test in tests:
00196             source = sources[test[0][0]]
00197             callerId, sourceLocator, sinkLocator = test[1]
00198             #sourceKill flow does a silent succeed if there is no flow to sink, but returns
00199             #0 flows killed
00200             val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, 'fake.sink'))
00201             assert val == 0, "flowsKilled should be 0 for non-existent flow [fakesink]"
00202             #sourceKill flow fails if source param does not refer to it
00203             apiError(source.sourceKillFlow(callerId, 'fake.source', sinkLocator))
00204             
00205         #attempt to kill flow with wrong context
00206         for test in tests:
00207             source = sources[test[0][0]]
00208             callerId, sourceL, sinkL = test[1]
00209             for sub_test in tests:
00210                 sub_callerId, sub_source, sub_sink = test[1]
00211                 if sub_callerId == callerId and sub_source == sourceL and sub_sink == sinkL:
00212                     continue
00213                 val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink))
00214                 assert val == 0, "flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL)
00215                 
00216         # try to kill all flows on the source that we started
00217         for test in tests:
00218             source = sources[test[0][0]]
00219             callerId, sourceLocator, sinkLocator = test[1]
00220             val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator))
00221             assert type(val) == int
00222             assert val == 1, "Source did not report 1 flow killed: %s, %s"%(val, getLastMsg())
00223         #TODO: test locator name resolution            
00224     
00225     sinkTests = [
00226         [['testSinkConnectFlow-nodeA','testSinkConnectFlow-nodeB',],
00227          ['node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00228         [['g1.testSinkConnectFlow-nodeA','g1.testSinkConnectFlow-nodeB',],
00229          ['g1.node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00230         [['g1.g2.g3.testSinkConnectFlow-nodeA','g1.g2.testSinkConnectFlow-nodeB',],
00231          ['g1.g2.node', 'g3.testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']],
00232         [['g1.g2.testSinkConnectFlow-nodeA','g1.g2.g3.testSinkConnectFlow-nodeB',],
00233          ['g1.g2.node', 'testSinkConnectFlow-nodeA.out', 'g3.testSinkConnectFlow-nodeB.in']],
00234         #separate subgraphs
00235         [['a1.a2.testSinkConnectFlow-nodeA','b1.b2.testSinkConnectFlow-nodeB',],
00236          ['node', 'a1.a2.testSinkConnectFlow-nodeA.out', 'b1.b2.testSinkConnectFlow-nodeB.in']],
00237         # '.locator' resolution
00238         [['c1.node.testSinkConnectFlow-nodeA','c1.node2.testSinkConnectFlow-nodeB',],
00239          ['c1.node2', 'node.testSinkConnectFlow-nodeA.out', '.testSinkConnectFlow-nodeB.in']],
00240 
00241         ]
00242 
00243     def _sink_StartNodes(self, tests):
00244         """
00245         Test subroutine to startup all the nodes specified in the tests
00246         """
00247         master = self.master
00248         sourceUris = {}
00249         sinks = {}
00250         #start the nodes
00251         # - save the sink as we will be making calls on it
00252         pkg, node = testNode
00253         for test in tests:
00254             sourceName, sinkName = test[0]
00255             sourcePort = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0))
00256             sinkPort = apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0))
00257             sourceUri = 'http://%s:%s/'%(testNodeAddr[0], sourcePort)
00258             sinkUri = 'http://%s:%s/'%(testNodeAddr[0], sinkPort)
00259             sourceUris[sourceName] = sourceUri
00260             sinks[sinkName] = ServerProxy(sinkUri)
00261         return sourceUris, sinks
00262 
00263     def _sink_StartFlows(self, tests, sourceUris, sinks):
00264         """
00265         subroutine to connect the flows specified in the tests, purely
00266         using the sink API.
00267 
00268         In the future it would be nice to verify that the flow truly
00269         exists, but for now trust what the sink says
00270         """
00271         reliable = 1 #don't have UDP yet
00272         for test in tests:
00273             sourceName, sinkName = test[0]
00274             sourceUri = sourceUris[sourceName]
00275             sink = sinks[sinkName]
00276             callerId, sourceLocator, sinkLocator = test[1]
00277             print "Testing", test
00278             val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable))
00279             assert type(val) == int
00280 
00281     def _sink_KillFlows(self, tests, sinks):
00282         """
00283         subroutine to kill the flows specified in the tests, purely
00284         using the sink API (i.e. source will still be running)
00285 
00286         In the future it would be nice to verify that the flow was
00287         killed, but for now trust what the sink says"""
00288         reliable = 1 #don't have UDP yet
00289         for test in tests:
00290             sourceName, sinkName = test[0]
00291             sink = sinks[sinkName]
00292             callerId, sourceLocator, sinkLocator = test[1]
00293             print "Testing", test
00294             assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\
00295                    "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg())
00296     
00297     #sinkConnectFlow(sourceLocator, sinkLocator, sourceXmlRpcURI, reliable)
00298     # * API call on the sink of a flow. Called by master.
00299     # * master requests that sink negotiate with the source to establish the flow
00300     # * returns meaningless int
00301     # * synchronous call, blocks until success/fail
00302 
00303     def testSinkConnectFlow(self):
00304         master = self.master
00305         sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00306         self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00307 
00308 # sinkKillFlow(sourceLocator, sinkLocator)
00309 # * called by master
00310     def testSinkKillFlow(self):
00311         slave = self.slave
00312         master = self.master
00313         apiError(slave.sinkKillFlow('', '', ''))
00314         
00315         #startup the standard sink tests
00316         sourceUris, sinks = self._sink_StartNodes(self.sinkTests)
00317         self._sink_StartFlows(self.sinkTests, sourceUris, sinks)
00318         self._sink_KillFlows(self.sinkTests, sinks)        
00319     
00320     def testMisc(self):
00321         slave = self.slave
00322         assert slave is not None, "slave is None"
00323         #getMasterUri
00324         masterUri = apiSuccess(slave.getMasterUri(''))
00325         assert getMasterUri() == masterUri or getMasterUriAlt() == masterUri, masterUri
00326         assert masterUri == apiSuccess(slave.getMasterUri('a.different.id'))
00327         #getPid
00328         pid = apiSuccess(slave.getPid(''))
00329         assert pid == apiSuccess(slave.getPid('a.different.id'))
00330         #callerId must be string
00331         apiError(slave.getPid(0))
00332         apiError(slave.getMasterUri(0))        
00333         try:
00334             slave.shutdown('some.id')
00335         except:
00336             pass
00337         time.sleep(0.1)
00338         try:
00339             code, status, val = slave.getPid('')
00340             assert code < 1, "Slave is still running after shutdown"
00341         except:
00342             pass
00343             


test_rosmaster
Author(s): Ken Conley
autogenerated on Tue Mar 7 2017 03:45:38