00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040
00041 import rospy
00042
00043 class TestRospyCore(unittest.TestCase):
00044
00045 def test_parse_rosrpc_uri(self):
00046 from rospy.core import parse_rosrpc_uri
00047 valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
00048 ('rosrpc://localhost2:1234', 'localhost2', 1234),
00049 ('rosrpc://third:1234/path/bar', 'third', 1234),
00050 ('rosrpc://foo.com:1/', 'foo.com', 1),
00051 ('rosrpc://foo.com:1/', 'foo.com', 1)]
00052 for t, addr, port in valid:
00053 paddr, pport = rospy.core.parse_rosrpc_uri(t)
00054 self.assertEquals(addr, paddr)
00055 self.assertEquals(port, pport)
00056
00057 self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
00058 invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
00059 for i in invalid:
00060 try:
00061 parse_rosrpc_uri(i)
00062 self.fail("%s was an invalid rosrpc uri"%i)
00063 except: pass
00064
00065 def test_add_log_handler(self):
00066
00067 import rospy.core
00068 from rosgraph_msgs.msg import Log
00069 for level in [Log.DEBUG,
00070 Log.INFO,
00071 Log.WARN,
00072 Log.ERROR,
00073 Log.FATAL]:
00074 rospy.core.add_log_handler(level, lambda x: x)
00075 import rospy.exceptions
00076 try:
00077 rospy.core.add_log_handler(lambda x: x, -1)
00078 except rospy.exceptions.ROSInternalException, e: pass
00079
00080 def test_loggers(self):
00081
00082 import rospy.core
00083 rospy.core.logdebug('debug')
00084 rospy.core.logwarn('warn')
00085 rospy.core.logout('out')
00086 rospy.core.logerr('err')
00087 rospy.core.logfatal('fatal')
00088
00089 import rospy
00090 rospy.logdebug('debug')
00091 rospy.logwarn('warn')
00092 rospy.logout('out')
00093 rospy.logerr('err')
00094 rospy.logfatal('fatal')
00095
00096 def test_add_shutdown_hook(self):
00097 def handle(reason):
00098 pass
00099
00100 rospy.core.add_shutdown_hook(handle)
00101 try:
00102 rospy.core.add_shutdown_hook(1)
00103 self.fail_("add_shutdown_hook is not protected against invalid args")
00104 except TypeError: pass
00105 try:
00106 rospy.core.add_shutdown_hook(1)
00107 self.fail_("add_shutdown_hook is not protected against invalid args")
00108 except TypeError: pass
00109
00110 def test_add_preshutdown_hook(self):
00111 def handle1(reason):
00112 pass
00113 def handle2(reason):
00114 pass
00115 def handle3(reason):
00116 pass
00117
00118 rospy.core.add_shutdown_hook(handle1)
00119 rospy.core.add_shutdown_hook(handle2)
00120 rospy.core.add_preshutdown_hook(handle3)
00121 self.assert_(handle3 in rospy.core._preshutdown_hooks)
00122 self.assert_(handle2 in rospy.core._shutdown_hooks)
00123 self.assert_(handle1 in rospy.core._shutdown_hooks)
00124 try:
00125 rospy.core.add_preshutdown_hook(1)
00126 self.fail_("add_preshutdown_hook is not protected against invalid args")
00127 except TypeError: pass
00128 try:
00129 rospy.core.add_preshutdown_hook(1)
00130 self.fail_("add_preshutdown_hook is not protected against invalid args")
00131 except TypeError: pass
00132
00133 def test_get_ros_root(self):
00134 try:
00135 rospy.core.get_ros_root(env={}, required=True)
00136 except:
00137 pass
00138 self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
00139 rr = "%s"%time.time()
00140 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
00141 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))
00142
00143 self.assertEquals(os.environ['ROS_ROOT'], rospy.core.get_ros_root(required=False))
00144 def test_node_uri(self):
00145 uri = "http://localhost-%s:1234"%random.randint(1, 1000)
00146 self.assertEquals(None, rospy.core.get_node_uri())
00147 rospy.core.set_node_uri(uri)
00148 self.assertEquals(uri, rospy.core.get_node_uri())
00149
00150 def test_initialized(self):
00151 self.failIf(rospy.core.is_initialized())
00152 rospy.core.set_initialized(True)
00153 self.assert_(rospy.core.is_initialized())
00154
00155 def test_shutdown_hook_exception(self):
00156 rospy.core._shutdown_flag = False
00157 del rospy.core._shutdown_hooks[:]
00158
00159
00160 rospy.core.add_shutdown_hook(shutdown_hook_exception)
00161 rospy.core.signal_shutdown('test_exception')
00162 rospy.core._shutdown_flag = False
00163 del rospy.core._shutdown_hooks[:]
00164
00165 def test_shutdown(self):
00166 rospy.core._shutdown_flag = False
00167 del rospy.core._shutdown_hooks[:]
00168 global called, called2
00169 called = called2 = None
00170 self.failIf(rospy.core.is_shutdown())
00171 rospy.core.add_shutdown_hook(shutdown_hook1)
00172 reason = "reason %s"%time.time()
00173 rospy.core.signal_shutdown(reason)
00174 self.assertEquals(reason, called)
00175 self.assert_(rospy.core.is_shutdown())
00176
00177
00178 rospy.core.add_shutdown_hook(shutdown_hook2)
00179 self.assert_(called2 is not None)
00180 rospy.core._shutdown_flag = False
00181
00182
00183 def test_valid_name(self):
00184
00185 tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
00186 caller_id = '/me'
00187 for t in tests:
00188 self.assert_(rospy.core.valid_name('p')(t, caller_id))
00189 failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00190 for f in failures:
00191 try:
00192 rospy.core.valid_name('p')(f, caller_id)
00193 self.fail(f)
00194 except rospy.core.ParameterInvalid, e:
00195 pass
00196
00197 def test_is_topic(self):
00198
00199 caller_id = '/me'
00200 tests = [
00201 ('topic', '/node', '/topic'),
00202 ('topic', '/ns/node', '/ns/topic'),
00203 ('/topic', '/node', '/topic'),
00204 ('~topic', '/node', '/node/topic'),
00205 ('/topic1', '/node', '/topic1'),
00206 ('top/sub', '/node', '/top/sub'),
00207 ('top/sub', '/ns/node', '/ns/top/sub'),
00208 ]
00209
00210 for t, caller_id, v in tests:
00211 self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
00212 failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00213 for f in failures:
00214 try:
00215 rospy.core.is_topic('p')(f, caller_id)
00216 self.fail(f)
00217 except rospy.core.ParameterInvalid, e:
00218 pass
00219
00220 def test_configure_logging(self):
00221
00222 try:
00223 rospy.core.configure_logging("/")
00224 self.fail("configure_logging should not accept a the root namespace as the node_name param")
00225 except: pass
00226 rospy.core.configure_logging("/node/name")
00227
00228 def test_xmlrpcapi(self):
00229
00230 self.assert_(rospy.core.xmlrpcapi(None) is None)
00231 self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
00232 self.assert_(rospy.core.xmlrpcapi('http://') is None)
00233 api = rospy.core.xmlrpcapi('http://localhost:1234')
00234 self.assert_(api is not None)
00235 import xmlrpclib
00236 self.assert_(isinstance(api, xmlrpclib.ServerProxy))
00237
00238 called = None
00239 called2 = None
00240 def shutdown_hook1(reason):
00241 global called
00242 print "HOOK", reason
00243 called = reason
00244 def shutdown_hook2(reason):
00245 global called2
00246 print "HOOK2", reason
00247 called2 = reason
00248 def shutdown_hook_exception(reason):
00249 raise Exception("gotcha")