test_slave_api.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 # This is a rewrite of the old node API tests, which focus too much on
00035 # a completed node API and don't facilitate easy bring up of a new
00036 # client library.
00037 
00038 import os
00039 import sys
00040 import string
00041 import time
00042 import unittest
00043 import xmlrpclib
00044 
00045 import rosunit
00046 import rosgraph
00047 
00048 TCPROS = 'TCPROS'
00049 
00050 CALLER_ID = '/test_harness'
00051 TEST_NODE_NAME = '/test_node' #default
00052 
00053 class TopicDescription(object):
00054     def __init__(self, topic_name, topic_type):
00055         self.topic_name = topic_name
00056         self.topic_type = topic_type
00057 
00058         #validate topic
00059         if not rosgraph.names.is_legal_name(topic_name):
00060             raise ValueError('topic name: %s'%(topic_name))
00061         
00062         # validate type
00063         p, t = topic_type.split('/')
00064 
00065 class TopicDescriptionList(object):
00066     
00067     def __init__(self, xmlrpcvalue):
00068         # [ [topic1, topicType1]...[topicN, topicTypeN]]]
00069         if not type(xmlrpcvalue) == list:
00070             raise ValueError("publications must be a list")
00071         self.topics = []
00072         for n, t in xmlrpcvalue:
00073             self.topics.append(TopicDescription(n, t))
00074 
00075     def as_dict(self):
00076         d = {}
00077         for t in self.topics:
00078             d[t.topic_name] = t.topic_type
00079         return d
00080 
00081 class TestSlaveApi(unittest.TestCase):
00082 
00083     def __init__(self, *args, **kwds):
00084         super(TestSlaveApi, self).__init__(*args)
00085         
00086         self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00087 
00088         # load in name of test node
00089         self.test_node = 'test_node' #default
00090         self.required_pubs = TopicDescriptionList([])
00091         self.required_subs = TopicDescriptionList([])
00092 
00093         for arg in sys.argv:
00094             if arg.startswith("--node="):
00095                 self.test_node = arg[len("--node="):]
00096             if arg.startswith("--profile="):
00097                 self.test_node_profile = arg[len("--profile="):]
00098                 self.load_profile(self.test_node_profile)
00099                 
00100         # resolve
00101         self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
00102                 
00103     def load_profile(self, filename):
00104         import yaml
00105         with open(filename) as f:
00106             d = yaml.load(f)
00107         self.required_pubs = d.get('pubs', {})
00108         self.required_subs = d.get('subs', {})
00109         
00110     def setUp(self):
00111         self.caller_id = CALLER_ID
00112         # retrieve handle on node
00113         # give ourselves 10 seconds for node to appear
00114         timeout_t = 10.0 + time.time()
00115         self.node_api = None
00116         self.master = rosgraph.Master(self.caller_id)
00117         while time.time() < timeout_t and not self.node_api:
00118             try:
00119                 self.node_api = self.master.lookupNode(self.test_node)
00120             except:
00121                 time.sleep(0.1)
00122         if not self.node_api:
00123             self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
00124         print "[%s] API  = %s"%(self.test_node, self.node_api)
00125         self.assert_(self.node_api.startswith('http'))
00126         self.node = xmlrpclib.ServerProxy(self.node_api)
00127 
00128         # hack: sleep for a couple seconds just in case the node is
00129         # still registering with the master.
00130         time.sleep(2.)
00131 
00132     def apiSuccess(self, args):
00133         """
00134         unit test assertion that fails if status code is not 1 and otherwise returns the value parameter.
00135         @param args: returnv value from ROS API call
00136         @type  args: [int, str, val]
00137         @return: value parameter from args (arg[2] for master/slave API)
00138         """
00139         self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00140         self.last_code, self.last_msg, self.last_val = args
00141         assert self.last_code == 1, "status code is not 1: %s"%self.last_msg
00142         return self.last_val
00143 
00144     def apiFail(self, args):
00145         """
00146         unit test assertions that fails if status code is not 0 and otherwise returns true.
00147         @param args: returnv value from ROS API call
00148         @type  args: [int, str, val]
00149         @return: True if status code is 0
00150         """
00151         self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00152         self.last_code, self.last_msg, self.last_val = args
00153         assert self.last_code == 0, "Call should have failed with status code 0: %s"%self.last_msg
00154 
00155     def apiError(self, args, msg=None):
00156         """
00157         unit test assertion that fails if status code is not -1 and otherwise returns true.
00158         @param args: returnv value from ROS API call
00159         @type  args: [int, str, val]
00160         @return: True if status code is -1
00161         """
00162         self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00163         self.last_code, self.last_msg, self.last_val = args
00164         if msg:
00165             assert self.last_code == -1, "%s (return msg was %s)"%(msg, self.last_msg)
00166         else:
00167             assert self.last_code == -1, "Call should have returned error -1 code: %s"%self.last_msg
00168 
00169     def check_uri(self, uri):
00170         """
00171         validates a URI as being http(s)
00172         """
00173         import urlparse
00174         parsed = urlparse.urlparse(uri)
00175         self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] is [%s] invalid'%(parsed[0], uri))
00176         self.assert_(parsed[1], 'host missing [%s]'%uri)
00177         self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)        
00178 
00179     def test_getPid(self):
00180         """
00181         validate node.getPid(caller_id)        
00182         """
00183         # test success        
00184         pid = self.apiSuccess(self.node.getPid(self.caller_id))
00185         self.assert_(pid > 0)
00186 
00187         # test with bad arity: accept error or fault
00188         try:
00189             self.apiError(self.node.getPid())
00190         except xmlrpclib.Fault:
00191             pass
00192 
00193     def test_rosout(self):
00194         """
00195         make sure rosout is in publication and connection list
00196         """
00197         val = self.apiSuccess(self.node.getPublications(self.caller_id))
00198         pubs_d = TopicDescriptionList(val).as_dict()
00199         self.assertTrue('/rosout' in pubs_d, "node is not publishing to rosout")
00200         self.assertEquals('rosgraph_msgs/Log', pubs_d['/rosout'], "/rosout is not correct type")
00201         
00202     def test_simtime(self):
00203         """
00204         test that node obeys simtime (/Clock) contract
00205 
00206         http://www.ros.org/wiki/Clock
00207         """
00208         try:
00209             use_sim_time = self.master.getParam('/use_sim_time')
00210         except:
00211             use_sim_time = False
00212             
00213         val = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
00214         subs_d = TopicDescriptionList(val).as_dict()
00215         if use_sim_time:
00216             self.assertTrue('/clock' in subs_d, "node is not subscribing to clock")
00217             self.assertEquals('rosgraph_msgs/Clock', subs_d['/clock'], "/clock is not correct type")
00218         else:
00219             self.assertFalse('/clock' in subs_d, "node is subscribed to /clock even though /use_sim_time is false")
00220 
00221     def test_getPublications(self):
00222         """
00223         validate node.getPublications(caller_id)
00224         """
00225         # test success
00226         pubs_value = self.apiSuccess(self.node.getPublications(self.caller_id))
00227         pubs = TopicDescriptionList(pubs_value)
00228 
00229         pubs_dict = pubs.as_dict()
00230         # this is separately tested by test_rosout
00231         if '/rosout' in pubs_dict:
00232             del pubs_dict['/rosout']
00233         self.assertEquals(self.required_pubs, pubs_dict)
00234         
00235         # test with bad arity: accept error or fault
00236         try:
00237             self.apiError(self.node.getPublications())
00238         except xmlrpclib.Fault:
00239             pass
00240         try:
00241             self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
00242         except xmlrpclib.Fault:
00243             pass
00244         
00245     def test_getSubscriptions(self):
00246         """
00247         validate node.getSubscriptions(caller_id)
00248         """
00249         
00250         # test success
00251         value = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
00252         subs = TopicDescriptionList(value)
00253 
00254         subs_dict = subs.as_dict()
00255         self.assertEquals(self.required_subs, subs_dict)
00256 
00257         # test with bad arity: accept error or fault
00258         try:
00259             self.apiError(self.node.getSubscriptions())
00260         except xmlrpclib.Fault:
00261             pass
00262         try:
00263             self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))        
00264         except xmlrpclib.Fault:
00265             pass
00266 
00267     ## validate node.paramUpdate(caller_id, key, value)
00268     def test_paramUpdate(self):
00269         node = self.node
00270         good_key = rosgraph.names.ns_join(self.ns, 'good_key')
00271         bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
00272         
00273         # node is not subscribed to good_key (yet)
00274         self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
00275 
00276         # test bad key
00277         self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
00278         self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
00279 
00280         # test with bad arity: accept error or fault
00281         try:
00282             self.apiError(node.paramUpdate(self.caller_id, bad_key))     
00283         except xmlrpclib.Fault:
00284             pass
00285 
00286         try:
00287             self.apiError(node.paramUpdate(self.caller_id)) 
00288         except xmlrpclib.Fault:
00289             pass
00290 
00291         # we can't actually test success cases without forcing node to subscribe
00292         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1))
00293         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True))
00294         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0))
00295 
00296     def xtest_getUri(self):
00297         """
00298         Future: validate node.getUri(caller_id).  It would be nice to
00299         make this official API as it provides some debugging info.
00300         """
00301         # test success
00302         self.check_uri(self.apiSuccess(self.node.getUri(self.caller_id)))
00303         
00304         # test bad arity
00305         try:
00306             self.apiError(self.node.getUri(self.caller_id, 'bad'))
00307         except xmlrpclib.Fault:
00308             pass
00309         try:
00310             self.apiError(self.node.getUri())
00311         except xmlrpclib.Fault:
00312             pass
00313             
00314     def test_getMasterUri(self):
00315         """
00316         validate node.getMasterUri(caller_id)                
00317         """
00318         # test success
00319         uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
00320         self.check_uri(uri)
00321 
00322         # check against env, canonicalize for comparison
00323         import urlparse
00324         master_env = rosgraph.get_master_uri()
00325         if not master_env.endswith('/'):
00326             master_env = master_env + '/'
00327         self.assertEquals(urlparse.urlparse(master_env), urlparse.urlparse(uri))
00328 
00329         # test bad arity
00330         try:
00331             self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
00332         except xmlrpclib.Fault:
00333             pass
00334         try:
00335             self.apiError(self.node.getMasterUri())
00336         except xmlrpclib.Fault:
00337             pass
00338 
00339     def test_publisherUpdate(self):
00340         """
00341         validate node.publisherUpdate(caller_id, topic, uris) 
00342         """
00343         node = self.node
00344         probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00345         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')        
00346         
00347         # test success
00348         # still success even if not actually interested in topic
00349         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00350                                              ['http://localhost:1234', 'http://localhost:5678']))
00351         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00352                                              []))
00353         # try it with it the /probe_topic, which will exercise some error branches in the client
00354         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00355                                              ['http://unroutablefakeservice:1234']))
00356         # give it some time to make sure it's attempted contact
00357         time.sleep(1.0)
00358         # check that it's still there
00359         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00360                                                   []))
00361         
00362         # test bad args
00363         try:
00364             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
00365         except xmlrpclib.Fault:
00366             pass
00367         try:
00368             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
00369         except xmlrpclib.Fault:
00370             pass
00371         try:
00372             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))                
00373         except xmlrpclib.Fault:
00374             pass
00375         try:
00376             self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
00377         except xmlrpclib.Fault:
00378             pass
00379 
00380         # test bad arity
00381         try:
00382             self.apiError(node.publisherUpdate())
00383         except xmlrpclib.Fault:
00384             pass
00385         try:
00386             self.apiError(node.getBusStats(self.caller_id, 'bad'))
00387         except xmlrpclib.Fault:
00388             pass
00389         try:
00390             self.apiError(node.getBusStats())
00391         except xmlrpclib.Fault:
00392             pass
00393 
00394     def check_TCPROS(self, protocol_params):
00395         self.assert_(protocol_params, "no protocol params returned")
00396         self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
00397         self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
00398         self.assertEquals(protocol_params[0], TCPROS)
00399         # expect ['TCPROS', 1.2.3.4, 1234]
00400         self.assertEquals(protocol_params[0], TCPROS)            
00401         
00402     def testRequestTopic(self):
00403         node = self.node
00404         protocols = [[TCPROS]]
00405 
00406         publications = node.getPublications(self.caller_id)
00407         
00408         topics = self.required_pubs.keys()
00409         probe_topic = topics[0] if topics else None
00410         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00411         
00412         # currently only support TCPROS as we require all clients to support this
00413         protocols = [[TCPROS]]
00414         for topic in topics:
00415             self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00416         protocols = [['FakeTransport', 1234, 5678], [TCPROS], ['AnotherFakeTransport']]
00417         # try each one more time, this time with more protocol choices
00418         for topic in topics:
00419             self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00420             
00421         # test bad arity
00422         if probe_topic:
00423             try:
00424                 self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
00425             except xmlrpclib.Fault:
00426                 pass
00427             try:
00428                 self.apiError(node.requestTopic(self.caller_id, probe_topic))
00429             except xmlrpclib.Fault:
00430                 pass
00431             try:
00432                 self.apiError(node.requestTopic(self.caller_id))
00433             except xmlrpclib.Fault:
00434                 pass
00435             try:
00436                 self.apiError(node.requestTopic())
00437             except xmlrpclib.Fault:
00438                 pass
00439 
00440         # test bad args
00441         try:
00442             self.apiError(node.requestTopic(self.caller_id, 1, protocols))
00443         except xmlrpclib.Fault:
00444             pass
00445         try:
00446             self.apiError(node.requestTopic(self.caller_id, '', protocols))
00447         except xmlrpclib.Fault:
00448             pass
00449         try:
00450             self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
00451         except xmlrpclib.Fault:
00452             pass
00453         try:
00454             self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols')) 
00455         except xmlrpclib.Fault:
00456             pass
00457 
00458         
00459     def test_getBusInfo(self):
00460         #TODO: finish
00461         # there should be a connection to rosout
00462         
00463         # test bad arity
00464         try:
00465             self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
00466         except xmlrpclib.Fault:
00467             pass
00468         try:
00469             self.apiError(self.node.getBusInfo())
00470         except xmlrpclib.Fault:
00471             pass
00472 
00473         
00474     ## test the state of the master based on expected node registration
00475     def test_registrations(self):
00476         # setUp() ensures the node has registered with the master
00477 
00478         # check actual URIs
00479         node_name = self.test_node
00480         pubs, subs, srvs = self.master.getSystemState()
00481         pub_topics = [t for t, _ in pubs]
00482         sub_topics = [t for t, _ in subs]
00483         
00484         # make sure all required topics are registered
00485         for t in self.required_pubs:
00486             self.assert_(t in pub_topics, "node did not register publication %s on master"%(t))
00487         for t in self.required_subs:
00488             self.assert_(t in sub_topics, "node did not register subscription %s on master"%(t))
00489         
00490         # check for node URI on master
00491         for topic, node_list in pubs:
00492             if topic in self.required_pubs:
00493                 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
00494         for topic, node_list in subs:
00495             if topic in self.required_subs:
00496                 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
00497         for service, srv_list in srvs:
00498             #TODO: no service tests yet
00499             pass
00500 
00501 if __name__ == '__main__':
00502     rosunit.unitrun('test_rosmaster', sys.argv[0], TestSlaveApi)


test_rosmaster
Author(s): Ken Conley
autogenerated on Fri Aug 28 2015 12:33:45