00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 from __future__ import print_function
00035
00036 import os
00037 import sys
00038 import struct
00039 import unittest
00040 import time
00041 import random
00042
00043 import rospy
00044
00045 class TestRospyCore(unittest.TestCase):
00046
00047 def test_parse_rosrpc_uri(self):
00048 from rospy.core import parse_rosrpc_uri
00049 valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
00050 ('rosrpc://localhost2:1234', 'localhost2', 1234),
00051 ('rosrpc://third:1234/path/bar', 'third', 1234),
00052 ('rosrpc://foo.com:1/', 'foo.com', 1),
00053 ('rosrpc://foo.com:1/', 'foo.com', 1)]
00054 for t, addr, port in valid:
00055 paddr, pport = rospy.core.parse_rosrpc_uri(t)
00056 self.assertEquals(addr, paddr)
00057 self.assertEquals(port, pport)
00058
00059 self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
00060 invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
00061 for i in invalid:
00062 try:
00063 parse_rosrpc_uri(i)
00064 self.fail("%s was an invalid rosrpc uri"%i)
00065 except: pass
00066
00067 def test_loggers(self):
00068
00069 import rospy.core
00070 rospy.core.logdebug('debug')
00071 rospy.core.logwarn('warn')
00072 rospy.core.logout('out')
00073 rospy.core.logerr('err')
00074 rospy.core.logfatal('fatal')
00075
00076 import rospy
00077 rospy.logdebug('debug')
00078 rospy.logwarn('warn')
00079 rospy.logout('out')
00080 rospy.logerr('err')
00081 rospy.logfatal('fatal')
00082
00083 def test_add_shutdown_hook(self):
00084 def handle(reason):
00085 pass
00086
00087 rospy.core.add_shutdown_hook(handle)
00088 try:
00089 rospy.core.add_shutdown_hook(1)
00090 self.fail_("add_shutdown_hook is not protected against invalid args")
00091 except TypeError: pass
00092 try:
00093 rospy.core.add_shutdown_hook(1)
00094 self.fail_("add_shutdown_hook is not protected against invalid args")
00095 except TypeError: pass
00096
00097 def test_add_preshutdown_hook(self):
00098 def handle1(reason):
00099 pass
00100 def handle2(reason):
00101 pass
00102 def handle3(reason):
00103 pass
00104
00105 rospy.core.add_shutdown_hook(handle1)
00106 rospy.core.add_shutdown_hook(handle2)
00107 rospy.core.add_preshutdown_hook(handle3)
00108 self.assert_(handle3 in rospy.core._preshutdown_hooks)
00109 self.assert_(handle2 in rospy.core._shutdown_hooks)
00110 self.assert_(handle1 in rospy.core._shutdown_hooks)
00111 try:
00112 rospy.core.add_preshutdown_hook(1)
00113 self.fail_("add_preshutdown_hook is not protected against invalid args")
00114 except TypeError: pass
00115 try:
00116 rospy.core.add_preshutdown_hook(1)
00117 self.fail_("add_preshutdown_hook is not protected against invalid args")
00118 except TypeError: pass
00119
00120 def test_get_ros_root(self):
00121 try:
00122 rospy.core.get_ros_root(env={}, required=True)
00123 except:
00124 pass
00125 self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
00126 rr = "%s"%time.time()
00127 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
00128 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))
00129
00130 self.assertEquals(os.path.normpath(os.environ['ROS_ROOT']), rospy.core.get_ros_root(required=False))
00131 def test_node_uri(self):
00132 uri = "http://localhost-%s:1234"%random.randint(1, 1000)
00133 self.assertEquals(None, rospy.core.get_node_uri())
00134 rospy.core.set_node_uri(uri)
00135 self.assertEquals(uri, rospy.core.get_node_uri())
00136
00137 def test_initialized(self):
00138 self.failIf(rospy.core.is_initialized())
00139 rospy.core.set_initialized(True)
00140 self.assert_(rospy.core.is_initialized())
00141
00142 def test_shutdown_hook_exception(self):
00143 rospy.core._shutdown_flag = False
00144 del rospy.core._shutdown_hooks[:]
00145
00146
00147 rospy.core.add_shutdown_hook(shutdown_hook_exception)
00148 rospy.core.signal_shutdown('test_exception')
00149 rospy.core._shutdown_flag = False
00150 del rospy.core._shutdown_hooks[:]
00151
00152 def test_shutdown(self):
00153 rospy.core._shutdown_flag = False
00154 del rospy.core._shutdown_hooks[:]
00155 global called, called2
00156 called = called2 = None
00157 self.failIf(rospy.core.is_shutdown())
00158 rospy.core.add_shutdown_hook(shutdown_hook1)
00159 reason = "reason %s"%time.time()
00160 rospy.core.signal_shutdown(reason)
00161 self.assertEquals(reason, called)
00162 self.assert_(rospy.core.is_shutdown())
00163
00164
00165 rospy.core.add_shutdown_hook(shutdown_hook2)
00166 self.assert_(called2 is not None)
00167 rospy.core._shutdown_flag = False
00168
00169
00170 def test_valid_name(self):
00171
00172 tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
00173 caller_id = '/me'
00174 for t in tests:
00175 self.assert_(rospy.core.valid_name('p')(t, caller_id))
00176 failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00177 for f in failures:
00178 try:
00179 rospy.core.valid_name('p')(f, caller_id)
00180 self.fail(f)
00181 except rospy.core.ParameterInvalid:
00182 pass
00183
00184 def test_is_topic(self):
00185
00186 caller_id = '/me'
00187 tests = [
00188 ('topic', '/node', '/topic'),
00189 ('topic', '/ns/node', '/ns/topic'),
00190 ('/topic', '/node', '/topic'),
00191 ('~topic', '/node', '/node/topic'),
00192 ('/topic1', '/node', '/topic1'),
00193 ('top/sub', '/node', '/top/sub'),
00194 ('top/sub', '/ns/node', '/ns/top/sub'),
00195 ]
00196
00197 for t, caller_id, v in tests:
00198 self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
00199 failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00200 for f in failures:
00201 try:
00202 rospy.core.is_topic('p')(f, caller_id)
00203 self.fail(f)
00204 except rospy.core.ParameterInvalid:
00205 pass
00206
00207 def test_configure_logging(self):
00208
00209 try:
00210 rospy.core.configure_logging("/")
00211 self.fail("configure_logging should not accept a the root namespace as the node_name param")
00212 except: pass
00213 rospy.core.configure_logging("/node/name")
00214
00215 def test_xmlrpcapi(self):
00216
00217 self.assert_(rospy.core.xmlrpcapi(None) is None)
00218 self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
00219 self.assert_(rospy.core.xmlrpcapi('http://') is None)
00220 api = rospy.core.xmlrpcapi('http://localhost:1234')
00221 self.assert_(api is not None)
00222 try:
00223 from xmlrpc.client import ServerProxy
00224 except ImportError:
00225 from xmlrpclib import ServerProxy
00226 self.assert_(isinstance(api, ServerProxy))
00227
00228 called = None
00229 called2 = None
00230 def shutdown_hook1(reason):
00231 global called
00232 print("HOOK", reason)
00233 called = reason
00234 def shutdown_hook2(reason):
00235 global called2
00236 print("HOOK2", reason)
00237 called2 = reason
00238 def shutdown_hook_exception(reason):
00239 raise Exception("gotcha")