$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: testSlave.py 1100 2008-05-29 20:23:54Z sfkwc $ 00035 00036 # This is a rewrite of the old node API tests, which focus too much on 00037 # a completed node API and don't facilitate easy bring up of a new 00038 # client library. 00039 00040 import roslib; roslib.load_manifest('test_ros') 00041 00042 import os 00043 import sys 00044 import string 00045 import time 00046 import rosgraph.masterapi 00047 import rostest 00048 import unittest 00049 import xmlrpclib 00050 00051 import roslib.rosenv 00052 import roslib.names 00053 00054 TCPROS = 'TCPROS' 00055 00056 CALLER_ID = '/test_harness' 00057 TEST_NODE_NAME = '/test_node' #default 00058 00059 class TopicDescription(object): 00060 def __init__(self, topic_name, topic_type): 00061 self.topic_name = topic_name 00062 self.topic_type = topic_type 00063 00064 #validate topic 00065 if not roslib.names.is_legal_name(topic_name): 00066 raise ValueError('topic name: %s'%(topic_name)) 00067 00068 # validate type 00069 try: 00070 p, t = roslib.names.package_resource_name(topic_type) 00071 except: 00072 raise ValueError('topic type: %s'%(topic_type)) 00073 00074 class TopicDescriptionList(object): 00075 00076 def __init__(self, xmlrpcvalue): 00077 # [ [topic1, topicType1]...[topicN, topicTypeN]]] 00078 if not type(xmlrpcvalue) == list: 00079 raise ValueError("publications must be a list") 00080 self.topics = [] 00081 for n, t in xmlrpcvalue: 00082 self.topics.append(TopicDescription(n, t)) 00083 00084 def as_dict(self): 00085 d = {} 00086 for t in self.topics: 00087 d[t.topic_name] = t.topic_type 00088 return d 00089 00090 class TestSlaveApi(unittest.TestCase): 00091 00092 def __init__(self, *args, **kwds): 00093 super(TestSlaveApi, self).__init__(*args) 00094 00095 self.ns = os.environ.get(roslib.rosenv.ROS_NAMESPACE, roslib.names.GLOBALNS) 00096 00097 # load in name of test node 00098 self.test_node = 'test_node' #default 00099 self.required_pubs = TopicDescriptionList([]) 00100 self.required_subs = TopicDescriptionList([]) 00101 00102 for arg in sys.argv: 00103 if arg.startswith("--node="): 00104 self.test_node = arg[len("--node="):] 00105 if arg.startswith("--profile="): 00106 self.test_node_profile = arg[len("--profile="):] 00107 self.load_profile(self.test_node_profile) 00108 00109 # resolve 00110 self.test_node = roslib.names.ns_join(self.ns, self.test_node) 00111 00112 def load_profile(self, filename): 00113 import yaml 00114 with open(filename) as f: 00115 d = yaml.load(f) 00116 self.required_pubs = d.get('pubs', {}) 00117 self.required_subs = d.get('subs', {}) 00118 00119 def setUp(self): 00120 self.caller_id = CALLER_ID 00121 # retrieve handle on node 00122 # give ourselves 10 seconds for node to appear 00123 timeout_t = 10.0 + time.time() 00124 self.node_api = None 00125 self.master = rosgraph.masterapi.Master(self.caller_id) 00126 while time.time() < timeout_t and not self.node_api: 00127 try: 00128 self.node_api = self.master.lookupNode(self.test_node) 00129 except: 00130 time.sleep(0.1) 00131 if not self.node_api: 00132 self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node)) 00133 print "[%s] API = %s"%(self.test_node, self.node_api) 00134 self.assert_(self.node_api.startswith('http')) 00135 self.node = xmlrpclib.ServerProxy(self.node_api) 00136 00137 # hack: sleep for a couple seconds just in case the node is 00138 # still registering with the master. 00139 time.sleep(2.) 00140 00141 def apiSuccess(self, args): 00142 """ 00143 unit test assertion that fails if status code is not 1 and otherwise returns the value parameter. 00144 @param args: returnv value from ROS API call 00145 @type args: [int, str, val] 00146 @return: value parameter from args (arg[2] for master/slave API) 00147 """ 00148 self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args)) 00149 self.last_code, self.last_msg, self.last_val = args 00150 assert self.last_code == 1, "status code is not 1: %s"%self.last_msg 00151 return self.last_val 00152 00153 def apiFail(self, args): 00154 """ 00155 unit test assertions that fails if status code is not 0 and otherwise returns true. 00156 @param args: returnv value from ROS API call 00157 @type args: [int, str, val] 00158 @return: True if status code is 0 00159 """ 00160 self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args)) 00161 self.last_code, self.last_msg, self.last_val = args 00162 assert self.last_code == 0, "Call should have failed with status code 0: %s"%self.last_msg 00163 00164 def apiError(self, args, msg=None): 00165 """ 00166 unit test assertion that fails if status code is not -1 and otherwise returns true. 00167 @param args: returnv value from ROS API call 00168 @type args: [int, str, val] 00169 @return: True if status code is -1 00170 """ 00171 self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args)) 00172 self.last_code, self.last_msg, self.last_val = args 00173 if msg: 00174 assert self.last_code == -1, "%s (return msg was %s)"%(msg, self.last_msg) 00175 else: 00176 assert self.last_code == -1, "Call should have returned error -1 code: %s"%self.last_msg 00177 00178 def check_uri(self, uri): 00179 """ 00180 validates a URI as being http(s) 00181 """ 00182 import urlparse 00183 parsed = urlparse.urlparse(uri) 00184 self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] is [%s] invalid'%(parsed[0], uri)) 00185 self.assert_(parsed[1], 'host missing [%s]'%uri) 00186 self.assert_(parsed.port, 'port missing/invalid [%s]'%uri) 00187 00188 def test_getPid(self): 00189 """ 00190 validate node.getPid(caller_id) 00191 """ 00192 # test success 00193 pid = self.apiSuccess(self.node.getPid(self.caller_id)) 00194 self.assert_(pid > 0) 00195 00196 # test with bad arity: accept error or fault 00197 try: 00198 self.apiError(self.node.getPid()) 00199 except xmlrpclib.Fault: 00200 pass 00201 00202 def test_rosout(self): 00203 """ 00204 make sure rosout is in publication and connection list 00205 """ 00206 val = self.apiSuccess(self.node.getPublications(self.caller_id)) 00207 pubs_d = TopicDescriptionList(val).as_dict() 00208 self.assertTrue('/rosout' in pubs_d, "node is not publishing to rosout") 00209 self.assertEquals('rosgraph_msgs/Log', pubs_d['/rosout'], "/rosout is not correct type") 00210 00211 def test_simtime(self): 00212 """ 00213 test that node obeys simtime (/Clock) contract 00214 00215 http://www.ros.org/wiki/Clock 00216 """ 00217 try: 00218 use_sim_time = self.master.getParam('/use_sim_time') 00219 except: 00220 use_sim_time = False 00221 00222 val = self.apiSuccess(self.node.getSubscriptions(self.caller_id)) 00223 subs_d = TopicDescriptionList(val).as_dict() 00224 if use_sim_time: 00225 self.assertTrue('/clock' in subs_d, "node is not subscribing to clock") 00226 self.assertEquals('rosgraph_msgs/Clock', subs_d['/clock'], "/clock is not correct type") 00227 else: 00228 self.assertFalse('/clock' in subs_d, "node is subscribed to /clock even though /use_sim_time is false") 00229 00230 def test_getPublications(self): 00231 """ 00232 validate node.getPublications(caller_id) 00233 """ 00234 # test success 00235 pubs_value = self.apiSuccess(self.node.getPublications(self.caller_id)) 00236 pubs = TopicDescriptionList(pubs_value) 00237 00238 pubs_dict = pubs.as_dict() 00239 # this is separately tested by test_rosout 00240 if '/rosout' in pubs_dict: 00241 del pubs_dict['/rosout'] 00242 self.assertEquals(self.required_pubs, pubs_dict) 00243 00244 # test with bad arity: accept error or fault 00245 try: 00246 self.apiError(self.node.getPublications()) 00247 except xmlrpclib.Fault: 00248 pass 00249 try: 00250 self.apiError(self.node.getPublications(self.caller_id, 'something extra')) 00251 except xmlrpclib.Fault: 00252 pass 00253 00254 def test_getSubscriptions(self): 00255 """ 00256 validate node.getSubscriptions(caller_id) 00257 """ 00258 00259 # test success 00260 value = self.apiSuccess(self.node.getSubscriptions(self.caller_id)) 00261 subs = TopicDescriptionList(value) 00262 00263 subs_dict = subs.as_dict() 00264 self.assertEquals(self.required_subs, subs_dict) 00265 00266 # test with bad arity: accept error or fault 00267 try: 00268 self.apiError(self.node.getSubscriptions()) 00269 except xmlrpclib.Fault: 00270 pass 00271 try: 00272 self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra')) 00273 except xmlrpclib.Fault: 00274 pass 00275 00276 ## validate node.paramUpdate(caller_id, key, value) 00277 def test_paramUpdate(self): 00278 node = self.node 00279 good_key = roslib.names.ns_join(self.ns, 'good_key') 00280 bad_key = roslib.names.ns_join(self.ns, 'bad_key') 00281 00282 # node is not subscribed to good_key (yet) 00283 self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value')) 00284 00285 # test bad key 00286 self.apiError(node.paramUpdate(self.caller_id, '', 'bad')) 00287 self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad')) 00288 00289 # test with bad arity: accept error or fault 00290 try: 00291 self.apiError(node.paramUpdate(self.caller_id, bad_key)) 00292 except xmlrpclib.Fault: 00293 pass 00294 00295 try: 00296 self.apiError(node.paramUpdate(self.caller_id)) 00297 except xmlrpclib.Fault: 00298 pass 00299 00300 # we can't actually test success cases without forcing node to subscribe 00301 #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1)) 00302 #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True)) 00303 #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0)) 00304 00305 def xtest_getUri(self): 00306 """ 00307 Future: validate node.getUri(caller_id). It would be nice to 00308 make this official API as it provides some debugging info. 00309 """ 00310 # test success 00311 self.check_uri(self.apiSuccess(self.node.getUri(self.caller_id))) 00312 00313 # test bad arity 00314 try: 00315 self.apiError(self.node.getUri(self.caller_id, 'bad')) 00316 except xmlrpclib.Fault: 00317 pass 00318 try: 00319 self.apiError(self.node.getUri()) 00320 except xmlrpclib.Fault: 00321 pass 00322 00323 def test_getMasterUri(self): 00324 """ 00325 validate node.getMasterUri(caller_id) 00326 """ 00327 # test success 00328 uri = self.apiSuccess(self.node.getMasterUri(self.caller_id)) 00329 self.check_uri(uri) 00330 00331 # check against env, canonicalize for comparison 00332 import urlparse 00333 master_env = roslib.rosenv.get_master_uri() 00334 if not master_env.endswith('/'): 00335 master_env = master_env + '/' 00336 self.assertEquals(urlparse.urlparse(master_env), urlparse.urlparse(uri)) 00337 00338 # test bad arity 00339 try: 00340 self.apiError(self.node.getMasterUri(self.caller_id, 'bad')) 00341 except xmlrpclib.Fault: 00342 pass 00343 try: 00344 self.apiError(self.node.getMasterUri()) 00345 except xmlrpclib.Fault: 00346 pass 00347 00348 def test_publisherUpdate(self): 00349 """ 00350 validate node.publisherUpdate(caller_id, topic, uris) 00351 """ 00352 node = self.node 00353 probe_topic = roslib.names.ns_join(self.ns, 'probe_topic') 00354 fake_topic = roslib.names.ns_join(self.ns, 'fake_topic') 00355 00356 # test success 00357 # still success even if not actually interested in topic 00358 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic, 00359 ['http://localhost:1234', 'http://localhost:5678'])) 00360 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic, 00361 [])) 00362 # try it with it the /probe_topic, which will exercise some error branches in the client 00363 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic, 00364 ['http://unroutablefakeservice:1234'])) 00365 # give it some time to make sure it's attempted contact 00366 time.sleep(1.0) 00367 # check that it's still there 00368 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic, 00369 [])) 00370 00371 # test bad args 00372 try: 00373 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad')) 00374 except xmlrpclib.Fault: 00375 pass 00376 try: 00377 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2)) 00378 except xmlrpclib.Fault: 00379 pass 00380 try: 00381 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False)) 00382 except xmlrpclib.Fault: 00383 pass 00384 try: 00385 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad'])) 00386 except xmlrpclib.Fault: 00387 pass 00388 00389 # test bad arity 00390 try: 00391 self.apiError(node.publisherUpdate()) 00392 except xmlrpclib.Fault: 00393 pass 00394 try: 00395 self.apiError(node.getBusStats(self.caller_id, 'bad')) 00396 except xmlrpclib.Fault: 00397 pass 00398 try: 00399 self.apiError(node.getBusStats()) 00400 except xmlrpclib.Fault: 00401 pass 00402 00403 def check_TCPROS(self, protocol_params): 00404 self.assert_(protocol_params, "no protocol params returned") 00405 self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params) 00406 self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params) 00407 self.assertEquals(protocol_params[0], TCPROS) 00408 # expect ['TCPROS', 1.2.3.4, 1234] 00409 self.assertEquals(protocol_params[0], TCPROS) 00410 00411 def testRequestTopic(self): 00412 node = self.node 00413 protocols = [[TCPROS]] 00414 00415 publications = node.getPublications(self.caller_id) 00416 00417 topics = self.required_pubs.keys() 00418 probe_topic = topics[0] if topics else None 00419 fake_topic = roslib.names.ns_join(self.ns, 'fake_topic') 00420 00421 # currently only support TCPROS as we require all clients to support this 00422 protocols = [[TCPROS]] 00423 for topic in topics: 00424 self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols))) 00425 protocols = [['FakeTransport', 1234, 5678], [TCPROS], ['AnotherFakeTransport']] 00426 # try each one more time, this time with more protocol choices 00427 for topic in topics: 00428 self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols))) 00429 00430 # test bad arity 00431 if probe_topic: 00432 try: 00433 self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff')) 00434 except xmlrpclib.Fault: 00435 pass 00436 try: 00437 self.apiError(node.requestTopic(self.caller_id, probe_topic)) 00438 except xmlrpclib.Fault: 00439 pass 00440 try: 00441 self.apiError(node.requestTopic(self.caller_id)) 00442 except xmlrpclib.Fault: 00443 pass 00444 try: 00445 self.apiError(node.requestTopic()) 00446 except xmlrpclib.Fault: 00447 pass 00448 00449 # test bad args 00450 try: 00451 self.apiError(node.requestTopic(self.caller_id, 1, protocols)) 00452 except xmlrpclib.Fault: 00453 pass 00454 try: 00455 self.apiError(node.requestTopic(self.caller_id, '', protocols)) 00456 except xmlrpclib.Fault: 00457 pass 00458 try: 00459 self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols)) 00460 except xmlrpclib.Fault: 00461 pass 00462 try: 00463 self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols')) 00464 except xmlrpclib.Fault: 00465 pass 00466 00467 00468 def test_getBusInfo(self): 00469 #TODO: finish 00470 # there should be a connection to rosout 00471 00472 # test bad arity 00473 try: 00474 self.apiError(self.node.getBusInfo(self.caller_id, 'bad')) 00475 except xmlrpclib.Fault: 00476 pass 00477 try: 00478 self.apiError(self.node.getBusInfo()) 00479 except xmlrpclib.Fault: 00480 pass 00481 00482 00483 ## test the state of the master based on expected node registration 00484 def test_registrations(self): 00485 # setUp() ensures the node has registered with the master 00486 00487 # check actual URIs 00488 node_name = self.test_node 00489 pubs, subs, srvs = self.master.getSystemState() 00490 pub_topics = [t for t, _ in pubs] 00491 sub_topics = [t for t, _ in subs] 00492 00493 # make sure all required topics are registered 00494 for t in self.required_pubs: 00495 self.assert_(t in pub_topics, "node did not register publication %s on master"%(t)) 00496 for t in self.required_subs: 00497 self.assert_(t in sub_topics, "node did not register subscription %s on master"%(t)) 00498 00499 # check for node URI on master 00500 for topic, node_list in pubs: 00501 if topic in self.required_pubs: 00502 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list)) 00503 for topic, node_list in subs: 00504 if topic in self.required_subs: 00505 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list)) 00506 for service, srv_list in srvs: 00507 #TODO: no service tests yet 00508 pass 00509 00510 if __name__ == '__main__': 00511 import rostest 00512 rostest.unitrun('test_ros', sys.argv[0], TestSlaveApi)