test_rospy_core.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040 
00041 import rospy
00042 
00043 class TestRospyCore(unittest.TestCase):
00044     
00045     def test_parse_rosrpc_uri(self):
00046         from rospy.core import parse_rosrpc_uri
00047         valid = [('rosrpc://localhost:1234/', 'localhost', 1234), 
00048                  ('rosrpc://localhost2:1234', 'localhost2', 1234), 
00049                  ('rosrpc://third:1234/path/bar', 'third', 1234), 
00050                  ('rosrpc://foo.com:1/', 'foo.com', 1),
00051                  ('rosrpc://foo.com:1/', 'foo.com', 1)]
00052         for t, addr, port in valid:
00053             paddr, pport = rospy.core.parse_rosrpc_uri(t)
00054             self.assertEquals(addr, paddr)
00055             self.assertEquals(port, pport)
00056             # validate that it's a top-level API method
00057             self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
00058         invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
00059         for i in invalid:
00060             try:
00061                 parse_rosrpc_uri(i)
00062                 self.fail("%s was an invalid rosrpc uri"%i)
00063             except: pass
00064         
00065     def test_add_log_handler(self):
00066         # tripwire test
00067         import rospy.core
00068         from rosgraph_msgs.msg import Log
00069         for level in [Log.DEBUG, 
00070                       Log.INFO,
00071                       Log.WARN,
00072                       Log.ERROR,
00073                       Log.FATAL]:
00074             rospy.core.add_log_handler(level, lambda x: x)
00075         import rospy.exceptions
00076         try:
00077             rospy.core.add_log_handler(lambda x: x, -1)
00078         except rospy.exceptions.ROSInternalException, e: pass
00079 
00080     def test_loggers(self):
00081         # trip wire tests
00082         import rospy.core
00083         rospy.core.logdebug('debug')
00084         rospy.core.logwarn('warn')
00085         rospy.core.logout('out')
00086         rospy.core.logerr('err')
00087         rospy.core.logfatal('fatal')
00088         # test that they are exposed via top-level api
00089         import rospy
00090         rospy.logdebug('debug')
00091         rospy.logwarn('warn')
00092         rospy.logout('out')
00093         rospy.logerr('err')
00094         rospy.logfatal('fatal')
00095         
00096     def test_add_shutdown_hook(self):
00097         def handle(reason):
00098             pass
00099         # cannot verify functionality, just crashing
00100         rospy.core.add_shutdown_hook(handle)
00101         try:
00102             rospy.core.add_shutdown_hook(1)
00103             self.fail_("add_shutdown_hook is not protected against invalid args")
00104         except TypeError: pass
00105         try:
00106             rospy.core.add_shutdown_hook(1)
00107             self.fail_("add_shutdown_hook is not protected against invalid args")
00108         except TypeError: pass
00109 
00110     def test_add_preshutdown_hook(self):
00111         def handle1(reason):
00112             pass
00113         def handle2(reason):
00114             pass
00115         def handle3(reason):
00116             pass
00117         # cannot verify functionality, just coverage as well as ordering
00118         rospy.core.add_shutdown_hook(handle1)
00119         rospy.core.add_shutdown_hook(handle2)
00120         rospy.core.add_preshutdown_hook(handle3)
00121         self.assert_(handle3 in rospy.core._preshutdown_hooks)
00122         self.assert_(handle2 in rospy.core._shutdown_hooks)        
00123         self.assert_(handle1 in rospy.core._shutdown_hooks)        
00124         try:
00125             rospy.core.add_preshutdown_hook(1)
00126             self.fail_("add_preshutdown_hook is not protected against invalid args")
00127         except TypeError: pass
00128         try:
00129             rospy.core.add_preshutdown_hook(1)
00130             self.fail_("add_preshutdown_hook is not protected against invalid args")
00131         except TypeError: pass
00132 
00133     def test_get_ros_root(self):
00134         try:
00135             rospy.core.get_ros_root(env={}, required=True)
00136         except:
00137             pass
00138         self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
00139         rr = "%s"%time.time()
00140         self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
00141         self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))        
00142 
00143         self.assertEquals(os.environ['ROS_ROOT'], rospy.core.get_ros_root(required=False))
00144     def test_node_uri(self):
00145         uri = "http://localhost-%s:1234"%random.randint(1, 1000)
00146         self.assertEquals(None, rospy.core.get_node_uri())
00147         rospy.core.set_node_uri(uri)
00148         self.assertEquals(uri, rospy.core.get_node_uri())
00149         
00150     def test_initialized(self):
00151         self.failIf(rospy.core.is_initialized())
00152         rospy.core.set_initialized(True)
00153         self.assert_(rospy.core.is_initialized())
00154 
00155     def test_shutdown_hook_exception(self):
00156         rospy.core._shutdown_flag = False
00157         del rospy.core._shutdown_hooks[:]
00158         # add a shutdown hook that throws an exception,
00159         # signal_shutdown should be robust to it
00160         rospy.core.add_shutdown_hook(shutdown_hook_exception)
00161         rospy.core.signal_shutdown('test_exception')
00162         rospy.core._shutdown_flag = False        
00163         del rospy.core._shutdown_hooks[:]
00164         
00165     def test_shutdown(self):
00166         rospy.core._shutdown_flag = False
00167         del rospy.core._shutdown_hooks[:]        
00168         global called, called2
00169         called = called2 = None
00170         self.failIf(rospy.core.is_shutdown())        
00171         rospy.core.add_shutdown_hook(shutdown_hook1)
00172         reason = "reason %s"%time.time()
00173         rospy.core.signal_shutdown(reason)
00174         self.assertEquals(reason, called)
00175         self.assert_(rospy.core.is_shutdown())
00176 
00177         # verify that shutdown hook is called immediately on add if already shutdown
00178         rospy.core.add_shutdown_hook(shutdown_hook2)
00179         self.assert_(called2 is not None)
00180         rospy.core._shutdown_flag = False
00181 
00182     #TODO: move to teset_rospy_names
00183     def test_valid_name(self):
00184         # not forcing rospy to be pedantic -- yet, just try and do sanity checks
00185         tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
00186         caller_id = '/me'
00187         for t in tests:
00188             self.assert_(rospy.core.valid_name('p')(t, caller_id))
00189         failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00190         for f in failures:
00191             try:
00192                 rospy.core.valid_name('p')(f, caller_id)
00193                 self.fail(f)
00194             except rospy.core.ParameterInvalid, e:
00195                 pass
00196     
00197     def test_is_topic(self):
00198         # not forcing rospy to be pedantic -- yet, just try and do sanity checks
00199         caller_id = '/me'
00200         tests = [
00201             ('topic', '/node', '/topic'),
00202             ('topic', '/ns/node', '/ns/topic'),
00203             ('/topic', '/node', '/topic'),
00204             ('~topic', '/node', '/node/topic'),
00205             ('/topic1', '/node', '/topic1'),
00206             ('top/sub', '/node', '/top/sub'),
00207             ('top/sub', '/ns/node', '/ns/top/sub'),
00208             ]
00209 
00210         for t, caller_id, v in tests:
00211             self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
00212         failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00213         for f in failures:
00214             try:
00215                 rospy.core.is_topic('p')(f, caller_id)
00216                 self.fail(f)
00217             except rospy.core.ParameterInvalid, e:
00218                 pass
00219             
00220     def test_configure_logging(self):
00221         # can't test actual functionality
00222         try:
00223             rospy.core.configure_logging("/")
00224             self.fail("configure_logging should not accept a the root namespace as the node_name param")
00225         except: pass
00226         rospy.core.configure_logging("/node/name")
00227 
00228     def test_xmlrpcapi(self):
00229         # have to use 'is' so we don't accidentally invoke XMLRPC
00230         self.assert_(rospy.core.xmlrpcapi(None) is None)
00231         self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
00232         self.assert_(rospy.core.xmlrpcapi('http://') is None)
00233         api = rospy.core.xmlrpcapi('http://localhost:1234')
00234         self.assert_(api is not None)
00235         import xmlrpclib
00236         self.assert_(isinstance(api, xmlrpclib.ServerProxy))
00237     
# Last shutdown reason seen by shutdown_hook1 / shutdown_hook2 respectively;
# None until the corresponding hook has run.
called = called2 = None
def shutdown_hook1(reason):
    """Shutdown hook that stores *reason* in the module-level ``called``.

    Also echoes the reason to stdout as ``HOOK <reason>``.
    """
    global called
    # A single pre-formatted argument prints the same text under both the
    # Python 2 print statement and the Python 3 print function.
    print("HOOK %s" % (reason,))
    called = reason
def shutdown_hook2(reason):
    """Shutdown hook that stores *reason* in the module-level ``called2``.

    Also echoes the reason to stdout as ``HOOK2 <reason>``.
    """
    global called2
    # A single pre-formatted argument prints the same text under both the
    # Python 2 print statement and the Python 3 print function.
    print("HOOK2 %s" % (reason,))
    called2 = reason
def shutdown_hook_exception(reason):
    """Shutdown hook that always raises ``Exception("gotcha")``.

    *reason* is ignored.
    """
    failure = Exception("gotcha")
    raise failure


test_rospy
Author(s): Ken Conley/kwc@willowgarage.com
autogenerated on Sat Dec 28 2013 17:36:20