test_slave_api.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 # This is a rewrite of the old node API tests, which focus too much on
00035 # a completed node API and don't facilitate easy bring up of a new
00036 # client library.
00037 
00038 import os
00039 import sys
00040 import string
00041 import time
00042 import unittest
00043 try:
00044     from xmlrpc.client import Fault, ServerProxy
00045 except ImportError:
00046     from xmlrpclib import Fault, ServerProxy
00047 
00048 import rosunit
00049 import rosgraph
00050 
00051 TCPROS = 'TCPROS'
00052 
00053 CALLER_ID = '/test_harness'
00054 TEST_NODE_NAME = '/test_node' #default
00055 
00056 class TopicDescription(object):
00057     def __init__(self, topic_name, topic_type):
00058         self.topic_name = topic_name
00059         self.topic_type = topic_type
00060 
00061         #validate topic
00062         if not rosgraph.names.is_legal_name(topic_name):
00063             raise ValueError('topic name: %s'%(topic_name))
00064         
00065         # validate type
00066         p, t = topic_type.split('/')
00067 
00068 class TopicDescriptionList(object):
00069     
00070     def __init__(self, xmlrpcvalue):
00071         # [ [topic1, topicType1]...[topicN, topicTypeN]]]
00072         if not type(xmlrpcvalue) == list:
00073             raise ValueError("publications must be a list")
00074         self.topics = []
00075         for n, t in xmlrpcvalue:
00076             self.topics.append(TopicDescription(n, t))
00077 
00078     def as_dict(self):
00079         d = {}
00080         for t in self.topics:
00081             d[t.topic_name] = t.topic_type
00082         return d
00083 
00084 class TestSlaveApi(unittest.TestCase):
00085 
00086     def __init__(self, *args, **kwds):
00087         super(TestSlaveApi, self).__init__(*args)
00088         
00089         self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00090 
00091         # load in name of test node
00092         self.test_node = 'test_node' #default
00093         self.required_pubs = TopicDescriptionList([])
00094         self.required_subs = TopicDescriptionList([])
00095 
00096         for arg in sys.argv:
00097             if arg.startswith("--node="):
00098                 self.test_node = arg[len("--node="):]
00099             if arg.startswith("--profile="):
00100                 self.test_node_profile = arg[len("--profile="):]
00101                 self.load_profile(self.test_node_profile)
00102                 
00103         # resolve
00104         self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
00105                 
00106     def load_profile(self, filename):
00107         import yaml
00108         with open(filename) as f:
00109             d = yaml.load(f)
00110         self.required_pubs = d.get('pubs', {})
00111         self.required_subs = d.get('subs', {})
00112         
00113     def setUp(self):
00114         self.caller_id = CALLER_ID
00115         # retrieve handle on node
00116         # give ourselves 10 seconds for node to appear
00117         timeout_t = 10.0 + time.time()
00118         self.node_api = None
00119         self.master = rosgraph.Master(self.caller_id)
00120         while time.time() < timeout_t and not self.node_api:
00121             try:
00122                 self.node_api = self.master.lookupNode(self.test_node)
00123             except:
00124                 time.sleep(0.1)
00125         if not self.node_api:
00126             self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
00127         print "[%s] API  = %s"%(self.test_node, self.node_api)
00128         self.assert_(self.node_api.startswith('http'))
00129         self.node = ServerProxy(self.node_api)
00130 
00131         # hack: sleep for a couple seconds just in case the node is
00132         # still registering with the master.
00133         time.sleep(2.)
00134 
00135     def apiSuccess(self, args):
00136         """
00137         unit test assertion that fails if status code is not 1 and otherwise returns the value parameter.
00138         @param args: returnv value from ROS API call
00139         @type  args: [int, str, val]
00140         @return: value parameter from args (arg[2] for master/slave API)
00141         """
00142         self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00143         self.last_code, self.last_msg, self.last_val = args
00144         assert self.last_code == 1, "status code is not 1: %s"%self.last_msg
00145         return self.last_val
00146 
00147     def apiFail(self, args):
00148         """
00149         unit test assertions that fails if status code is not 0 and otherwise returns true.
00150         @param args: returnv value from ROS API call
00151         @type  args: [int, str, val]
00152         @return: True if status code is 0
00153         """
00154         self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00155         self.last_code, self.last_msg, self.last_val = args
00156         assert self.last_code == 0, "Call should have failed with status code 0: %s"%self.last_msg
00157 
00158     def apiError(self, args, msg=None):
00159         """
00160         unit test assertion that fails if status code is not -1 and otherwise returns true.
00161         @param args: returnv value from ROS API call
00162         @type  args: [int, str, val]
00163         @return: True if status code is -1
00164         """
00165         self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00166         self.last_code, self.last_msg, self.last_val = args
00167         if msg:
00168             assert self.last_code == -1, "%s (return msg was %s)"%(msg, self.last_msg)
00169         else:
00170             assert self.last_code == -1, "Call should have returned error -1 code: %s"%self.last_msg
00171 
00172     def check_uri(self, uri):
00173         """
00174         validates a URI as being http(s)
00175         """
00176         import urlparse
00177         parsed = urlparse.urlparse(uri)
00178         self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] is [%s] invalid'%(parsed[0], uri))
00179         self.assert_(parsed[1], 'host missing [%s]'%uri)
00180         self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)        
00181 
00182     def test_getPid(self):
00183         """
00184         validate node.getPid(caller_id)        
00185         """
00186         # test success        
00187         pid = self.apiSuccess(self.node.getPid(self.caller_id))
00188         self.assert_(pid > 0)
00189 
00190         # test with bad arity: accept error or fault
00191         try:
00192             self.apiError(self.node.getPid())
00193         except Fault:
00194             pass
00195 
00196     def test_rosout(self):
00197         """
00198         make sure rosout is in publication and connection list
00199         """
00200         val = self.apiSuccess(self.node.getPublications(self.caller_id))
00201         pubs_d = TopicDescriptionList(val).as_dict()
00202         self.assertTrue('/rosout' in pubs_d, "node is not publishing to rosout")
00203         self.assertEquals('rosgraph_msgs/Log', pubs_d['/rosout'], "/rosout is not correct type")
00204         
00205     def test_simtime(self):
00206         """
00207         test that node obeys simtime (/Clock) contract
00208 
00209         http://www.ros.org/wiki/Clock
00210         """
00211         try:
00212             use_sim_time = self.master.getParam('/use_sim_time')
00213         except:
00214             use_sim_time = False
00215             
00216         val = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
00217         subs_d = TopicDescriptionList(val).as_dict()
00218         if use_sim_time:
00219             self.assertTrue('/clock' in subs_d, "node is not subscribing to clock")
00220             self.assertEquals('rosgraph_msgs/Clock', subs_d['/clock'], "/clock is not correct type")
00221         else:
00222             self.assertFalse('/clock' in subs_d, "node is subscribed to /clock even though /use_sim_time is false")
00223 
00224     def test_getPublications(self):
00225         """
00226         validate node.getPublications(caller_id)
00227         """
00228         # test success
00229         pubs_value = self.apiSuccess(self.node.getPublications(self.caller_id))
00230         pubs = TopicDescriptionList(pubs_value)
00231 
00232         pubs_dict = pubs.as_dict()
00233         # this is separately tested by test_rosout
00234         if '/rosout' in pubs_dict:
00235             del pubs_dict['/rosout']
00236         self.assertEquals(self.required_pubs, pubs_dict)
00237         
00238         # test with bad arity: accept error or fault
00239         try:
00240             self.apiError(self.node.getPublications())
00241         except Fault:
00242             pass
00243         try:
00244             self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
00245         except Fault:
00246             pass
00247         
00248     def test_getSubscriptions(self):
00249         """
00250         validate node.getSubscriptions(caller_id)
00251         """
00252         
00253         # test success
00254         value = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
00255         subs = TopicDescriptionList(value)
00256 
00257         subs_dict = subs.as_dict()
00258         self.assertEquals(self.required_subs, subs_dict)
00259 
00260         # test with bad arity: accept error or fault
00261         try:
00262             self.apiError(self.node.getSubscriptions())
00263         except Fault:
00264             pass
00265         try:
00266             self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))        
00267         except Fault:
00268             pass
00269 
00270     ## validate node.paramUpdate(caller_id, key, value)
00271     def test_paramUpdate(self):
00272         node = self.node
00273         good_key = rosgraph.names.ns_join(self.ns, 'good_key')
00274         bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
00275         
00276         # node is not subscribed to good_key (yet)
00277         self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
00278 
00279         # test bad key
00280         self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
00281         self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
00282 
00283         # test with bad arity: accept error or fault
00284         try:
00285             self.apiError(node.paramUpdate(self.caller_id, bad_key))     
00286         except Fault:
00287             pass
00288 
00289         try:
00290             self.apiError(node.paramUpdate(self.caller_id)) 
00291         except Fault:
00292             pass
00293 
00294         # we can't actually test success cases without forcing node to subscribe
00295         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1))
00296         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True))
00297         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0))
00298 
00299     def xtest_getUri(self):
00300         """
00301         Future: validate node.getUri(caller_id).  It would be nice to
00302         make this official API as it provides some debugging info.
00303         """
00304         # test success
00305         self.check_uri(self.apiSuccess(self.node.getUri(self.caller_id)))
00306         
00307         # test bad arity
00308         try:
00309             self.apiError(self.node.getUri(self.caller_id, 'bad'))
00310         except Fault:
00311             pass
00312         try:
00313             self.apiError(self.node.getUri())
00314         except Fault:
00315             pass
00316             
00317     def test_getMasterUri(self):
00318         """
00319         validate node.getMasterUri(caller_id)                
00320         """
00321         # test success
00322         uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
00323         self.check_uri(uri)
00324 
00325         # check against env, canonicalize for comparison
00326         import urlparse
00327         master_env = rosgraph.get_master_uri()
00328         if not master_env.endswith('/'):
00329             master_env = master_env + '/'
00330         self.assertEquals(urlparse.urlparse(master_env), urlparse.urlparse(uri))
00331 
00332         # test bad arity
00333         try:
00334             self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
00335         except Fault:
00336             pass
00337         try:
00338             self.apiError(self.node.getMasterUri())
00339         except Fault:
00340             pass
00341 
00342     def test_publisherUpdate(self):
00343         """
00344         validate node.publisherUpdate(caller_id, topic, uris) 
00345         """
00346         node = self.node
00347         probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00348         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')        
00349         
00350         # test success
00351         # still success even if not actually interested in topic
00352         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00353                                              ['http://localhost:1234', 'http://localhost:5678']))
00354         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00355                                              []))
00356         # try it with it the /probe_topic, which will exercise some error branches in the client
00357         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00358                                              ['http://unroutablefakeservice:1234']))
00359         # give it some time to make sure it's attempted contact
00360         time.sleep(1.0)
00361         # check that it's still there
00362         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00363                                                   []))
00364         
00365         # test bad args
00366         try:
00367             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
00368         except Fault:
00369             pass
00370         try:
00371             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
00372         except Fault:
00373             pass
00374         try:
00375             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))                
00376         except Fault:
00377             pass
00378         try:
00379             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
00380         except Fault:
00381             pass
00382 
00383         # test bad arity
00384         try:
00385             self.apiError(node.publisherUpdate())
00386         except Fault:
00387             pass
00388         try:
00389             self.apiError(node.getBusStats(self.caller_id, 'bad'))
00390         except Fault:
00391             pass
00392         try:
00393             self.apiError(node.getBusStats())
00394         except Fault:
00395             pass
00396 
00397     def check_TCPROS(self, protocol_params):
00398         self.assert_(protocol_params, "no protocol params returned")
00399         self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
00400         self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
00401         self.assertEquals(protocol_params[0], TCPROS)
00402         # expect ['TCPROS', 1.2.3.4, 1234]
00403         self.assertEquals(protocol_params[0], TCPROS)            
00404         
00405     def testRequestTopic(self):
00406         node = self.node
00407         protocols = [[TCPROS]]
00408 
00409         publications = node.getPublications(self.caller_id)
00410         
00411         topics = self.required_pubs.keys()
00412         probe_topic = topics[0] if topics else None
00413         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00414         
00415         # currently only support TCPROS as we require all clients to support this
00416         protocols = [[TCPROS]]
00417         for topic in topics:
00418             self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00419         protocols = [['FakeTransport', 1234, 5678], [TCPROS], ['AnotherFakeTransport']]
00420         # try each one more time, this time with more protocol choices
00421         for topic in topics:
00422             self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00423             
00424         # test bad arity
00425         if probe_topic:
00426             try:
00427                 self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
00428             except Fault:
00429                 pass
00430             try:
00431                 self.apiError(node.requestTopic(self.caller_id, probe_topic))
00432             except Fault:
00433                 pass
00434             try:
00435                 self.apiError(node.requestTopic(self.caller_id))
00436             except Fault:
00437                 pass
00438             try:
00439                 self.apiError(node.requestTopic())
00440             except Fault:
00441                 pass
00442 
00443         # test bad args
00444         try:
00445             self.apiError(node.requestTopic(self.caller_id, 1, protocols))
00446         except Fault:
00447             pass
00448         try:
00449             self.apiError(node.requestTopic(self.caller_id, '', protocols))
00450         except Fault:
00451             pass
00452         try:
00453             self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
00454         except Fault:
00455             pass
00456         try:
00457             self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols')) 
00458         except Fault:
00459             pass
00460 
00461         
00462     def test_getBusInfo(self):
00463         #TODO: finish
00464         # there should be a connection to rosout
00465         
00466         # test bad arity
00467         try:
00468             self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
00469         except Fault:
00470             pass
00471         try:
00472             self.apiError(self.node.getBusInfo())
00473         except Fault:
00474             pass
00475 
00476         
00477     ## test the state of the master based on expected node registration
00478     def test_registrations(self):
00479         # setUp() ensures the node has registered with the master
00480 
00481         # check actual URIs
00482         node_name = self.test_node
00483         pubs, subs, srvs = self.master.getSystemState()
00484         pub_topics = [t for t, _ in pubs]
00485         sub_topics = [t for t, _ in subs]
00486         
00487         # make sure all required topics are registered
00488         for t in self.required_pubs:
00489             self.assert_(t in pub_topics, "node did not register publication %s on master"%(t))
00490         for t in self.required_subs:
00491             self.assert_(t in sub_topics, "node did not register subscription %s on master"%(t))
00492         
00493         # check for node URI on master
00494         for topic, node_list in pubs:
00495             if topic in self.required_pubs:
00496                 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
00497         for topic, node_list in subs:
00498             if topic in self.required_subs:
00499                 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
00500         for service, srv_list in srvs:
00501             #TODO: no service tests yet
00502             pass
00503 
00504 if __name__ == '__main__':
00505     rosunit.unitrun('test_rosmaster', sys.argv[0], TestSlaveApi)


test_rosmaster
Author(s): Ken Conley
autogenerated on Tue Mar 7 2017 03:45:38