node.py
Go to the documentation of this file.
00001 # Software License Agreement (BSD License)
00002 #
00003 # Copyright (c) 2008, Willow Garage, Inc.
00004 # All rights reserved.
00005 #
00006 # Redistribution and use in source and binary forms, with or without
00007 # modification, are permitted provided that the following conditions
00008 # are met:
00009 #
00010 #  * Redistributions of source code must retain the above copyright
00011 #    notice, this list of conditions and the following disclaimer.
00012 #  * Redistributions in binary form must reproduce the above
00013 #    copyright notice, this list of conditions and the following
00014 #    disclaimer in the documentation and/or other materials provided
00015 #    with the distribution.
00016 #  * Neither the name of Willow Garage, Inc. nor the names of its
00017 #    contributors may be used to endorse or promote products derived
00018 #    from this software without specific prior written permission.
00019 #
00020 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00021 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00022 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00023 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00024 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00025 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00026 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00027 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00029 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00030 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00031 # POSSIBILITY OF SUCH DAMAGE.
00032 #
00033 # Revision $Id: testSlave.py 1100 2008-05-29 20:23:54Z sfkwc $
00034 
00035 import os
00036 import sys
00037 import string
00038 import time
00039 import xmlrpclib
00040 
00041 import rospy
00042 import rosgraph
00043 
00044 from test_ros.rosclient import *
00045 
00046 NODE_INTEGRATION_NAME = "node_integration_test"
00047 
00048 
00049 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in', 'probe_topic'
00050 
00051 # only publishers determine topic type, so we test against their declared spec
00052 _required_publications_map = {
00053     'test_string_out': 'test_ros/TestString',
00054     'test_primitives_out': 'test_ros/TestPrimitives',
00055     'test_arrays_out': 'test_ros/TestArrays',
00056     'test_header_out': 'test_ros/TestHeader',
00057     }
00058 _required_publications  = _required_publications_map.keys()
00059 
00060 _TCPROS = 'TCPROS'
00061 
00062 # NOTE: probe_topic is a unpublished topic that merely exists to test
00063 # APIs that talk about subscriptions (e.g. publisherUpdate)
00064 
00065 _name = None
00066 ## set_node_name() must be called prior to the unit test so that the test harness knows its
00067 ## ROS name.
00068 def set_node_name(name):
00069     global _name
00070     _name = name
00071 
00072 # Have to try as hard as possible to not use rospy code in testing rospy code, so this is
00073 # a reimplementation of the caller ID spec so that NodeApiTestCase knows its name
00074 ## reimplementation of caller ID spec separately from rospy
00075 def get_caller_id():
00076     if _name is None:
00077         raise Exception("set_node_name has not been called yet")
00078     ros_ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00079     return rosgraph.names.ns_join(ros_ns, _name)    
00080     
00081 ## Parent of node API and integration test cases. Performs common state setup
00082 class _NodeTestCase(TestRosClient):
00083 
00084     def __init__(self, *args):
00085         super(_NodeTestCase, self).__init__(*args)
00086         
00087         self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00088         self.caller_id = get_caller_id()
00089 
00090         # load in name of test node
00091         self.test_node = 'test_node' #default
00092         for arg in sys.argv:
00093             if arg.startswith("--node="):
00094                 self.test_node = arg[len("--node="):]
00095         # resolve
00096         self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
00097                 
00098         
00099     def setUp(self):
00100         super(_NodeTestCase, self).setUp()
00101         # retrieve handle on node
00102         # give ourselves five seconds for node to appear
00103         import time
00104         timeout_t = 5.0 + time.time()
00105         self.node_api = None
00106         while time.time() < timeout_t and not self.node_api:
00107             code, msg, node_api = self.master.lookupNode(self.caller_id, self.test_node)
00108             if code == 1:
00109                 self.node_api = node_api
00110         if not self.node_api:
00111             self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
00112         print "[%s] API  = %s"%(self.test_node, self.node_api)
00113         self.assert_(self.node_api.startswith('http'))
00114         self.node = xmlrpclib.ServerProxy(self.node_api)
00115 
00116     ## validates a URI as being http(s)
00117     def _checkUri(self, uri):
00118         import urlparse
00119         parsed = urlparse.urlparse(uri)
00120         self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] in [%s] invalid'%(parsed[0], uri))
00121         self.assert_(parsed[1], 'host missing [%s]'%uri)
00122         if not sys.version.startswith('2.4'): #check not available on py24
00123             self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)        
00124 
00125     ## dynamically create the expected topic->type map based on the current name resolution context
00126     def _createTopicTypeMap(self):
00127         new_map = {}
00128         for t in _required_publications_map.iterkeys():
00129             new_map[rospy.resolve_name(t)] = _required_publications_map[t]
00130         return new_map
00131     
00132 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'
00133 class NodeApiTestCase(_NodeTestCase):
00134 
00135     ## validate node.getPid(caller_id)        
00136     def testGetPid(self):
00137         # test with bad arity
00138         self.apiError(self.node.getPid())
00139         # test success        
00140         pid = self.apiSuccess(self.node.getPid(self.caller_id))
00141         self.assert_(pid > 0)
00142 
00143     ## subroutine for testGetSubscriptions/testGetPublications
00144     def _checkTopics(self, required, actual):
00145         actual = [t for t, _ in actual]
00146         missing = set(required) - set(actual)
00147         self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00148 
00149     ## validate node.getPublications(caller_id)
00150     def testGetPublications(self):
00151         # test with bad arity
00152         self.apiError(self.node.getPublications())
00153         self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
00154         
00155         # test success
00156         self._checkTopics([rospy.resolve_name(t) for t in _required_publications],
00157                           self.apiSuccess(self.node.getPublications(self.caller_id)))
00158     ## validate node.getSubscriptions(caller_id)
00159     def testGetSubscriptions(self):
00160         # test with bad arity
00161         self.apiError(self.node.getSubscriptions())
00162         self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))        
00163         # test success
00164         self._checkTopics([rospy.resolve_name(t) for t in _required_subscriptions],
00165                           self.apiSuccess(self.node.getSubscriptions(self.caller_id)))
00166 
00167     ## validate node.paramUpdate(caller_id, key, value)
00168     def testParamUpdate(self):
00169         node = self.node
00170         good_key = rosgraph.names.ns_join(self.ns, 'good_key')
00171         bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
00172         
00173         # test bad key
00174         self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
00175         self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
00176         # test with bad arity
00177         self.apiError(node.paramUpdate(self.caller_id, bad_key))     
00178         self.apiError(node.paramUpdate(self.caller_id)) 
00179 
00180         # node is not subscribed to good_key (yet)
00181         self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
00182 
00183         # we can't actually test success cases without forcing node to subscribe
00184         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1))
00185         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True))
00186         #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0))
00187 
00188     ## validate node.getUri(caller_id)        
00189     def testGetUri(self):
00190         # test bad arity
00191         self.apiError(self.node.getUri(self.caller_id, 'bad'))
00192         self.apiError(self.node.getUri())
00193         # test success
00194         self._checkUri(self.apiSuccess(self.node.getUri(self.caller_id)))
00195 
00196     ## validate node.getName(caller_id)        
00197     def testGetName(self):
00198         # test bad arity
00199         self.apiError(self.node.getName(self.caller_id, 'bad'))
00200         self.apiError(self.node.getName())
00201         # test success
00202         val = self.apiSuccess(self.node.getName(self.caller_id))
00203         self.assert_(len(val), "empty name")
00204 
00205     ## validate node.getMasterUri(caller_id)                
00206     def testGetMasterUri(self):
00207         # test bad arity
00208         self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
00209         self.apiError(self.node.getMasterUri())
00210         # test success
00211         uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
00212         self._checkUri(uri)
00213         self.assertEquals(rosgraph.get_master_uri(), uri)
00214 
00215     ## validate node.publisherUpdate(caller_id, topic, uris) 
00216     def testPublisherUpdate(self):
00217         node = self.node
00218         probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00219         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')        
00220         # test bad arity
00221         self.apiError(node.getBusStats(self.caller_id, 'bad'))
00222         self.apiError(node.getBusStats())
00223         # test bad args
00224         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
00225         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
00226         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))                
00227         self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
00228         self.apiError(node.publisherUpdate())
00229         
00230         # test success
00231         # still success even if not actually interested in topic
00232         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00233                                              ['http://localhost:1234', 'http://localhost:5678']))
00234         self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00235                                              []))
00236         # try it with it the /probe_topic, which will exercise some error branches in the client
00237         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00238                                              ['http://unroutablefakeservice:1234']))
00239         # give it some time to make sure it's attempted contact
00240         time.sleep(1.0)
00241         # check that it's still there
00242         self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00243                                                   []))
00244 
00245     def _checkTCPROS(self, protocol_params):
00246         self.assert_(protocol_params, "no protocol params returned")
00247         self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
00248         self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
00249         self.assertEquals(protocol_params[0], _TCPROS)
00250         # expect ['TCPROS', 1.2.3.4, 1234]
00251         self.assertEquals(protocol_params[0], _TCPROS)            
00252         
00253     def testRequestTopic(self):
00254         node = self.node
00255         protocols = [[_TCPROS]]
00256         probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00257         fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00258         
00259         # test bad arity
00260         self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
00261         self.apiError(node.requestTopic(self.caller_id, probe_topic))
00262         self.apiError(node.requestTopic(self.caller_id))
00263         self.apiError(node.requestTopic())
00264         # test bad args
00265         self.apiError(node.requestTopic(self.caller_id, 1, protocols))
00266         self.apiError(node.requestTopic(self.caller_id, '', protocols))
00267         self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
00268         self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols')) 
00269         
00270         topics = [rosgraph.names.ns_join(self.ns, t) for t in _required_publications]
00271         # currently only support TCPROS as we require all clients to support this
00272         protocols = [[_TCPROS]]
00273         for topic in topics:
00274             self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00275         protocols = [['FakeTransport', 1234, 5678], [_TCPROS], ['AnotherFakeTransport']]
00276         # try each one more time, this time with more protocol choices
00277         for topic in topics:
00278             self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00279             
00280     def testGetBusInfo(self):
00281         # test bad arity
00282         self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
00283         self.apiError(self.node.getBusInfo())
00284         #TODO: finish
00285         
00286     def testGetBusStats(self):
00287         # test bad arity
00288         self.apiError(self.node.getBusStats(self.caller_id, 'bad'))
00289         self.apiError(self.node.getBusStats())
00290         #TODO: finish
00291 
00292     ## test the state of the master based on expected node registration
00293     def testRegistrations(self):
00294         # setUp() ensures the node has registered with the master
00295         topics = self.apiSuccess(self.master.getPublishedTopics(self.caller_id, ''))
00296         topic_names = [t for t, type in topics]
00297         required_topic_pubs = [rospy.resolve_name(t) for t in _required_publications]
00298         required_topic_subs = [rospy.resolve_name(t) for t in _required_subscriptions]        
00299         self._checkTopics(required_topic_pubs, topics)
00300         
00301         # now check types
00302         topicTypeMap = self._createTopicTypeMap()
00303         for topic, type in topics:
00304             if topic in topicTypeMap:
00305                 self.assertEquals(type, topicTypeMap[topic], "topic [%s]: type [%s] does not match expected [%s]"%(type, topic, topicTypeMap[topic]))
00306 
00307         # now check actual URIs
00308         node_name = self.test_node
00309         systemState = self.apiSuccess(self.master.getSystemState(self.caller_id))
00310         pubs, subs, srvs = systemState
00311         for topic, list in pubs:
00312             if topic in required_topic_pubs:
00313                 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00314         for topic, list in subs:
00315             if topic in required_topic_subs:
00316                 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00317         for service, list in srvs:
00318             #TODO: no service tests yet
00319             pass
00320 
00321 
00322 ## Performs end-to-end integration tests of a test_node. NodeIntegrationTestCase
00323 ## itself is a rospy node and thus implicitly depends on rospy functionality.        
00324 class NodeIntegrationTestCase(_NodeTestCase):
00325 
00326     def __init__(self, *args):
00327         super(NodeIntegrationTestCase, self).__init__(*args)
00328         rospy.init_node(NODE_INTEGRATION_NAME)
00329         
00330     def testString(self):
00331         pub = rospy.Publisher('test_string_in', std_msgs.msg.String)
00332         sub = rospy.Subscriber('test_string_in', std_msgs.msg.String)
00333         #TODO: publish a bunch and check sequencing + caller_id
00334         pub.unregister()
00335         sub.unregister()
00336 
00337     def testPrimitives(self):
00338         pub = rospy.Publisher('test_primitives_in', std_msgs.msg.String)
00339         sub = rospy.Subscriber('test_primitives_out', std_msgs.msg.String) 
00340         #TODO: publish a bunch and check sequencing + caller_id
00341         pub.unregister()
00342         sub.unregister()
00343 
00344     def testArrays(self):
00345         pub = rospy.Publisher('test_header_in', std_msgs.msg.String)
00346         sub = rospy.Subscriber('test_header_out', std_msgs.msg.String)
00347         #TODO: publish a bunch and check sequencing + caller_id        
00348         pub.unregister()
00349         sub.unregister()
00350 
00351     def testHeader(self):
00352         #msg.auto_header = True
00353         pub = rospy.Publisher('test_header_in', std_msgs.msg.String)
00354         sub = rospy.Subscriber('test_header_out', std_msgs.msg.String)
00355         #TODO: publish a bunch and check sequencing + caller_id        
00356         pub.unregister()
00357         sub.unregister()
00358 
00359 


test_ros
Author(s): Ken Conley/kwc@willowgarage.com
autogenerated on Sat Dec 28 2013 17:35:49