#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$

import roslib; roslib.load_manifest('test_rospy')

import os
import sys
import struct
import unittest
import time
import random

import rostest

import rospy


class TestRospyCore(unittest.TestCase):
    """Tests for functions exposed by rospy.core.

    Several tests are "tripwire" tests: they only check that a call does
    not crash. Some tests read or reset module-level state in rospy.core
    (_shutdown_flag, _shutdown_hooks, the node URI, the initialized flag).
    """

    def test_parse_rosrpc_uri(self):
        """Valid rosrpc URIs parse into (address, port); invalid ones are rejected."""
        from rospy.core import parse_rosrpc_uri
        valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
                 ('rosrpc://localhost2:1234', 'localhost2', 1234),
                 ('rosrpc://third:1234/path/bar', 'third', 1234),
                 ('rosrpc://foo.com:1/', 'foo.com', 1),
                 ('rosrpc://foo.com:1/', 'foo.com', 1)]
        for t, addr, port in valid:
            paddr, pport = rospy.core.parse_rosrpc_uri(t)
            self.assertEqual(addr, paddr)
            self.assertEqual(port, pport)
            # validate that it's a top-level API method
            self.assertEqual(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
        invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
        for i in invalid:
            # NOTE(review): this handler also swallows the AssertionError
            # raised by self.fail(), so this loop can never fail the test.
            # Confirm which exception parse_rosrpc_uri raises for each of
            # these inputs before narrowing the except clause.
            try:
                parse_rosrpc_uri(i)
                self.fail("%s was an invalid rosrpc uri"%i)
            except Exception:
                pass

    def test_add_log_handler(self):
        """Tripwire: add_log_handler accepts a handler for every Log level."""
        # tripwire test
        import rospy.core
        from rosgraph_msgs.msg import Log
        for level in [Log.DEBUG,
                      Log.INFO,
                      Log.WARN,
                      Log.ERROR,
                      Log.FATAL]:
            rospy.core.add_log_handler(level, lambda x: x)
        import rospy.exceptions
        # Bad arguments: a ROSInternalException is tolerated, not required.
        try:
            rospy.core.add_log_handler(lambda x: x, -1)
        except rospy.exceptions.ROSInternalException:
            pass

    def test_loggers(self):
        """Tripwire: the log functions exist in rospy.core and in the top-level rospy API."""
        # trip wire tests
        import rospy.core
        rospy.core.logdebug('debug')
        rospy.core.logwarn('warn')
        rospy.core.logout('out')
        rospy.core.logerr('err')
        rospy.core.logfatal('fatal')
        # test that they are exposed via top-level api
        import rospy
        rospy.logdebug('debug')
        rospy.logwarn('warn')
        rospy.logout('out')
        rospy.logerr('err')
        rospy.logfatal('fatal')

    def test_add_shutdown_hook(self):
        """add_shutdown_hook accepts a callable and raises TypeError otherwise."""
        def handle(reason):
            pass
        # cannot verify functionality, just crashing
        rospy.core.add_shutdown_hook(handle)
        # The invalid-argument check is performed twice, as in the original test.
        for _ in range(2):
            try:
                rospy.core.add_shutdown_hook(1)
                # TestCase has no fail_() method; fail() reports a proper failure.
                self.fail("add_shutdown_hook is not protected against invalid args")
            except TypeError:
                pass

    def test_add_preshutdown_hook(self):
        """Hooks are stored in the matching rospy.core list; non-callables raise TypeError."""
        def handle1(reason):
            pass
        def handle2(reason):
            pass
        def handle3(reason):
            pass
        # cannot verify functionality, just coverage as well as ordering
        rospy.core.add_shutdown_hook(handle1)
        rospy.core.add_shutdown_hook(handle2)
        rospy.core.add_preshutdown_hook(handle3)
        self.assertTrue(handle3 in rospy.core._preshutdown_hooks)
        self.assertTrue(handle2 in rospy.core._shutdown_hooks)
        self.assertTrue(handle1 in rospy.core._shutdown_hooks)
        # The invalid-argument check is performed twice, as in the original test.
        for _ in range(2):
            try:
                rospy.core.add_preshutdown_hook(1)
                # TestCase has no fail_() method; fail() reports a proper failure.
                self.fail("add_preshutdown_hook is not protected against invalid args")
            except TypeError:
                pass

    def test_get_ros_root(self):
        """get_ros_root reads ROS_ROOT from the given env, or from os.environ by default."""
        # With an empty env and required=True an error is tolerated but not required.
        try:
            rospy.core.get_ros_root(env={}, required=True)
        except Exception:
            pass
        self.assertEqual(None, rospy.core.get_ros_root(env={}, required=False))
        rr = "%s"%time.time()
        self.assertEqual(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
        self.assertEqual(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))

        self.assertEqual(os.environ['ROS_ROOT'], rospy.core.get_ros_root(required=False))

    def test_node_uri(self):
        """get_node_uri returns None until set_node_uri stores a URI."""
        uri = "http://localhost-%s:1234"%random.randint(1, 1000)
        self.assertEqual(None, rospy.core.get_node_uri())
        rospy.core.set_node_uri(uri)
        self.assertEqual(uri, rospy.core.get_node_uri())

    def test_initialized(self):
        """is_initialized is false until set_initialized(True) is called."""
        self.assertFalse(rospy.core.is_initialized())
        rospy.core.set_initialized(True)
        self.assertTrue(rospy.core.is_initialized())

    def test_shutdown_hook_exception(self):
        """signal_shutdown must survive a shutdown hook that raises."""
        rospy.core._shutdown_flag = False
        del rospy.core._shutdown_hooks[:]
        # add a shutdown hook that throws an exception,
        # signal_shutdown should be robust to it
        rospy.core.add_shutdown_hook(test_shutdown_hook_exception)
        rospy.core.signal_shutdown('test_exception')
        # reset module state so later tests start un-shutdown
        rospy.core._shutdown_flag = False
        del rospy.core._shutdown_hooks[:]

    def test_shutdown(self):
        """signal_shutdown passes the reason to hooks and sets the shutdown state."""
        rospy.core._shutdown_flag = False
        del rospy.core._shutdown_hooks[:]
        global called, called2
        called = called2 = None
        self.assertFalse(rospy.core.is_shutdown())
        rospy.core.add_shutdown_hook(test_shutdown_hook)
        reason = "reason %s"%time.time()
        rospy.core.signal_shutdown(reason)
        self.assertEqual(reason, called)
        self.assertTrue(rospy.core.is_shutdown())

        # verify that shutdown hook is called immediately on add if already shutdown
        rospy.core.add_shutdown_hook(test_shutdown_hook2)
        self.assertTrue(called2 is not None)
        rospy.core._shutdown_flag = False

    #TODO: move to test_rospy_names
    def test_valid_name(self):
        """valid_name accepts simple graph names and raises ParameterInvalid for bad input."""
        # not forcing rospy to be pedantic -- yet, just try and do sanity checks
        tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
        caller_id = '/me'
        for t in tests:
            self.assertTrue(rospy.core.valid_name('p')(t, caller_id))
        failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
        for f in failures:
            try:
                rospy.core.valid_name('p')(f, caller_id)
                self.fail(f)
            except rospy.core.ParameterInvalid:
                pass

    def test_is_topic(self):
        """is_topic resolves names against the caller id and raises ParameterInvalid for bad input."""
        # not forcing rospy to be pedantic -- yet, just try and do sanity checks
        caller_id = '/me'
        # (name, caller_id, expected resolved name)
        tests = [
            ('topic', '/node', '/topic'),
            ('topic', '/ns/node', '/ns/topic'),
            ('/topic', '/node', '/topic'),
            ('~topic', '/node', '/node/topic'),
            ('/topic1', '/node', '/topic1'),
            ('top/sub', '/node', '/top/sub'),
            ('top/sub', '/ns/node', '/ns/top/sub'),
            ]

        for t, caller_id, v in tests:
            self.assertEqual(v, rospy.core.is_topic('p')(t, caller_id))
        # caller_id still holds the value from the last tuple above
        failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
        for f in failures:
            try:
                rospy.core.is_topic('p')(f, caller_id)
                self.fail(f)
            except rospy.core.ParameterInvalid:
                pass

    def test_configure_logging(self):
        """configure_logging is exercised with the root namespace and with a normal node name."""
        # can't test actual functionality
        # NOTE(review): this handler also swallows the AssertionError raised
        # by self.fail(), so the root-namespace check can never fail the
        # test. Confirm which exception configure_logging raises before
        # narrowing the except clause.
        try:
            rospy.core.configure_logging("/")
            self.fail("configure_logging should not accept a the root namespace as the node_name param")
        except Exception:
            pass
        rospy.core.configure_logging("/node/name")

    def test_xmlrpcapi(self):
        """xmlrpcapi returns None for unusable URIs and a ServerProxy for an http URI."""
        # have to use 'is' so we don't accidentally invoke XMLRPC
        self.assertTrue(rospy.core.xmlrpcapi(None) is None)
        self.assertTrue(rospy.core.xmlrpcapi('localhost:1234') is None)
        self.assertTrue(rospy.core.xmlrpcapi('http://') is None)
        api = rospy.core.xmlrpcapi('http://localhost:1234')
        self.assertTrue(api is not None)
        import xmlrpclib
        self.assertTrue(isinstance(api, xmlrpclib.ServerProxy))


# Set by the shutdown hooks below; inspected by TestRospyCore.test_shutdown.
called = None
called2 = None


def test_shutdown_hook(reason):
    """Shutdown hook that records the shutdown reason in the global `called`."""
    global called
    # single formatted argument prints the same text on Python 2 and 3
    print("HOOK %s" % reason)
    called = reason


def test_shutdown_hook2(reason):
    """Shutdown hook that records the shutdown reason in the global `called2`."""
    global called2
    print("HOOK2 %s" % reason)
    called2 = reason


def test_shutdown_hook_exception(reason):
    """Shutdown hook that always raises, used to check signal_shutdown's robustness."""
    raise Exception("gotcha")


if __name__ == '__main__':
    rostest.unitrun('test_rospy', sys.argv[0], TestRospyCore, coverage_packages=['rospy.core'])