00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 import time
00043 import random
00044
00045 import rostest
00046
00047 import rospy
00048
00049
00050 class TestRospyCore(unittest.TestCase):
00051
00052 def test_parse_rosrpc_uri(self):
00053 from rospy.core import parse_rosrpc_uri
00054 valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
00055 ('rosrpc://localhost2:1234', 'localhost2', 1234),
00056 ('rosrpc://third:1234/path/bar', 'third', 1234),
00057 ('rosrpc://foo.com:1/', 'foo.com', 1),
00058 ('rosrpc://foo.com:1/', 'foo.com', 1)]
00059 for t, addr, port in valid:
00060 paddr, pport = rospy.core.parse_rosrpc_uri(t)
00061 self.assertEquals(addr, paddr)
00062 self.assertEquals(port, pport)
00063
00064 self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
00065 invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
00066 for i in invalid:
00067 try:
00068 parse_rosrpc_uri(i)
00069 self.fail("%s was an invalid rosrpc uri"%i)
00070 except: pass
00071
00072 def test_add_log_handler(self):
00073
00074 import rospy.core
00075 from rosgraph_msgs.msg import Log
00076 for level in [Log.DEBUG,
00077 Log.INFO,
00078 Log.WARN,
00079 Log.ERROR,
00080 Log.FATAL]:
00081 rospy.core.add_log_handler(level, lambda x: x)
00082 import rospy.exceptions
00083 try:
00084 rospy.core.add_log_handler(lambda x: x, -1)
00085 except rospy.exceptions.ROSInternalException, e: pass
00086
00087 def test_loggers(self):
00088
00089 import rospy.core
00090 rospy.core.logdebug('debug')
00091 rospy.core.logwarn('warn')
00092 rospy.core.logout('out')
00093 rospy.core.logerr('err')
00094 rospy.core.logfatal('fatal')
00095
00096 import rospy
00097 rospy.logdebug('debug')
00098 rospy.logwarn('warn')
00099 rospy.logout('out')
00100 rospy.logerr('err')
00101 rospy.logfatal('fatal')
00102
00103 def test_add_shutdown_hook(self):
00104 def handle(reason):
00105 pass
00106
00107 rospy.core.add_shutdown_hook(handle)
00108 try:
00109 rospy.core.add_shutdown_hook(1)
00110 self.fail_("add_shutdown_hook is not protected against invalid args")
00111 except TypeError: pass
00112 try:
00113 rospy.core.add_shutdown_hook(1)
00114 self.fail_("add_shutdown_hook is not protected against invalid args")
00115 except TypeError: pass
00116
00117 def test_add_preshutdown_hook(self):
00118 def handle1(reason):
00119 pass
00120 def handle2(reason):
00121 pass
00122 def handle3(reason):
00123 pass
00124
00125 rospy.core.add_shutdown_hook(handle1)
00126 rospy.core.add_shutdown_hook(handle2)
00127 rospy.core.add_preshutdown_hook(handle3)
00128 self.assert_(handle3 in rospy.core._preshutdown_hooks)
00129 self.assert_(handle2 in rospy.core._shutdown_hooks)
00130 self.assert_(handle1 in rospy.core._shutdown_hooks)
00131 try:
00132 rospy.core.add_preshutdown_hook(1)
00133 self.fail_("add_preshutdown_hook is not protected against invalid args")
00134 except TypeError: pass
00135 try:
00136 rospy.core.add_preshutdown_hook(1)
00137 self.fail_("add_preshutdown_hook is not protected against invalid args")
00138 except TypeError: pass
00139
00140 def test_get_ros_root(self):
00141 try:
00142 rospy.core.get_ros_root(env={}, required=True)
00143 except:
00144 pass
00145 self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
00146 rr = "%s"%time.time()
00147 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
00148 self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))
00149
00150 self.assertEquals(os.environ['ROS_ROOT'], rospy.core.get_ros_root(required=False))
00151 def test_node_uri(self):
00152 uri = "http://localhost-%s:1234"%random.randint(1, 1000)
00153 self.assertEquals(None, rospy.core.get_node_uri())
00154 rospy.core.set_node_uri(uri)
00155 self.assertEquals(uri, rospy.core.get_node_uri())
00156
00157 def test_initialized(self):
00158 self.failIf(rospy.core.is_initialized())
00159 rospy.core.set_initialized(True)
00160 self.assert_(rospy.core.is_initialized())
00161
00162 def test_shutdown_hook_exception(self):
00163 rospy.core._shutdown_flag = False
00164 del rospy.core._shutdown_hooks[:]
00165
00166
00167 rospy.core.add_shutdown_hook(test_shutdown_hook_exception)
00168 rospy.core.signal_shutdown('test_exception')
00169 rospy.core._shutdown_flag = False
00170 del rospy.core._shutdown_hooks[:]
00171
00172 def test_shutdown(self):
00173 rospy.core._shutdown_flag = False
00174 del rospy.core._shutdown_hooks[:]
00175 global called, called2
00176 called = called2 = None
00177 self.failIf(rospy.core.is_shutdown())
00178 rospy.core.add_shutdown_hook(test_shutdown_hook)
00179 reason = "reason %s"%time.time()
00180 rospy.core.signal_shutdown(reason)
00181 self.assertEquals(reason, called)
00182 self.assert_(rospy.core.is_shutdown())
00183
00184
00185 rospy.core.add_shutdown_hook(test_shutdown_hook2)
00186 self.assert_(called2 is not None)
00187 rospy.core._shutdown_flag = False
00188
00189
00190 def test_valid_name(self):
00191
00192 tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
00193 caller_id = '/me'
00194 for t in tests:
00195 self.assert_(rospy.core.valid_name('p')(t, caller_id))
00196 failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00197 for f in failures:
00198 try:
00199 rospy.core.valid_name('p')(f, caller_id)
00200 self.fail(f)
00201 except rospy.core.ParameterInvalid, e:
00202 pass
00203
00204 def test_is_topic(self):
00205
00206 caller_id = '/me'
00207 tests = [
00208 ('topic', '/node', '/topic'),
00209 ('topic', '/ns/node', '/ns/topic'),
00210 ('/topic', '/node', '/topic'),
00211 ('~topic', '/node', '/node/topic'),
00212 ('/topic1', '/node', '/topic1'),
00213 ('top/sub', '/node', '/top/sub'),
00214 ('top/sub', '/ns/node', '/ns/top/sub'),
00215 ]
00216
00217 for t, caller_id, v in tests:
00218 self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
00219 failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
00220 for f in failures:
00221 try:
00222 rospy.core.is_topic('p')(f, caller_id)
00223 self.fail(f)
00224 except rospy.core.ParameterInvalid, e:
00225 pass
00226
00227 def test_configure_logging(self):
00228
00229 try:
00230 rospy.core.configure_logging("/")
00231 self.fail("configure_logging should not accept a the root namespace as the node_name param")
00232 except: pass
00233 rospy.core.configure_logging("/node/name")
00234
00235 def test_xmlrpcapi(self):
00236
00237 self.assert_(rospy.core.xmlrpcapi(None) is None)
00238 self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
00239 self.assert_(rospy.core.xmlrpcapi('http://') is None)
00240 api = rospy.core.xmlrpcapi('http://localhost:1234')
00241 self.assert_(api is not None)
00242 import xmlrpclib
00243 self.assert_(isinstance(api, xmlrpclib.ServerProxy))
00244
00245 called = None
00246 called2 = None
00247 def test_shutdown_hook(reason):
00248 global called
00249 print "HOOK", reason
00250 called = reason
00251 def test_shutdown_hook2(reason):
00252 global called2
00253 print "HOOK2", reason
00254 called2 = reason
00255 def test_shutdown_hook_exception(reason):
00256 raise Exception("gotcha")
00257
00258 if __name__ == '__main__':
00259 rostest.unitrun('test_rospy', sys.argv[0], TestRospyCore, coverage_packages=['rospy.core'])