$search
00001 # Software License Agreement (BSD License) 00002 # 00003 # Copyright (c) 2008, Willow Garage, Inc. 00004 # All rights reserved. 00005 # 00006 # Redistribution and use in source and binary forms, with or without 00007 # modification, are permitted provided that the following conditions 00008 # are met: 00009 # 00010 # * Redistributions of source code must retain the above copyright 00011 # notice, this list of conditions and the following disclaimer. 00012 # * Redistributions in binary form must reproduce the above 00013 # copyright notice, this list of conditions and the following 00014 # disclaimer in the documentation and/or other materials provided 00015 # with the distribution. 00016 # * Neither the name of Willow Garage, Inc. nor the names of its 00017 # contributors may be used to endorse or promote products derived 00018 # from this software without specific prior written permission. 00019 # 00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00031 # POSSIBILITY OF SUCH DAMAGE. 00032 # 00033 # Revision $Id: testSlave.py 1100 2008-05-29 20:23:54Z sfkwc $ 00034 00035 import os 00036 import sys 00037 import string 00038 import time 00039 import xmlrpclib 00040 00041 import rospy 00042 import roslib.rosenv 00043 import roslib.names 00044 from test_ros.rosclient import * 00045 00046 NODE_INTEGRATION_NAME = "node_integration_test" 00047 00048 00049 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in', 'probe_topic' 00050 00051 # only publishers determine topic type, so we test against their declared spec 00052 _required_publications_map = { 00053 'test_string_out': 'test_ros/TestString', 00054 'test_primitives_out': 'test_ros/TestPrimitives', 00055 'test_arrays_out': 'test_ros/TestArrays', 00056 'test_header_out': 'test_ros/TestHeader', 00057 } 00058 _required_publications = _required_publications_map.keys() 00059 00060 _TCPROS = 'TCPROS' 00061 00062 # NOTE: probe_topic is a unpublished topic that merely exists to test 00063 # APIs that talk about subscriptions (e.g. publisherUpdate) 00064 00065 _name = None 00066 ## set_node_name() must be called prior to the unit test so that the test harness knows its 00067 ## ROS name. 00068 def set_node_name(name): 00069 global _name 00070 _name = name 00071 00072 # Have to try as hard as possible to not use rospy code in testing rospy code, so this is 00073 # a reimplementation of the caller ID spec so that NodeApiTestCase knows its name 00074 ## reimplementation of caller ID spec separately from rospy 00075 def get_caller_id(): 00076 if _name is None: 00077 raise Exception("set_node_name has not been called yet") 00078 ros_ns = os.environ.get(roslib.rosenv.ROS_NAMESPACE, roslib.names.GLOBALNS) 00079 return roslib.names.ns_join(ros_ns, _name) 00080 00081 ## Parent of node API and integration test cases. Performs common state setup 00082 class _NodeTestCase(TestRosClient): 00083 00084 def __init__(self, *args): 00085 super(_NodeTestCase, self).__init__(*args) 00086 00087 self.ns = os.environ.get(roslib.rosenv.ROS_NAMESPACE, roslib.names.GLOBALNS) 00088 self.caller_id = get_caller_id() 00089 00090 # load in name of test node 00091 self.test_node = 'test_node' #default 00092 for arg in sys.argv: 00093 if arg.startswith("--node="): 00094 self.test_node = arg[len("--node="):] 00095 # resolve 00096 self.test_node = roslib.names.ns_join(self.ns, self.test_node) 00097 00098 00099 def setUp(self): 00100 super(_NodeTestCase, self).setUp() 00101 # retrieve handle on node 00102 # give ourselves five seconds for node to appear 00103 import time 00104 timeout_t = 5.0 + time.time() 00105 self.node_api = None 00106 while time.time() < timeout_t and not self.node_api: 00107 code, msg, node_api = self.master.lookupNode(self.caller_id, self.test_node) 00108 if code == 1: 00109 self.node_api = node_api 00110 if not self.node_api: 00111 self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node)) 00112 print "[%s] API = %s"%(self.test_node, self.node_api) 00113 self.assert_(self.node_api.startswith('http')) 00114 self.node = xmlrpclib.ServerProxy(self.node_api) 00115 00116 ## validates a URI as being http(s) 00117 def _checkUri(self, uri): 00118 import urlparse 00119 parsed = urlparse.urlparse(uri) 00120 self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] in [%s] invalid'%(parsed[0], uri)) 00121 self.assert_(parsed[1], 'host missing [%s]'%uri) 00122 if not sys.version.startswith('2.4'): #check not available on py24 00123 self.assert_(parsed.port, 'port missing/invalid [%s]'%uri) 00124 00125 ## dynamically create the expected topic->type map based on the current name resolution context 00126 def _createTopicTypeMap(self): 00127 new_map = {} 00128 for t in _required_publications_map.iterkeys(): 00129 new_map[rospy.resolve_name(t)] = _required_publications_map[t] 00130 return new_map 00131 00132 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string' 00133 class NodeApiTestCase(_NodeTestCase): 00134 00135 ## validate node.getPid(caller_id) 00136 def testGetPid(self): 00137 # test with bad arity 00138 self.apiError(self.node.getPid()) 00139 # test success 00140 pid = self.apiSuccess(self.node.getPid(self.caller_id)) 00141 self.assert_(pid > 0) 00142 00143 ## subroutine for testGetSubscriptions/testGetPublications 00144 def _checkTopics(self, required, actual): 00145 actual = [t for t, _ in actual] 00146 missing = set(required) - set(actual) 00147 self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing))) 00148 00149 ## validate node.getPublications(caller_id) 00150 def testGetPublications(self): 00151 # test with bad arity 00152 self.apiError(self.node.getPublications()) 00153 self.apiError(self.node.getPublications(self.caller_id, 'something extra')) 00154 00155 # test success 00156 self._checkTopics([rospy.resolve_name(t) for t in _required_publications], 00157 self.apiSuccess(self.node.getPublications(self.caller_id))) 00158 ## validate node.getSubscriptions(caller_id) 00159 def testGetSubscriptions(self): 00160 # test with bad arity 00161 self.apiError(self.node.getSubscriptions()) 00162 self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra')) 00163 # test success 00164 self._checkTopics([rospy.resolve_name(t) for t in _required_subscriptions], 00165 self.apiSuccess(self.node.getSubscriptions(self.caller_id))) 00166 00167 ## validate node.paramUpdate(caller_id, key, value) 00168 def testParamUpdate(self): 00169 node = self.node 00170 good_key = roslib.names.ns_join(self.ns, 'good_key') 00171 bad_key = roslib.names.ns_join(self.ns, 'bad_key') 00172 00173 # test bad key 00174 self.apiError(node.paramUpdate(self.caller_id, '', 'bad')) 00175 self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad')) 00176 # test with bad arity 00177 self.apiError(node.paramUpdate(self.caller_id, bad_key)) 00178 self.apiError(node.paramUpdate(self.caller_id)) 00179 00180 # node is not subscribed to good_key (yet) 00181 self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value')) 00182 00183 # we can't actually test success cases without forcing node to subscribe 00184 #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1)) 00185 #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True)) 00186 #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0)) 00187 00188 ## validate node.getUri(caller_id) 00189 def testGetUri(self): 00190 # test bad arity 00191 self.apiError(self.node.getUri(self.caller_id, 'bad')) 00192 self.apiError(self.node.getUri()) 00193 # test success 00194 self._checkUri(self.apiSuccess(self.node.getUri(self.caller_id))) 00195 00196 ## validate node.getName(caller_id) 00197 def testGetName(self): 00198 # test bad arity 00199 self.apiError(self.node.getName(self.caller_id, 'bad')) 00200 self.apiError(self.node.getName()) 00201 # test success 00202 val = self.apiSuccess(self.node.getName(self.caller_id)) 00203 self.assert_(len(val), "empty name") 00204 00205 ## validate node.getMasterUri(caller_id) 00206 def testGetMasterUri(self): 00207 # test bad arity 00208 self.apiError(self.node.getMasterUri(self.caller_id, 'bad')) 00209 self.apiError(self.node.getMasterUri()) 00210 # test success 00211 uri = self.apiSuccess(self.node.getMasterUri(self.caller_id)) 00212 self._checkUri(uri) 00213 self.assertEquals(roslib.rosenv.get_master_uri(), uri) 00214 00215 ## validate node.publisherUpdate(caller_id, topic, uris) 00216 def testPublisherUpdate(self): 00217 node = self.node 00218 probe_topic = roslib.names.ns_join(self.ns, 'probe_topic') 00219 fake_topic = roslib.names.ns_join(self.ns, 'fake_topic') 00220 # test bad arity 00221 self.apiError(node.getBusStats(self.caller_id, 'bad')) 00222 self.apiError(node.getBusStats()) 00223 # test bad args 00224 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad')) 00225 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2)) 00226 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False)) 00227 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad'])) 00228 self.apiError(node.publisherUpdate()) 00229 00230 # test success 00231 # still success even if not actually interested in topic 00232 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic, 00233 ['http://localhost:1234', 'http://localhost:5678'])) 00234 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic, 00235 [])) 00236 # try it with it the /probe_topic, which will exercise some error branches in the client 00237 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic, 00238 ['http://unroutablefakeservice:1234'])) 00239 # give it some time to make sure it's attempted contact 00240 time.sleep(1.0) 00241 # check that it's still there 00242 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic, 00243 [])) 00244 00245 def _checkTCPROS(self, protocol_params): 00246 self.assert_(protocol_params, "no protocol params returned") 00247 self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params) 00248 self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params) 00249 self.assertEquals(protocol_params[0], _TCPROS) 00250 # expect ['TCPROS', 1.2.3.4, 1234] 00251 self.assertEquals(protocol_params[0], _TCPROS) 00252 00253 def testRequestTopic(self): 00254 node = self.node 00255 protocols = [[_TCPROS]] 00256 probe_topic = roslib.names.ns_join(self.ns, 'probe_topic') 00257 fake_topic = roslib.names.ns_join(self.ns, 'fake_topic') 00258 00259 # test bad arity 00260 self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff')) 00261 self.apiError(node.requestTopic(self.caller_id, probe_topic)) 00262 self.apiError(node.requestTopic(self.caller_id)) 00263 self.apiError(node.requestTopic()) 00264 # test bad args 00265 self.apiError(node.requestTopic(self.caller_id, 1, protocols)) 00266 self.apiError(node.requestTopic(self.caller_id, '', protocols)) 00267 self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols)) 00268 self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols')) 00269 00270 topics = [roslib.names.ns_join(self.ns, t) for t in _required_publications] 00271 # currently only support TCPROS as we require all clients to support this 00272 protocols = [[_TCPROS]] 00273 for topic in topics: 00274 self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols))) 00275 protocols = [['FakeTransport', 1234, 5678], [_TCPROS], ['AnotherFakeTransport']] 00276 # try each one more time, this time with more protocol choices 00277 for topic in topics: 00278 self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols))) 00279 00280 def testGetBusInfo(self): 00281 # test bad arity 00282 self.apiError(self.node.getBusInfo(self.caller_id, 'bad')) 00283 self.apiError(self.node.getBusInfo()) 00284 #TODO: finish 00285 00286 def testGetBusStats(self): 00287 # test bad arity 00288 self.apiError(self.node.getBusStats(self.caller_id, 'bad')) 00289 self.apiError(self.node.getBusStats()) 00290 #TODO: finish 00291 00292 ## test the state of the master based on expected node registration 00293 def testRegistrations(self): 00294 # setUp() ensures the node has registered with the master 00295 topics = self.apiSuccess(self.master.getPublishedTopics(self.caller_id, '')) 00296 topic_names = [t for t, type in topics] 00297 required_topic_pubs = [rospy.resolve_name(t) for t in _required_publications] 00298 required_topic_subs = [rospy.resolve_name(t) for t in _required_subscriptions] 00299 self._checkTopics(required_topic_pubs, topics) 00300 00301 # now check types 00302 topicTypeMap = self._createTopicTypeMap() 00303 for topic, type in topics: 00304 if topic in topicTypeMap: 00305 self.assertEquals(type, topicTypeMap[topic], "topic [%s]: type [%s] does not match expected [%s]"%(type, topic, topicTypeMap[topic])) 00306 00307 # now check actual URIs 00308 node_name = self.test_node 00309 systemState = self.apiSuccess(self.master.getSystemState(self.caller_id)) 00310 pubs, subs, srvs = systemState 00311 for topic, list in pubs: 00312 if topic in required_topic_pubs: 00313 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list)) 00314 for topic, list in subs: 00315 if topic in required_topic_subs: 00316 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list)) 00317 for service, list in srvs: 00318 #TODO: no service tests yet 00319 pass 00320 00321 00322 ## Performs end-to-end integration tests of a test_node. NodeIntegrationTestCase 00323 ## itself is a rospy node and thus implicitly depends on rospy functionality. 00324 class NodeIntegrationTestCase(_NodeTestCase): 00325 00326 def __init__(self, *args): 00327 super(NodeIntegrationTestCase, self).__init__(*args) 00328 rospy.init_node(NODE_INTEGRATION_NAME) 00329 00330 def testString(self): 00331 pub = rospy.Publisher('test_string_in', std_msgs.msg.String) 00332 sub = rospy.Subscriber('test_string_in', std_msgs.msg.String) 00333 #TODO: publish a bunch and check sequencing + caller_id 00334 pub.unregister() 00335 sub.unregister() 00336 00337 def testPrimitives(self): 00338 pub = rospy.Publisher('test_primitives_in', std_msgs.msg.String) 00339 sub = rospy.Subscriber('test_primitives_out', std_msgs.msg.String) 00340 #TODO: publish a bunch and check sequencing + caller_id 00341 pub.unregister() 00342 sub.unregister() 00343 00344 def testArrays(self): 00345 pub = rospy.Publisher('test_header_in', std_msgs.msg.String) 00346 sub = rospy.Subscriber('test_header_out', std_msgs.msg.String) 00347 #TODO: publish a bunch and check sequencing + caller_id 00348 pub.unregister() 00349 sub.unregister() 00350 00351 def testHeader(self): 00352 #msg.auto_header = True 00353 pub = rospy.Publisher('test_header_in', std_msgs.msg.String) 00354 sub = rospy.Subscriber('test_header_out', std_msgs.msg.String) 00355 #TODO: publish a bunch and check sequencing + caller_id 00356 pub.unregister() 00357 sub.unregister() 00358 00359