$search
00001 # Software License Agreement (BSD License) 00002 # 00003 # Copyright (c) 2008, Willow Garage, Inc. 00004 # All rights reserved. 00005 # 00006 # Redistribution and use in source and binary forms, with or without 00007 # modification, are permitted provided that the following conditions 00008 # are met: 00009 # 00010 # * Redistributions of source code must retain the above copyright 00011 # notice, this list of conditions and the following disclaimer. 00012 # * Redistributions in binary form must reproduce the above 00013 # copyright notice, this list of conditions and the following 00014 # disclaimer in the documentation and/or other materials provided 00015 # with the distribution. 00016 # * Neither the name of Willow Garage, Inc. nor the names of its 00017 # contributors may be used to endorse or promote products derived 00018 # from this software without specific prior written permission. 00019 # 00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00031 # POSSIBILITY OF SUCH DAMAGE. 00032 # 00033 # Revision $Id$ 00034 00035 import os 00036 import sys 00037 import string 00038 import xmlrpclib 00039 00040 import rospy 00041 from test_ros.rostest import * 00042 00043 singletest = None 00044 #singletest = 'testSourceKillFlow' 00045 00046 #TODO: test single source to multiple sinks 00047 #TODO: test multiple sources to single sink 00048 00049 # test_string_in 00050 # test_string_out 00051 00052 # test_primitives_in 00053 # test_primitives_out 00054 00055 # test_arrays_in 00056 # test_arrays_out 00057 00058 # test_header_in 00059 # test_header_out 00060 00061 _required_publications = 'test_string_out', 'test_primitives_out', 'test_arrays_out', 'test_header_out' 00062 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in' 00063 00064 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string' 00065 class SlaveTestCase(TestRosClient): 00066 00067 def setUp(self): 00068 super(SlaveTestCase, self).setUp() 00069 # retrieve handle on node 00070 self.caller_id = rospy.get_caller_id() 00071 self.node_api = self.apiSuccess(self.master.lookupNode(self.caller_id, 'test_node')) 00072 self.assert_(self.node_api.startswith('http')) 00073 self.node = xmlrpclib.ServerProxy(self.node_api) 00074 00075 def testGetPid(self): 00076 pid = self.apiSuccess(self.node.getPid(self.caller_id)) 00077 self.assert_(pid > 0) 00078 00079 def testGetPublications(self): 00080 publications = self.apiSuccess(self.node.getPublications(self.caller_id)) 00081 self.assert_(publications is not None) 00082 expected = [rospy.resolve_name(t) for t in _required_publications] 00083 missing = set(expected) - set(publications) 00084 self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing))) 00085 00086 def _subTestSourceRequestFlow(self, testName, protocols, testEval): 00087 master = self.master 00088 tests = [ 00089 [['testSourceRequestFlow-%s-nodeA','testSourceRequestFlow-%s-nodeB',], 00090 ['node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']], 00091 [['g1.testSourceRequestFlow-%s-nodeA','g1.testSourceRequestFlow-%s-nodeB',], 00092 ['g1.node', 'testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']], 00093 [['g1.g2.g3.testSourceRequestFlow-%s-nodeA','g1.g2.testSourceRequestFlow-%s-nodeB',], 00094 ['g1.g2.node', 'g3.testSourceRequestFlow-%s-nodeA.out', 'testSourceRequestFlow-%s-nodeB.in']], 00095 [['g1.g2.testSourceRequestFlow-%s-nodeA','g1.g2.g3.testSourceRequestFlow-%s-nodeB',], 00096 ['g1.g2.node', 'testSourceRequestFlow-%s-nodeA.out', 'g3.testSourceRequestFlow-%s-nodeB.in']], 00097 ] 00098 sources = {} 00099 #start the nodes 00100 # - save the source as we will be making calls on it 00101 pkg, node = testNode 00102 for test in tests: 00103 sourceName, sinkName = [val%testName for val in test[0]] 00104 port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0)) 00105 apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0)) 00106 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port) 00107 sources[sourceName] = xmlrpclib.ServerProxy(sourceUri) 00108 00109 for test in tests: 00110 sourceName, sinkName = [val%testName for val in test[0]] 00111 source = sources[sourceName] 00112 callerId = test[1][0] 00113 sourceLocator, sinkLocator = [val%testName for val in test[1][1:]] 00114 args = source.sourceRequestFlow(callerId, sourceLocator, sinkLocator, protocols) 00115 testEval(args) 00116 #TODO: test locator name resolution 00117 00118 def testSourceRequestFlow_TCPROS1(self): 00119 def testEval(args): 00120 protocol = apiSuccess(args) 00121 assert type(protocol) == list 00122 assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol" 00123 assert len(protocol) == 3, "TCPROS parameter spec should be length 3" 00124 protocols = [['TCPROS']] 00125 self._subTestSourceRequestFlow('TCPROS1', protocols, testEval) 00126 00127 def testSourceRequestFlow_TCPROS2(self): 00128 def testEval(args): 00129 protocol = apiSuccess(args) 00130 assert type(protocol) == list 00131 assert string.upper(protocol[0]) == 'TCPROS', "source should have returned TCPROS as the desired protocol" 00132 assert len(protocol) == 3, "TCPROS parameter spec should be length 3" 00133 protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['TCPROS'], ['Fakelast', 'fake'] ] 00134 self._subTestSourceRequestFlow('TCPROS2', protocols, testEval) 00135 00136 def testSourceRequestFlow_Fail(self): 00137 protocols = [['Fake1', 123, 132], ['Fake2', 1.0], ['Fake3'], ['Fake4', 'string'], ['Fake5', ['a', 'list'], ['a', 'nother', 'list']], ['Fakelast', 'fake'] ] 00138 self._subTestSourceRequestFlow('Fail', protocols, apiFail) 00139 00140 def testSourceRequestFlow_Errors(self): 00141 slave = self.slave 00142 master = self.master 00143 #test that malformed locators return error codes 00144 apiError(slave.sourceRequestFlow('', '', '')) 00145 apiError(slave.sourceRequestFlow('', 'good.locator', 'badlocator')) 00146 apiError(slave.sourceRequestFlow('', 'badlocator', 'good.locator')) 00147 00148 # sourceKillFlow(callerId, sourceLocator, sinkLocator) 00149 # 00150 # * called by master 00151 # * returns int 00152 00153 def testSourceKillFlow(self): 00154 slave = self.slave 00155 master = self.master 00156 #test that malformed locators return error codes 00157 apiError(slave.sourceKillFlow('', '', '')) 00158 apiError(slave.sourceKillFlow('', 'good.locator', 'badlocator')) 00159 apiError(slave.sourceKillFlow('', 'badlocator', 'good.locator')) 00160 00161 tests = [ 00162 [['testSourceKillFlow-nodeA','testSourceKillFlow-nodeB',], 00163 ['node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']], 00164 [['g1.testSourceKillFlow-nodeA','g1.testSourceKillFlow-nodeB',], 00165 ['g1.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']], 00166 [['g1.g2.g3.testSourceKillFlow-nodeA','g1.g2.g3.testSourceKillFlow-nodeB',], 00167 ['g1.g2.node', 'g3.testSourceKillFlow-nodeA.out', 'g3.testSourceKillFlow-nodeB.in']], 00168 [['g1.g2.testSourceKillFlow-nodeA','g1.g2.testSourceKillFlow-nodeB',], 00169 ['g1.g2.node', 'testSourceKillFlow-nodeA.out', 'testSourceKillFlow-nodeB.in']], 00170 [['a1.g2.g3.testSourceKillFlow-nodeA','a1.g2.testSourceKillFlow-nodeB',], 00171 ['a1.node', 'g2.g3.testSourceKillFlow-nodeA.out', 'g2.testSourceKillFlow-nodeB.in']], 00172 [['a1.g2.testSourceKillFlow-nodeA','a1.g2.g3.testSourceKillFlow-nodeB',], 00173 ['a1.node', 'g2.testSourceKillFlow-nodeA.out', 'g2.g3.testSourceKillFlow-nodeB.in']], 00174 00175 ] 00176 sources = {} 00177 #start the flows 00178 # - save the source as we will be making calls on it 00179 pkg, node = testNode 00180 for test in tests: 00181 sourceName, sinkName = test[0] 00182 # - start the source/sink nodes 00183 port = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0)) 00184 apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0)) 00185 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], port) 00186 sources[sourceName] = xmlrpclib.ServerProxy(sourceUri) 00187 # - start the flow 00188 callerId, sourceLocator, sinkLocator = test[1] 00189 apiSuccess(master.connectFlow(callerId, sourceLocator, sinkLocator, 1)) 00190 00191 #attempt to kill flow with 1 wrong endpoint 00192 for test in tests: 00193 source = sources[test[0][0]] 00194 callerId, sourceLocator, sinkLocator = test[1] 00195 #sourceKill flow does a silent succeed if there is no flow to sink, but returns 00196 #0 flows killed 00197 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, 'fake.sink')) 00198 assert val == 0, "flowsKilled should be 0 for non-existent flow [fakesink]" 00199 #sourceKill flow fails if source param does not refer to it 00200 apiError(source.sourceKillFlow(callerId, 'fake.source', sinkLocator)) 00201 00202 #attempt to kill flow with wrong context 00203 for test in tests: 00204 source = sources[test[0][0]] 00205 callerId, sourceL, sinkL = test[1] 00206 for sub_test in tests: 00207 sub_callerId, sub_source, sub_sink = test[1] 00208 if sub_callerId == callerId and sub_source == sourceL and sub_sink == sinkL: 00209 continue 00210 val = apiSuccess(source.sourceKillFlow(sub_callerId, sub_source, sub_sink)) 00211 assert val == 0, "flowsKilled should be 0 for non-existent flow [different context: %s,%s,%s on %s,%s,%s]"%(sub_callerId, sub_source, sub_sink, callerId, sourceL, sinkL) 00212 00213 # try to kill all flows on the source that we started 00214 for test in tests: 00215 source = sources[test[0][0]] 00216 callerId, sourceLocator, sinkLocator = test[1] 00217 val = apiSuccess(source.sourceKillFlow(callerId, sourceLocator, sinkLocator)) 00218 assert type(val) == int 00219 assert val == 1, "Source did not report 1 flow killed: %s, %s"%(val, getLastMsg()) 00220 #TODO: test locator name resolution 00221 00222 sinkTests = [ 00223 [['testSinkConnectFlow-nodeA','testSinkConnectFlow-nodeB',], 00224 ['node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']], 00225 [['g1.testSinkConnectFlow-nodeA','g1.testSinkConnectFlow-nodeB',], 00226 ['g1.node', 'testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']], 00227 [['g1.g2.g3.testSinkConnectFlow-nodeA','g1.g2.testSinkConnectFlow-nodeB',], 00228 ['g1.g2.node', 'g3.testSinkConnectFlow-nodeA.out', 'testSinkConnectFlow-nodeB.in']], 00229 [['g1.g2.testSinkConnectFlow-nodeA','g1.g2.g3.testSinkConnectFlow-nodeB',], 00230 ['g1.g2.node', 'testSinkConnectFlow-nodeA.out', 'g3.testSinkConnectFlow-nodeB.in']], 00231 #separate subgraphs 00232 [['a1.a2.testSinkConnectFlow-nodeA','b1.b2.testSinkConnectFlow-nodeB',], 00233 ['node', 'a1.a2.testSinkConnectFlow-nodeA.out', 'b1.b2.testSinkConnectFlow-nodeB.in']], 00234 # '.locator' resolution 00235 [['c1.node.testSinkConnectFlow-nodeA','c1.node2.testSinkConnectFlow-nodeB',], 00236 ['c1.node2', 'node.testSinkConnectFlow-nodeA.out', '.testSinkConnectFlow-nodeB.in']], 00237 00238 ] 00239 00240 def _sink_StartNodes(self, tests): 00241 """ 00242 Test subroutine to startup all the nodes specified in the tests 00243 """ 00244 master = self.master 00245 sourceUris = {} 00246 sinks = {} 00247 #start the nodes 00248 # - save the sink as we will be making calls on it 00249 pkg, node = testNode 00250 for test in tests: 00251 sourceName, sinkName = test[0] 00252 sourcePort = apiSuccess(master.addNode('', '', sourceName, pkg, node, TEST_MACHINE, 0)) 00253 sinkPort = apiSuccess(master.addNode('', '', sinkName, pkg, node, TEST_MACHINE, 0)) 00254 sourceUri = 'http://%s:%s/'%(testNodeAddr[0], sourcePort) 00255 sinkUri = 'http://%s:%s/'%(testNodeAddr[0], sinkPort) 00256 sourceUris[sourceName] = sourceUri 00257 sinks[sinkName] = xmlrpclib.ServerProxy(sinkUri) 00258 return sourceUris, sinks 00259 00260 def _sink_StartFlows(self, tests, sourceUris, sinks): 00261 """ 00262 subroutine to connect the flows specified in the tests, purely 00263 using the sink API. 00264 00265 In the future it would be nice to verify that the flow truly 00266 exists, but for now trust what the sink says 00267 """ 00268 reliable = 1 #don't have UDP yet 00269 for test in tests: 00270 sourceName, sinkName = test[0] 00271 sourceUri = sourceUris[sourceName] 00272 sink = sinks[sinkName] 00273 callerId, sourceLocator, sinkLocator = test[1] 00274 print "Testing", test 00275 val = apiSuccess(sink.sinkConnectFlow(callerId, sourceLocator, sinkLocator, sourceUri, reliable)) 00276 assert type(val) == int 00277 00278 def _sink_KillFlows(self, tests, sinks): 00279 """ 00280 subroutine to kill the flows specified in the tests, purely 00281 using the sink API (i.e. source will still be running) 00282 00283 In the future it would be nice to verify that the flow was 00284 killed, but for now trust what the sink says""" 00285 reliable = 1 #don't have UDP yet 00286 for test in tests: 00287 sourceName, sinkName = test[0] 00288 sink = sinks[sinkName] 00289 callerId, sourceLocator, sinkLocator = test[1] 00290 print "Testing", test 00291 assert 1 == apiSuccess(sink.sinkKillFlow(callerId, sourceLocator, sinkLocator)),\ 00292 "sink did not report 1 flow killed: %s, %s"%(getLastVal(), getLastMsg()) 00293 00294 #sinkConnectFlow(sourceLocator, sinkLocator, sourceXmlRpcURI, reliable) 00295 # * API call on the sink of a flow. Called by master. 00296 # * master requests that sink negotiate with the source to establish the flow 00297 # * returns meaningless int 00298 # * synchronous call, blocks until success/fail 00299 00300 def testSinkConnectFlow(self): 00301 master = self.master 00302 sourceUris, sinks = self._sink_StartNodes(self.sinkTests) 00303 self._sink_StartFlows(self.sinkTests, sourceUris, sinks) 00304 00305 # sinkKillFlow(sourceLocator, sinkLocator) 00306 # * called by master 00307 def testSinkKillFlow(self): 00308 slave = self.slave 00309 master = self.master 00310 apiError(slave.sinkKillFlow('', '', '')) 00311 00312 #startup the standard sink tests 00313 sourceUris, sinks = self._sink_StartNodes(self.sinkTests) 00314 self._sink_StartFlows(self.sinkTests, sourceUris, sinks) 00315 self._sink_KillFlows(self.sinkTests, sinks) 00316 00317 def testMisc(self): 00318 slave = self.slave 00319 assert slave is not None, "slave is None" 00320 #getMasterUri 00321 masterUri = apiSuccess(slave.getMasterUri('')) 00322 assert getMasterUri() == masterUri or getMasterUriAlt() == masterUri, masterUri 00323 assert masterUri == apiSuccess(slave.getMasterUri('a.different.id')) 00324 #getPid 00325 pid = apiSuccess(slave.getPid('')) 00326 assert pid == apiSuccess(slave.getPid('a.different.id')) 00327 #callerId must be string 00328 apiError(slave.getPid(0)) 00329 apiError(slave.getMasterUri(0)) 00330 try: 00331 slave.shutdown('some.id') 00332 except: 00333 pass 00334 time.sleep(0.1) 00335 try: 00336 code, status, val = slave.getPid('') 00337 assert code < 1, "Slave is still running after shutdown" 00338 except: 00339 pass 00340