34 from __future__
import print_function
43 from cStringIO
import StringIO
45 from io
import StringIO
49 import rosgraph.roslogging
56 from rospy.core
import parse_rosrpc_uri
57 valid = [(
'rosrpc://localhost:1234/',
'localhost', 1234),
58 (
'rosrpc://localhost2:1234',
'localhost2', 1234),
59 (
'rosrpc://third:1234/path/bar',
'third', 1234),
60 (
'rosrpc://foo.com:1/',
'foo.com', 1),
61 (
'rosrpc://foo.com:1/',
'foo.com', 1)]
62 for t, addr, port
in valid:
63 paddr, pport = rospy.core.parse_rosrpc_uri(t)
64 self.assertEquals(addr, paddr)
65 self.assertEquals(port, pport)
67 self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
68 invalid = [
'rosrpc://:1234/',
'rosrpc://localhost',
'http://localhost:1234/']
72 self.fail(
"%s was an invalid rosrpc uri"%i)
76 old_format = os.environ.get(
'ROSCONSOLE_FORMAT',
'')
80 os.environ[
'ROSCONSOLE_FORMAT'] =
' '.join([
94 self.assertRaises(rospy.exceptions.ROSException, rospy.core.configure_logging,
"/")
96 rospy.core.configure_logging(
"/node/name", logging.DEBUG)
98 testlogfile = rospy.core._log_filename
100 rosout_logger = logging.getLogger(
'rosout')
102 self.assertTrue(len(rosout_logger.handlers) == 1)
103 self.assertTrue(rosout_logger.handlers[0], rosgraph.roslogging.RosStreamHandler)
105 default_ros_handler = rosout_logger.handlers[0]
110 test_ros_handler = rosgraph.roslogging.RosStreamHandler(colorize=
False, stdout=lout, stderr=lerr)
112 lf = open(testlogfile,
'r') 114 this_file = os.path.abspath(__file__) 116 base, ext = os.path.splitext(this_file)
118 this_file = base +
'.py' 122 rosout_logger.removeHandler(default_ros_handler)
123 rosout_logger.addHandler(test_ros_handler)
126 rospy.core.logdebug(
'debug')
127 rospy.core.loginfo(
'info')
128 rospy.core.logwarn(
'warn')
129 rospy.core.logerror(
'error')
130 rospy.core.logfatal(
'fatal')
147 def lvl2loglvl_stream(lvl):
157 for lvl
in [
'debug',
'info',
'warn',
'err',
'fatal']:
158 logmthd = getattr(rospy,
'log' + lvl)
161 log_file =
' '.join([
162 '\[rosout\]\[' + lvl2loglvl(lvl) +
'\]',
163 '(\d+[-\/]\d+[-\/]\d+)',
164 '(\d+[:]\d+[:]\d+[,]\d+):',
167 fileline = lf.readline()
171 self.assertTrue(len(fileline) == 0)
173 self.assertTrue(bool(re.match(log_file, fileline)), msg=
"{0} doesn't match: {1}".format(fileline, log_file))
176 lvl2loglvl_stream(lvl),
183 'TestRospyCore.test_loggers',
187 outline = lout.getvalue().splitlines()
188 errline = lerr.getvalue().splitlines()
191 self.assertTrue(len(outline) > 0)
193 self.assertTrue(bool(re.match(log_out, outline[-1])), msg=
"{0}\n doesn't match: {1}".format(outline[-1], log_out))
195 elif lvl
in [
'warn',
'err',
'fatal']:
196 self.assertTrue(len(errline) > 0)
198 self.assertTrue(bool(re.match(log_out, errline[-1])), msg=
"{0}\n doesn't match: {1}".format(errline[-1], log_out))
204 rosout_logger.removeHandler(test_ros_handler)
207 rosout_logger.addHandler(default_ros_handler)
211 os.environ[
'ROSCONSOLE_FORMAT'] = old_format
217 rospy.core.add_shutdown_hook(handle)
219 rospy.core.add_shutdown_hook(1)
220 self.fail_(
"add_shutdown_hook is not protected against invalid args")
221 except TypeError:
pass 223 rospy.core.add_shutdown_hook(1)
224 self.fail_(
"add_shutdown_hook is not protected against invalid args")
225 except TypeError:
pass 235 rospy.core.add_shutdown_hook(handle1)
236 rospy.core.add_shutdown_hook(handle2)
237 rospy.core.add_preshutdown_hook(handle3)
238 self.assert_(handle3
in rospy.core._preshutdown_hooks)
239 self.assert_(handle2
in rospy.core._shutdown_hooks)
240 self.assert_(handle1
in rospy.core._shutdown_hooks)
242 rospy.core.add_preshutdown_hook(1)
243 self.fail_(
"add_preshutdown_hook is not protected against invalid args")
244 except TypeError:
pass 246 rospy.core.add_preshutdown_hook(1)
247 self.fail_(
"add_preshutdown_hook is not protected against invalid args")
248 except TypeError:
pass 252 rospy.core.get_ros_root(env={}, required=
True)
255 self.assertEquals(
None, rospy.core.get_ros_root(env={}, required=
False))
256 rr =
"%s"%time.time()
257 self.assertEquals(rr, rospy.core.get_ros_root(env={
'ROS_ROOT': rr}, required=
False))
258 self.assertEquals(rr, rospy.core.get_ros_root(env={
'ROS_ROOT': rr}, required=
True))
260 self.assertEquals(os.path.normpath(os.environ[
'ROS_ROOT']), rospy.core.get_ros_root(required=
False))
262 uri =
"http://localhost-%s:1234"%random.randint(1, 1000)
263 self.assertEquals(
None, rospy.core.get_node_uri())
264 rospy.core.set_node_uri(uri)
265 self.assertEquals(uri, rospy.core.get_node_uri())
268 self.failIf(rospy.core.is_initialized())
269 rospy.core.set_initialized(
True)
270 self.assert_(rospy.core.is_initialized())
273 rospy.core._shutdown_flag =
False 274 del rospy.core._shutdown_hooks[:]
277 rospy.core.add_shutdown_hook(shutdown_hook_exception)
278 rospy.core.signal_shutdown(
'test_exception')
279 rospy.core._shutdown_flag =
False 280 del rospy.core._shutdown_hooks[:]
283 rospy.core._shutdown_flag =
False 284 del rospy.core._shutdown_hooks[:]
285 global called, called2
286 called = called2 =
None 287 self.failIf(rospy.core.is_shutdown())
288 rospy.core.add_shutdown_hook(shutdown_hook1)
289 reason =
"reason %s"%time.time()
290 rospy.core.signal_shutdown(reason)
291 self.assertEquals(reason, called)
292 self.assert_(rospy.core.is_shutdown())
295 rospy.core.add_shutdown_hook(shutdown_hook2)
296 self.assert_(called2
is not None)
297 rospy.core._shutdown_flag =
False 302 tests = [
'/',
'srv',
'/service',
'/service1',
'serv/subserv']
305 self.assert_(rospy.core.valid_name(
'p')(t, caller_id))
306 failures = [
'ftp://foo',
'',
None, 1,
True,
'http:',
' spaced ',
' ']
309 rospy.core.valid_name(
'p')(f, caller_id)
311 except rospy.core.ParameterInvalid:
318 (
'topic',
'/node',
'/topic'),
319 (
'topic',
'/ns/node',
'/ns/topic'),
320 (
'/topic',
'/node',
'/topic'),
321 (
'~topic',
'/node',
'/node/topic'),
322 (
'/topic1',
'/node',
'/topic1'),
323 (
'top/sub',
'/node',
'/top/sub'),
324 (
'top/sub',
'/ns/node',
'/ns/top/sub'),
327 for t, caller_id, v
in tests:
328 self.assertEquals(v, rospy.core.is_topic(
'p')(t, caller_id))
329 failures = [
'/',
'ftp://foo',
'',
None, 1,
True,
'http:',
' spaced ',
' ']
332 rospy.core.is_topic(
'p')(f, caller_id)
334 except rospy.core.ParameterInvalid:
339 self.assert_(rospy.core.xmlrpcapi(
None)
is None)
340 self.assert_(rospy.core.xmlrpcapi(
'localhost:1234')
is None)
341 self.assert_(rospy.core.xmlrpcapi(
'http://')
is None)
342 api = rospy.core.xmlrpcapi(
'http://localhost:1234')
343 self.assert_(api
is not None)
345 from xmlrpc.client
import ServerProxy
347 from xmlrpclib
import ServerProxy
348 self.assert_(isinstance(api, ServerProxy))
354 print(
"HOOK", reason)
358 print(
"HOOK2", reason)
361 raise Exception(
"gotcha")
def shutdown_hook_exception(reason)
def test_initialized(self)
def test_add_shutdown_hook(self)
def test_add_preshutdown_hook(self)
def test_shutdown_hook_exception(self)
def test_get_ros_root(self)
def test_valid_name(self)
def test_parse_rosrpc_uri(self)
def shutdown_hook2(reason)
def shutdown_hook1(reason)