test_rospy_core.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040 
00041 import rospy
00042 
00043 class TestRospyCore(unittest.TestCase):
00044     
00045     def test_parse_rosrpc_uri(self):
00046         from rospy.core import parse_rosrpc_uri
00047         valid = [('rosrpc://localhost:1234/', 'localhost', 1234), 
00048                  ('rosrpc://localhost2:1234', 'localhost2', 1234), 
00049                  ('rosrpc://third:1234/path/bar', 'third', 1234), 
00050                  ('rosrpc://foo.com:1/', 'foo.com', 1),
00051                  ('rosrpc://foo.com:1/', 'foo.com', 1)]
00052         for t, addr, port in valid:
00053             paddr, pport = rospy.core.parse_rosrpc_uri(t)
00054             self.assertEquals(addr, paddr)
00055             self.assertEquals(port, pport)
00056             # validate that it's a top-level API method
00057             self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
00058         invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
00059         for i in invalid:
00060             try:
00061                 parse_rosrpc_uri(i)
00062                 self.fail("%s was an invalid rosrpc uri"%i)
00063             except: pass
00064 
00065     def test_loggers(self):
00066         # trip wire tests
00067         import rospy.core
00068         rospy.core.logdebug('debug')
00069         rospy.core.logwarn('warn')
00070         rospy.core.logout('out')
00071         rospy.core.logerr('err')
00072         rospy.core.logfatal('fatal')
00073         # test that they are exposed via top-level api
00074         import rospy
00075         rospy.logdebug('debug')
00076         rospy.logwarn('warn')
00077         rospy.logout('out')
00078         rospy.logerr('err')
00079         rospy.logfatal('fatal')
00080         
00081     def test_add_shutdown_hook(self):
00082         def handle(reason):
00083             pass
00084         # cannot verify functionality, just crashing
00085         rospy.core.add_shutdown_hook(handle)
00086         try:
00087             rospy.core.add_shutdown_hook(1)
00088             self.fail_("add_shutdown_hook is not protected against invalid args")
00089         except TypeError: pass
00090         try:
00091             rospy.core.add_shutdown_hook(1)
00092             self.fail_("add_shutdown_hook is not protected against invalid args")
00093         except TypeError: pass
00094 
00095     def test_add_preshutdown_hook(self):
00096         def handle1(reason):
00097             pass
00098         def handle2(reason):
00099             pass
00100         def handle3(reason):
00101             pass
00102         # cannot verify functionality, just coverage as well as ordering
00103         rospy.core.add_shutdown_hook(handle1)
00104         rospy.core.add_shutdown_hook(handle2)
00105         rospy.core.add_preshutdown_hook(handle3)
00106         self.assert_(handle3 in rospy.core._preshutdown_hooks)
00107         self.assert_(handle2 in rospy.core._shutdown_hooks)        
00108         self.assert_(handle1 in rospy.core._shutdown_hooks)        
00109         try:
00110             rospy.core.add_preshutdown_hook(1)
00111             self.fail_("add_preshutdown_hook is not protected against invalid args")
00112         except TypeError: pass
00113         try:
00114             rospy.core.add_preshutdown_hook(1)
00115             self.fail_("add_preshutdown_hook is not protected against invalid args")
00116         except TypeError: pass
00117 
00118     def test_get_ros_root(self):
00119         try:
00120             rospy.core.get_ros_root(env={}, required=True)
00121         except:
00122             pass
00123         self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
00124         rr = "%s"%time.time()
00125         self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
00126         self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))        
00127 
00128         self.assertEquals(os.path.normpath(os.environ['ROS_ROOT']), rospy.core.get_ros_root(required=False))
00129     def test_node_uri(self):
00130         uri = "http://localhost-%s:1234"%random.randint(1, 1000)
00131         self.assertEquals(None, rospy.core.get_node_uri())
00132         rospy.core.set_node_uri(uri)
00133         self.assertEquals(uri, rospy.core.get_node_uri())
00134         
00135     def test_initialized(self):
00136         self.failIf(rospy.core.is_initialized())
00137         rospy.core.set_initialized(True)
00138         self.assert_(rospy.core.is_initialized())
00139 
00140     def test_shutdown_hook_exception(self):
00141         rospy.core._shutdown_flag = False
00142         del rospy.core._shutdown_hooks[:]
00143         # add a shutdown hook that throws an exception,
00144         # signal_shutdown should be robust to it
00145         rospy.core.add_shutdown_hook(shutdown_hook_exception)
00146         rospy.core.signal_shutdown('test_exception')
00147         rospy.core._shutdown_flag = False        
00148         del rospy.core._shutdown_hooks[:]
00149         
00150     def test_shutdown(self):
00151         rospy.core._shutdown_flag = False
00152         del rospy.core._shutdown_hooks[:]        
00153         global called, called2
00154         called = called2 = None
00155         self.failIf(rospy.core.is_shutdown())        
00156         rospy.core.add_shutdown_hook(shutdown_hook1)
00157         reason = "reason %s"%time.time()
00158         rospy.core.signal_shutdown(reason)
00159         self.assertEquals(reason, called)
00160         self.assert_(rospy.core.is_shutdown())
00161 
00162         # verify that shutdown hook is called immediately on add if already shutdown
00163         rospy.core.add_shutdown_hook(shutdown_hook2)
00164         self.assert_(called2 is not None)
00165         rospy.core._shutdown_flag = False
00166 
00167     #TODO: move to teset_rospy_names
00168     def test_valid_name(self):
00169         # not forcing rospy to be pedantic -- yet, just try and do sanity checks
00170         tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
00171         caller_id = '/me'
00172         for t in tests:
00173             self.assert_(rospy.core.valid_name('p')(t, caller_id))
00174         failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00175         for f in failures:
00176             try:
00177                 rospy.core.valid_name('p')(f, caller_id)
00178                 self.fail(f)
00179             except rospy.core.ParameterInvalid, e:
00180                 pass
00181     
00182     def test_is_topic(self):
00183         # not forcing rospy to be pedantic -- yet, just try and do sanity checks
00184         caller_id = '/me'
00185         tests = [
00186             ('topic', '/node', '/topic'),
00187             ('topic', '/ns/node', '/ns/topic'),
00188             ('/topic', '/node', '/topic'),
00189             ('~topic', '/node', '/node/topic'),
00190             ('/topic1', '/node', '/topic1'),
00191             ('top/sub', '/node', '/top/sub'),
00192             ('top/sub', '/ns/node', '/ns/top/sub'),
00193             ]
00194 
00195         for t, caller_id, v in tests:
00196             self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
00197         failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00198         for f in failures:
00199             try:
00200                 rospy.core.is_topic('p')(f, caller_id)
00201                 self.fail(f)
00202             except rospy.core.ParameterInvalid, e:
00203                 pass
00204             
00205     def test_configure_logging(self):
00206         # can't test actual functionality
00207         try:
00208             rospy.core.configure_logging("/")
00209             self.fail("configure_logging should not accept a the root namespace as the node_name param")
00210         except: pass
00211         rospy.core.configure_logging("/node/name")
00212 
00213     def test_xmlrpcapi(self):
00214         # have to use 'is' so we don't accidentally invoke XMLRPC
00215         self.assert_(rospy.core.xmlrpcapi(None) is None)
00216         self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
00217         self.assert_(rospy.core.xmlrpcapi('http://') is None)
00218         api = rospy.core.xmlrpcapi('http://localhost:1234')
00219         self.assert_(api is not None)
00220         import xmlrpclib
00221         self.assert_(isinstance(api, xmlrpclib.ServerProxy))
00222     
# Last shutdown reason seen by shutdown_hook1 / shutdown_hook2; None until
# the corresponding hook has run.
called = called2 = None
def shutdown_hook1(reason):
    """Shutdown hook that prints ``reason`` and stores it in global ``called``."""
    global called
    # Single pre-formatted argument, so the output is the same text
    # ("HOOK <reason>") under both the Python 2 and Python 3 print.
    print("HOOK %s" % (reason,))
    called = reason
def shutdown_hook2(reason):
    """Shutdown hook that prints ``reason`` and stores it in global ``called2``."""
    global called2
    # Single pre-formatted argument, so the output is the same text
    # ("HOOK2 <reason>") under both the Python 2 and Python 3 print.
    print("HOOK2 %s" % (reason,))
    called2 = reason
def shutdown_hook_exception(reason):
    """Shutdown hook that always raises; ``reason`` is ignored."""
    error = Exception("gotcha")
    raise error


test_rospy
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:47:19