00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040
00041 import rospy
00042
00043 class TestRospyCore(unittest.TestCase):
00044
00045 def test_parse_rosrpc_uri(self):
00046 from rospy.core import parse_rosrpc_uri
00047 valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
00048 ('rosrpc://localhost2:1234', 'localhost2', 1234),
00049 ('rosrpc://third:1234/path/bar', 'third', 1234),
00050 ('rosrpc://foo.com:1/', 'foo.com', 1),
00051 ('rosrpc://foo.com:1/', 'foo.com', 1)]
00052 for t, addr, port in valid:
00053 paddr, pport = rospy.core.parse_rosrpc_uri(t)
00054 self.assertEquals(addr, paddr)
00055 self.assertEquals(port, pport)
00056
00057 self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
00058 invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
00059 for i in invalid:
00060 try:
00061 parse_rosrpc_uri(i)
00062 self.fail("%s was an invalid rosrpc uri"%i)
00063 except: pass
00064
00065 def test_loggers(self):
00066
00067 import rospy.core
00068 rospy.core.logdebug('debug')
00069 rospy.core.logwarn('warn')
00070 rospy.core.logout('out')
00071 rospy.core.logerr('err')
00072 rospy.core.logfatal('fatal')
00073
00074 import rospy
00075 rospy.logdebug('debug')
00076 rospy.logwarn('warn')
00077 rospy.logout('out')
00078 rospy.logerr('err')
00079 rospy.logfatal('fatal')
00080
00081 def test_add_shutdown_hook(self):
00082 def handle(reason):
00083 pass
00084
00085 rospy.core.add_shutdown_hook(handle)
00086 try:
00087 rospy.core.add_shutdown_hook(1)
00088 self.fail_("add_shutdown_hook is not protected against invalid args")
00089 except TypeError: pass
00090 try:
00091 rospy.core.add_shutdown_hook(1)
00092 self.fail_("add_shutdown_hook is not protected against invalid args")
00093 except TypeError: pass
00094
00095 def test_add_preshutdown_hook(self):
00096 def handle1(reason):
00097 pass
00098 def handle2(reason):
00099 pass
00100 def handle3(reason):
00101 pass
00102
00103 rospy.core.add_shutdown_hook(handle1)
00104 rospy.core.add_shutdown_hook(handle2)
00105 rospy.core.add_preshutdown_hook(handle3)
00106 self.assert_(handle3 in rospy.core._preshutdown_hooks)
00107 self.assert_(handle2 in rospy.core._shutdown_hooks)
00108 self.assert_(handle1 in rospy.core._shutdown_hooks)
00109 try:
00110 rospy.core.add_preshutdown_hook(1)
00111 self.fail_("add_preshutdown_hook is not protected against invalid args")
00112 except TypeError: pass
00113 try:
00114 rospy.core.add_preshutdown_hook(1)
00115 self.fail_("add_preshutdown_hook is not protected against invalid args")
00116 except TypeError: pass
00117
00118 def test_get_ros_root(self):
00119 try:
00120 rospy.core.get_ros_root(env={}, required=True)
00121 except:
00122 pass
00123 self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
00124 rr = "%s"%time.time()
00125 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
00126 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))
00127
00128 self.assertEquals(os.path.normpath(os.environ['ROS_ROOT']), rospy.core.get_ros_root(required=False))
00129 def test_node_uri(self):
00130 uri = "http://localhost-%s:1234"%random.randint(1, 1000)
00131 self.assertEquals(None, rospy.core.get_node_uri())
00132 rospy.core.set_node_uri(uri)
00133 self.assertEquals(uri, rospy.core.get_node_uri())
00134
00135 def test_initialized(self):
00136 self.failIf(rospy.core.is_initialized())
00137 rospy.core.set_initialized(True)
00138 self.assert_(rospy.core.is_initialized())
00139
00140 def test_shutdown_hook_exception(self):
00141 rospy.core._shutdown_flag = False
00142 del rospy.core._shutdown_hooks[:]
00143
00144
00145 rospy.core.add_shutdown_hook(shutdown_hook_exception)
00146 rospy.core.signal_shutdown('test_exception')
00147 rospy.core._shutdown_flag = False
00148 del rospy.core._shutdown_hooks[:]
00149
00150 def test_shutdown(self):
00151 rospy.core._shutdown_flag = False
00152 del rospy.core._shutdown_hooks[:]
00153 global called, called2
00154 called = called2 = None
00155 self.failIf(rospy.core.is_shutdown())
00156 rospy.core.add_shutdown_hook(shutdown_hook1)
00157 reason = "reason %s"%time.time()
00158 rospy.core.signal_shutdown(reason)
00159 self.assertEquals(reason, called)
00160 self.assert_(rospy.core.is_shutdown())
00161
00162
00163 rospy.core.add_shutdown_hook(shutdown_hook2)
00164 self.assert_(called2 is not None)
00165 rospy.core._shutdown_flag = False
00166
00167
00168 def test_valid_name(self):
00169
00170 tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
00171 caller_id = '/me'
00172 for t in tests:
00173 self.assert_(rospy.core.valid_name('p')(t, caller_id))
00174 failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00175 for f in failures:
00176 try:
00177 rospy.core.valid_name('p')(f, caller_id)
00178 self.fail(f)
00179 except rospy.core.ParameterInvalid, e:
00180 pass
00181
00182 def test_is_topic(self):
00183
00184 caller_id = '/me'
00185 tests = [
00186 ('topic', '/node', '/topic'),
00187 ('topic', '/ns/node', '/ns/topic'),
00188 ('/topic', '/node', '/topic'),
00189 ('~topic', '/node', '/node/topic'),
00190 ('/topic1', '/node', '/topic1'),
00191 ('top/sub', '/node', '/top/sub'),
00192 ('top/sub', '/ns/node', '/ns/top/sub'),
00193 ]
00194
00195 for t, caller_id, v in tests:
00196 self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
00197 failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00198 for f in failures:
00199 try:
00200 rospy.core.is_topic('p')(f, caller_id)
00201 self.fail(f)
00202 except rospy.core.ParameterInvalid, e:
00203 pass
00204
00205 def test_configure_logging(self):
00206
00207 try:
00208 rospy.core.configure_logging("/")
00209 self.fail("configure_logging should not accept a the root namespace as the node_name param")
00210 except: pass
00211 rospy.core.configure_logging("/node/name")
00212
00213 def test_xmlrpcapi(self):
00214
00215 self.assert_(rospy.core.xmlrpcapi(None) is None)
00216 self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
00217 self.assert_(rospy.core.xmlrpcapi('http://') is None)
00218 api = rospy.core.xmlrpcapi('http://localhost:1234')
00219 self.assert_(api is not None)
00220 import xmlrpclib
00221 self.assert_(isinstance(api, xmlrpclib.ServerProxy))
00222
00223 called = None
00224 called2 = None
00225 def shutdown_hook1(reason):
00226 global called
00227 print "HOOK", reason
00228 called = reason
00229 def shutdown_hook2(reason):
00230 global called2
00231 print "HOOK2", reason
00232 called2 = reason
00233 def shutdown_hook_exception(reason):
00234 raise Exception("gotcha")