test_rospy_core.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 from __future__ import print_function
00035 
00036 import os
00037 import sys
00038 import struct
00039 import unittest
00040 import time
00041 import random
00042 
00043 import rospy
00044 
class TestRospyCore(unittest.TestCase):
    """Smoke and sanity tests for the helpers exposed by ``rospy.core``.

    Several tests poke at module-level state of ``rospy.core`` (shutdown
    flag, hook lists, node URI, initialized flag) and rely on the
    module-level ``shutdown_hook*`` helpers defined in this file.
    """

    def test_parse_rosrpc_uri(self):
        # Well-formed rosrpc URIs must parse into (address, port).
        from rospy.core import parse_rosrpc_uri
        valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
                 ('rosrpc://localhost2:1234', 'localhost2', 1234),
                 ('rosrpc://third:1234/path/bar', 'third', 1234),
                 ('rosrpc://foo.com:1/', 'foo.com', 1)]
        for uri, addr, port in valid:
            paddr, pport = rospy.core.parse_rosrpc_uri(uri)
            self.assertEqual(addr, paddr)
            self.assertEqual(port, pport)
            # validate that it's a top-level API method
            self.assertEqual(rospy.core.parse_rosrpc_uri(uri), rospy.parse_rosrpc_uri(uri))
        invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
        for uri in invalid:
            # NOTE(review): the handler below also catches the AssertionError
            # raised by self.fail(), so this loop cannot fail the test as
            # written. Confirm that parse_rosrpc_uri rejects every entry
            # before tightening this into a real assertion.
            try:
                parse_rosrpc_uri(uri)
                self.fail("%s was an invalid rosrpc uri"%uri)
            except Exception:
                pass

    def test_loggers(self):
        # trip wire tests: exercise each logger once through rospy.core and
        # once through the top-level rospy API to check both are exposed
        for api in (rospy.core, rospy):
            api.logdebug('debug')
            api.logwarn('warn')
            api.logout('out')
            api.logerr('err')
            api.logfatal('fatal')

    def test_add_shutdown_hook(self):
        def handle(reason):
            pass
        # cannot verify functionality, just crashing
        rospy.core.add_shutdown_hook(handle)
        # a non-callable argument is expected to be rejected with TypeError
        self.assertRaises(TypeError, rospy.core.add_shutdown_hook, 1)

    def test_add_preshutdown_hook(self):
        def handle1(reason):
            pass
        def handle2(reason):
            pass
        def handle3(reason):
            pass
        # cannot verify functionality, just coverage as well as ordering
        rospy.core.add_shutdown_hook(handle1)
        rospy.core.add_shutdown_hook(handle2)
        rospy.core.add_preshutdown_hook(handle3)
        self.assertIn(handle3, rospy.core._preshutdown_hooks)
        self.assertIn(handle2, rospy.core._shutdown_hooks)
        self.assertIn(handle1, rospy.core._shutdown_hooks)
        # a non-callable argument is expected to be rejected with TypeError
        self.assertRaises(TypeError, rospy.core.add_preshutdown_hook, 1)

    def test_get_ros_root(self):
        # With an empty environment and required=True the call may raise;
        # either outcome is tolerated here, only the lookups below are checked.
        try:
            rospy.core.get_ros_root(env={}, required=True)
        except Exception:
            pass
        self.assertEqual(None, rospy.core.get_ros_root(env={}, required=False))
        # use a unique value so a stale/default ROS_ROOT cannot match by accident
        rr = "%s"%time.time()
        self.assertEqual(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
        self.assertEqual(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))

        # without an explicit env the value is compared against the process environment
        self.assertEqual(os.path.normpath(os.environ['ROS_ROOT']), rospy.core.get_ros_root(required=False))

    def test_node_uri(self):
        # The node URI starts unset and reads back whatever was last stored.
        uri = "http://localhost-%s:1234"%random.randint(1, 1000)
        self.assertEqual(None, rospy.core.get_node_uri())
        rospy.core.set_node_uri(uri)
        self.assertEqual(uri, rospy.core.get_node_uri())

    def test_initialized(self):
        # The initialized flag starts false and follows set_initialized().
        self.assertFalse(rospy.core.is_initialized())
        rospy.core.set_initialized(True)
        self.assertTrue(rospy.core.is_initialized())

    def test_shutdown_hook_exception(self):
        rospy.core._shutdown_flag = False
        del rospy.core._shutdown_hooks[:]
        try:
            # add a shutdown hook that throws an exception,
            # signal_shutdown should be robust to it
            rospy.core.add_shutdown_hook(shutdown_hook_exception)
            rospy.core.signal_shutdown('test_exception')
        finally:
            # always restore the module state so later tests start clean
            rospy.core._shutdown_flag = False
            del rospy.core._shutdown_hooks[:]

    def test_shutdown(self):
        global called, called2
        rospy.core._shutdown_flag = False
        del rospy.core._shutdown_hooks[:]
        called = called2 = None
        try:
            self.assertFalse(rospy.core.is_shutdown())
            rospy.core.add_shutdown_hook(shutdown_hook1)
            reason = "reason %s"%time.time()
            rospy.core.signal_shutdown(reason)
            self.assertEqual(reason, called)
            self.assertTrue(rospy.core.is_shutdown())

            # verify that shutdown hook is called immediately on add if already shutdown
            rospy.core.add_shutdown_hook(shutdown_hook2)
            self.assertTrue(called2 is not None)
        finally:
            # always clear the shutdown flag so later tests start clean
            rospy.core._shutdown_flag = False

    #TODO: move to test_rospy_names
    def test_valid_name(self):
        # not forcing rospy to be pedantic -- yet, just try and do sanity checks
        tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
        caller_id = '/me'
        for t in tests:
            self.assertTrue(rospy.core.valid_name('p')(t, caller_id))
        failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
        for f in failures:
            try:
                rospy.core.valid_name('p')(f, caller_id)
            except rospy.core.ParameterInvalid:
                continue
            self.fail(f)

    def test_is_topic(self):
        # not forcing rospy to be pedantic -- yet, just try and do sanity checks
        caller_id = '/me'
        # (name to validate, caller id used for resolution, expected result)
        tests = [
            ('topic', '/node', '/topic'),
            ('topic', '/ns/node', '/ns/topic'),
            ('/topic', '/node', '/topic'),
            ('~topic', '/node', '/node/topic'),
            ('/topic1', '/node', '/topic1'),
            ('top/sub', '/node', '/top/sub'),
            ('top/sub', '/ns/node', '/ns/top/sub'),
            ]

        # the loop variable is deliberately not named caller_id so that the
        # failure checks below keep using '/me'
        for t, node, v in tests:
            self.assertEqual(v, rospy.core.is_topic('p')(t, node))
        failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
        for f in failures:
            try:
                rospy.core.is_topic('p')(f, caller_id)
            except rospy.core.ParameterInvalid:
                continue
            self.fail(f)

    def test_configure_logging(self):
        # can't test actual functionality
        # NOTE(review): the handler below also catches the AssertionError
        # raised by self.fail(), so this check cannot fail the test as
        # written. Confirm that configure_logging("/") really raises before
        # tightening this into a real assertion.
        try:
            rospy.core.configure_logging("/")
            self.fail("configure_logging should not accept a the root namespace as the node_name param")
        except Exception:
            pass
        rospy.core.configure_logging("/node/name")

    def test_xmlrpcapi(self):
        # have to use 'is' so we don't accidentally invoke XMLRPC
        self.assertTrue(rospy.core.xmlrpcapi(None) is None)
        self.assertTrue(rospy.core.xmlrpcapi('localhost:1234') is None)
        self.assertTrue(rospy.core.xmlrpcapi('http://') is None)
        api = rospy.core.xmlrpcapi('http://localhost:1234')
        self.assertTrue(api is not None)
        # Python 3 name first, Python 2 fallback
        try:
            from xmlrpc.client import ServerProxy
        except ImportError:
            from xmlrpclib import ServerProxy
        self.assertTrue(isinstance(api, ServerProxy))

# Last shutdown reason seen by shutdown_hook1 / shutdown_hook2; None until
# the corresponding hook has been invoked.
called = called2 = None
def shutdown_hook1(reason):
    """Store *reason* in the module-level ``called`` and print it."""
    global called
    called = reason
    print("HOOK", called)
def shutdown_hook2(reason):
    """Store *reason* in the module-level ``called2`` and print it."""
    global called2
    called2 = reason
    print("HOOK2", called2)
def shutdown_hook_exception(reason):
    """Shutdown hook that always fails; *reason* is ignored."""
    error = Exception("gotcha")
    raise error


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57