34 from __future__
import print_function
48 from rospy.core
import parse_rosrpc_uri
49 valid = [(
'rosrpc://localhost:1234/',
'localhost', 1234),
50 (
'rosrpc://localhost2:1234',
'localhost2', 1234),
51 (
'rosrpc://third:1234/path/bar',
'third', 1234),
52 (
'rosrpc://foo.com:1/',
'foo.com', 1),
53 (
'rosrpc://foo.com:1/',
'foo.com', 1)]
54 for t, addr, port
in valid:
55 paddr, pport = rospy.core.parse_rosrpc_uri(t)
56 self.assertEquals(addr, paddr)
57 self.assertEquals(port, pport)
59 self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
60 invalid = [
'rosrpc://:1234/',
'rosrpc://localhost',
'http://localhost:1234/']
64 self.fail(
"%s was an invalid rosrpc uri"%i)
70 rospy.core.logdebug(
'debug')
71 rospy.core.logwarn(
'warn')
72 rospy.core.logout(
'out')
73 rospy.core.logerr(
'err')
74 rospy.core.logfatal(
'fatal')
77 rospy.logdebug(
'debug')
81 rospy.logfatal(
'fatal')
87 rospy.core.add_shutdown_hook(handle)
89 rospy.core.add_shutdown_hook(1)
90 self.fail_(
"add_shutdown_hook is not protected against invalid args")
91 except TypeError:
pass 93 rospy.core.add_shutdown_hook(1)
94 self.fail_(
"add_shutdown_hook is not protected against invalid args")
95 except TypeError:
pass 105 rospy.core.add_shutdown_hook(handle1)
106 rospy.core.add_shutdown_hook(handle2)
107 rospy.core.add_preshutdown_hook(handle3)
108 self.assert_(handle3
in rospy.core._preshutdown_hooks)
109 self.assert_(handle2
in rospy.core._shutdown_hooks)
110 self.assert_(handle1
in rospy.core._shutdown_hooks)
112 rospy.core.add_preshutdown_hook(1)
113 self.fail_(
"add_preshutdown_hook is not protected against invalid args")
114 except TypeError:
pass 116 rospy.core.add_preshutdown_hook(1)
117 self.fail_(
"add_preshutdown_hook is not protected against invalid args")
118 except TypeError:
pass 122 rospy.core.get_ros_root(env={}, required=
True)
125 self.assertEquals(
None, rospy.core.get_ros_root(env={}, required=
False))
126 rr =
"%s"%time.time()
127 self.assertEquals(rr, rospy.core.get_ros_root(env={
'ROS_ROOT': rr}, required=
False))
128 self.assertEquals(rr, rospy.core.get_ros_root(env={
'ROS_ROOT': rr}, required=
True))
130 self.assertEquals(os.path.normpath(os.environ[
'ROS_ROOT']), rospy.core.get_ros_root(required=
False))
132 uri =
"http://localhost-%s:1234"%random.randint(1, 1000)
133 self.assertEquals(
None, rospy.core.get_node_uri())
134 rospy.core.set_node_uri(uri)
135 self.assertEquals(uri, rospy.core.get_node_uri())
138 self.failIf(rospy.core.is_initialized())
139 rospy.core.set_initialized(
True)
140 self.assert_(rospy.core.is_initialized())
143 rospy.core._shutdown_flag =
False 144 del rospy.core._shutdown_hooks[:]
147 rospy.core.add_shutdown_hook(shutdown_hook_exception)
148 rospy.core.signal_shutdown(
'test_exception')
149 rospy.core._shutdown_flag =
False 150 del rospy.core._shutdown_hooks[:]
153 rospy.core._shutdown_flag =
False 154 del rospy.core._shutdown_hooks[:]
155 global called, called2
156 called = called2 =
None 157 self.failIf(rospy.core.is_shutdown())
158 rospy.core.add_shutdown_hook(shutdown_hook1)
159 reason =
"reason %s"%time.time()
160 rospy.core.signal_shutdown(reason)
161 self.assertEquals(reason, called)
162 self.assert_(rospy.core.is_shutdown())
165 rospy.core.add_shutdown_hook(shutdown_hook2)
166 self.assert_(called2
is not None)
167 rospy.core._shutdown_flag =
False 172 tests = [
'/',
'srv',
'/service',
'/service1',
'serv/subserv']
175 self.assert_(rospy.core.valid_name(
'p')(t, caller_id))
176 failures = [
'ftp://foo',
'',
None, 1,
True,
'http:',
' spaced ',
' ']
179 rospy.core.valid_name(
'p')(f, caller_id)
181 except rospy.core.ParameterInvalid:
188 (
'topic',
'/node',
'/topic'),
189 (
'topic',
'/ns/node',
'/ns/topic'),
190 (
'/topic',
'/node',
'/topic'),
191 (
'~topic',
'/node',
'/node/topic'),
192 (
'/topic1',
'/node',
'/topic1'),
193 (
'top/sub',
'/node',
'/top/sub'),
194 (
'top/sub',
'/ns/node',
'/ns/top/sub'),
197 for t, caller_id, v
in tests:
198 self.assertEquals(v, rospy.core.is_topic(
'p')(t, caller_id))
199 failures = [
'/',
'ftp://foo',
'',
None, 1,
True,
'http:',
' spaced ',
' ']
202 rospy.core.is_topic(
'p')(f, caller_id)
204 except rospy.core.ParameterInvalid:
210 rospy.core.configure_logging(
"/")
211 self.fail(
"configure_logging should not accept a the root namespace as the node_name param")
213 rospy.core.configure_logging(
"/node/name")
217 self.assert_(rospy.core.xmlrpcapi(
None)
is None)
218 self.assert_(rospy.core.xmlrpcapi(
'localhost:1234')
is None)
219 self.assert_(rospy.core.xmlrpcapi(
'http://')
is None)
220 api = rospy.core.xmlrpcapi(
'http://localhost:1234')
221 self.assert_(api
is not None)
223 from xmlrpc.client
import ServerProxy
225 from xmlrpclib
import ServerProxy
226 self.assert_(isinstance(api, ServerProxy))
232 print(
"HOOK", reason)
236 print(
"HOOK2", reason)
239 raise Exception(
"gotcha")
def shutdown_hook_exception(reason)
def test_initialized(self)
def test_configure_logging(self)
def test_add_shutdown_hook(self)
def test_add_preshutdown_hook(self)
def test_shutdown_hook_exception(self)
def test_get_ros_root(self)
def test_valid_name(self)
def test_parse_rosrpc_uri(self)
def shutdown_hook2(reason)
def shutdown_hook1(reason)