node.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2008, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 #
00033 # Revision $Id: testSlave.py 1100 2008-05-29 20:23:54Z sfkwc $
00034 
00035 import os
00036 import sys
00037 import string
00038 import time
00039 try:
00040     from xmlrpc.client import ServerProxy
00041 except ImportError:
00042     from xmlrpclib import ServerProxy
00043 
00044 import rospy
00045 import rosgraph
00046 
00047 from rosclient import *
00048 
00049 NODE_INTEGRATION_NAME = "node_integration_test"
00050 
00051 
00052 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in', 'probe_topic'
00053 
00054 # only publishers determine topic type, so we test against their declared spec
00055 _required_publications_map = {
00056     'test_string_out': 'test_rosmaster/TestString',
00057     'test_primitives_out': 'test_rosmaster/TestPrimitives',
00058     'test_arrays_out': 'test_rosmaster/TestArrays',
00059     'test_header_out': 'test_rosmaster/TestHeader',
00060     }
00061 _required_publications  = _required_publications_map.keys()
00062 
00063 _TCPROS = 'TCPROS'
00064 
00065 # NOTE: probe_topic is a unpublished topic that merely exists to test
00066 # APIs that talk about subscriptions (e.g. publisherUpdate)
00067 
00068 _name = None
00069 ## set_node_name() must be called prior to the unit test so that the test harness knows its
00070 ## ROS name.
00071 def set_node_name(name):
00072     global _name
00073     _name = name
00074 
00075 # Have to try as hard as possible to not use rospy code in testing rospy code, so this is
00076 # a reimplementation of the caller ID spec so that NodeApiTestCase knows its name
00077 ## reimplementation of caller ID spec separately from rospy
00078 def get_caller_id():
00079     if _name is None:
00080         raise Exception("set_node_name has not been called yet")
00081     ros_ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00082     return rosgraph.names.ns_join(ros_ns, _name)    
00083     
00084 ## Parent of node API and integration test cases. Performs common state setup
00085 class _NodeTestCase(TestRosClient):
00086 
00087     def __init__(self, *args):
00088         super(_NodeTestCase, self).__init__(*args)
00089         
00090         self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00091         self.caller_id = get_caller_id()
00092 
00093         # load in name of test node
00094         self.test_node = 'test_node' #default
00095         for arg in sys.argv:
00096             if arg.startswith("--node="):
00097                 self.test_node = arg[len("--node="):]
00098         # resolve
00099         self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
00100                 
00101         
00102     def setUp(self):
00103         super(_NodeTestCase, self).setUp()
00104         # retrieve handle on node
00105         # give ourselves five seconds for node to appear
00106         import time
00107         timeout_t = 5.0 + time.time()
00108         self.node_api = None
00109         while time.time() < timeout_t and not self.node_api:
00110             code, msg, node_api = self.master.lookupNode(self.caller_id, self.test_node)
00111             if code == 1:
00112                 self.node_api = node_api
00113         if not self.node_api:
00114             self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
00115         print("[%s] API  = %s" %(self.test_node, self.node_api))
00116         self.assert_(self.node_api.startswith('http'))
00117         self.node = ServerProxy(self.node_api)
00118 
00119     ## validates a URI as being http(s)
00120     def _checkUri(self, uri):
00121         try:
00122             from urllib.parse import urlparse
00123         except ImportError:
00124             from urlparse import urlparse
00125         parsed = urlparse(uri)
00126         self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] in [%s] invalid'%(parsed[0], uri))
00127         self.assert_(parsed[1], 'host missing [%s]'%uri)
00128         if not sys.version.startswith('2.4'): #check not available on py24
00129             self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)        
00130 
00131     ## dynamically create the expected topic->type map based on the current name resolution context
00132     def _createTopicTypeMap(self):
00133         new_map = {}
00134         for t in _required_publications_map.keys():
00135             new_map[rospy.resolve_name(t)] = _required_publications_map[t]
00136         return new_map
00137     
00138 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'
00139 class NodeApiTestCase(_NodeTestCase):
00140 
00141     ## validate node.getPid(caller_id)        
00142     def testGetPid(self):
00143         # test with bad arity
00144         self.apiError(self.node.getPid())
00145         # test success        
00146         pid = self.apiSuccess(self.node.getPid(self.caller_id))
00147         self.assert_(pid > 0)
00148 
00149     ## subroutine for testGetSubscriptions/testGetPublications
00150     def _checkTopics(self, required, actual):
00151         actual = [t for t, _ in actual]
00152         missing = set(required) - set(actual)
00153         self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00154 
00155     ## validate node.getPublications(caller_id)
00156     def testGetPublications(self):
00157         # test with bad arity
00158         self.apiError(self.node.getPublications())
00159         self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
00160         
00161         # test success
00162         self._checkTopics([rospy.resolve_name(t) for t in _required_publications],
00163                           self.apiSuccess(self.node.getPublications(self.caller_id)))
00164     ## validate node.getSubscriptions(caller_id)
00165     def testGetSubscriptions(self):
00166         # test with bad arity
00167         self.apiError(self.node.getSubscriptions())
00168         self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))        
00169         # test success
00170         self._checkTopics([rospy.resolve_name(t) for t in _required_subscriptions],
00171                           self.apiSuccess(self.node.getSubscriptions(self.caller_id)))
00172 
00173     ## validate node.paramUpdate(caller_id, key, value)
00174     def testParamUpdate(self):
00175         node = self.node
00176         good_key = rosgraph.names.ns_join(self.ns, 'good_key')
00177         bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
00178         
00179         # test bad key
00180         self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
00181         self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
00182         # test with bad arity
00183         self.apiError(node.paramUpdate(self.caller_id, bad_key))     
00184         self.apiError(node.paramUpdate(self.caller_id)) 
00185 
00186         # node is not subscribed to good_key (yet)
00187         self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
00188 
00189         # we can't actually test success cases without forcing node to subscribe
00190         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1))
00191         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True))
00192         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0))
00193 
00194     ## validate node.getUri(caller_id)        
00195     def testGetUri(self):
00196         # test bad arity
00197         self.apiError(self.node.getUri(self.caller_id, 'bad'))
00198         self.apiError(self.node.getUri())
00199         # test success
00200         self._checkUri(self.apiSuccess(self.node.getUri(self.caller_id)))
00201 
00202     ## validate node.getName(caller_id)        
00203     def testGetName(self):
00204         # test bad arity
00205         self.apiError(self.node.getName(self.caller_id, 'bad'))
00206         self.apiError(self.node.getName())
00207         # test success
00208         val = self.apiSuccess(self.node.getName(self.caller_id))
00209         self.assert_(len(val), "empty name")
00210 
00211     ## validate node.getMasterUri(caller_id)                
00212     def testGetMasterUri(self):
00213         # test bad arity
00214         self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
00215         self.apiError(self.node.getMasterUri())
00216         # test success
00217         uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
00218         self._checkUri(uri)
00219         self.assertEquals(rosgraph.get_master_uri(), uri)
00220 
00221     ## validate node.publisherUpdate(caller_id, topic, uris) 
00222     def testPublisherUpdate(self):
00223         node = self.node
00224         probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00225         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')        
00226         # test bad arity
00227         self.apiError(node.getBusStats(self.caller_id, 'bad'))
00228         self.apiError(node.getBusStats())
00229         # test bad args
00230         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
00231         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
00232         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))                
00233         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
00234         self.apiError(node.publisherUpdate())
00235         
00236         # test success
00237         # still success even if not actually interested in topic
00238         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00239                                              ['http://localhost:1234', 'http://localhost:5678']))
00240         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00241                                              []))
00242         # try it with it the /probe_topic, which will exercise some error branches in the client
00243         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00244                                              ['http://unroutablefakeservice:1234']))
00245         # give it some time to make sure it's attempted contact
00246         time.sleep(1.0)
00247         # check that it's still there
00248         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00249                                                   []))
00250 
00251     def _checkTCPROS(self, protocol_params):
00252         self.assert_(protocol_params, "no protocol params returned")
00253         self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
00254         self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
00255         self.assertEquals(protocol_params[0], _TCPROS)
00256         # expect ['TCPROS', 1.2.3.4, 1234]
00257         self.assertEquals(protocol_params[0], _TCPROS)            
00258         
00259     def testRequestTopic(self):
00260         node = self.node
00261         protocols = [[_TCPROS]]
00262         probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00263         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00264         
00265         # test bad arity
00266         self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
00267         self.apiError(node.requestTopic(self.caller_id, probe_topic))
00268         self.apiError(node.requestTopic(self.caller_id))
00269         self.apiError(node.requestTopic())
00270         # test bad args
00271         self.apiError(node.requestTopic(self.caller_id, 1, protocols))
00272         self.apiError(node.requestTopic(self.caller_id, '', protocols))
00273         self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
00274         self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols')) 
00275         
00276         topics = [rosgraph.names.ns_join(self.ns, t) for t in _required_publications]
00277         # currently only support TCPROS as we require all clients to support this
00278         protocols = [[_TCPROS]]
00279         for topic in topics:
00280             self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00281         protocols = [['FakeTransport', 1234, 5678], [_TCPROS], ['AnotherFakeTransport']]
00282         # try each one more time, this time with more protocol choices
00283         for topic in topics:
00284             self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00285             
00286     def testGetBusInfo(self):
00287         # test bad arity
00288         self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
00289         self.apiError(self.node.getBusInfo())
00290         #TODO: finish
00291         
00292     def testGetBusStats(self):
00293         # test bad arity
00294         self.apiError(self.node.getBusStats(self.caller_id, 'bad'))
00295         self.apiError(self.node.getBusStats())
00296         #TODO: finish
00297 
00298     ## test the state of the master based on expected node registration
00299     def testRegistrations(self):
00300         # setUp() ensures the node has registered with the master
00301         topics = self.apiSuccess(self.master.getPublishedTopics(self.caller_id, ''))
00302         topic_names = [t for t, type in topics]
00303         required_topic_pubs = [rospy.resolve_name(t) for t in _required_publications]
00304         required_topic_subs = [rospy.resolve_name(t) for t in _required_subscriptions]        
00305         self._checkTopics(required_topic_pubs, topics)
00306         
00307         # now check types
00308         topicTypeMap = self._createTopicTypeMap()
00309         for topic, type in topics:
00310             if topic in topicTypeMap:
00311                 self.assertEquals(type, topicTypeMap[topic], "topic [%s]: type [%s] does not match expected [%s]"%(type, topic, topicTypeMap[topic]))
00312 
00313         # now check actual URIs
00314         node_name = self.test_node
00315         systemState = self.apiSuccess(self.master.getSystemState(self.caller_id))
00316         pubs, subs, srvs = systemState
00317         for topic, list in pubs:
00318             if topic in required_topic_pubs:
00319                 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00320         for topic, list in subs:
00321             if topic in required_topic_subs:
00322                 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00323         for service, list in srvs:
00324             #TODO: no service tests yet
00325             pass
00326 
00327 
00328 ## Performs end-to-end integration tests of a test_node. NodeIntegrationTestCase
00329 ## itself is a rospy node and thus implicitly depends on rospy functionality.        
00330 class NodeIntegrationTestCase(_NodeTestCase):
00331 
00332     def __init__(self, *args):
00333         super(NodeIntegrationTestCase, self).__init__(*args)
00334         rospy.init_node(NODE_INTEGRATION_NAME)
00335         
00336     def testString(self):
00337         pub = rospy.Publisher('test_string_in', test_rosmaster.msg.String)
00338         sub = rospy.Subscriber('test_string_in', test_rosmaster.msg.String)
00339         #TODO: publish a bunch and check sequencing + caller_id
00340         pub.unregister()
00341         sub.unregister()
00342 
00343     def testPrimitives(self):
00344         pub = rospy.Publisher('test_primitives_in', test_rosmaster.msg.String)
00345         sub = rospy.Subscriber('test_primitives_out', test_rosmaster.msg.String) 
00346         #TODO: publish a bunch and check sequencing + caller_id
00347         pub.unregister()
00348         sub.unregister()
00349 
00350     def testArrays(self):
00351         pub = rospy.Publisher('test_header_in', test_rosmaster.msg.String)
00352         sub = rospy.Subscriber('test_header_out', test_rosmaster.msg.String)
00353         #TODO: publish a bunch and check sequencing + caller_id        
00354         pub.unregister()
00355         sub.unregister()
00356 
00357     def testHeader(self):
00358         #msg.auto_header = True
00359         pub = rospy.Publisher('test_header_in', test_rosmaster.msg.String)
00360         sub = rospy.Subscriber('test_header_out', test_rosmaster.msg.String)
00361         #TODO: publish a bunch and check sequencing + caller_id        
00362         pub.unregister()
00363         sub.unregister()
00364 
00365 


test_rosmaster
Author(s): Ken Conley
autogenerated on Tue Mar 7 2017 03:45:38