test_rospy_core.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 from __future__ import print_function
35 
36 import os
37 import sys
38 import struct
39 import unittest
40 import time
41 import random
42 try:
43  from cStringIO import StringIO
44 except ImportError:
45  from io import StringIO
46 
47 import re
48 import logging
49 import rosgraph.roslogging
50 import rospy
51 
52 
53 class TestRospyCore(unittest.TestCase):
54 
56  from rospy.core import parse_rosrpc_uri
57  valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
58  ('rosrpc://localhost2:1234', 'localhost2', 1234),
59  ('rosrpc://third:1234/path/bar', 'third', 1234),
60  ('rosrpc://foo.com:1/', 'foo.com', 1),
61  ('rosrpc://foo.com:1/', 'foo.com', 1)]
62  for t, addr, port in valid:
63  paddr, pport = rospy.core.parse_rosrpc_uri(t)
64  self.assertEquals(addr, paddr)
65  self.assertEquals(port, pport)
66  # validate that it's a top-level API method
67  self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
68  invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
69  for i in invalid:
70  try:
71  parse_rosrpc_uri(i)
72  self.fail("%s was an invalid rosrpc uri"%i)
73  except: pass
74 
75  def test_loggers(self):
76  old_format = os.environ.get('ROSCONSOLE_FORMAT', '')
77 
78  try:
79  # detailed log format
80  os.environ['ROSCONSOLE_FORMAT'] = ' '.join([
81  '${severity}',
82  '${message}',
83  '${walltime}',
84  '${thread}',
85  '${logger}',
86  '${file}',
87  '${line}',
88  '${function}',
89  '${node}',
90  '${time}',
91  ])
92 
93  # configure_logging should not accept the root namespace as the node_name param
94  self.assertRaises(rospy.exceptions.ROSException, rospy.core.configure_logging, "/")
95 
96  rospy.core.configure_logging("/node/name", logging.DEBUG)
97  # access our own log file
98  testlogfile = rospy.core._log_filename
99 
100  rosout_logger = logging.getLogger('rosout')
101 
102  self.assertTrue(len(rosout_logger.handlers) == 1)
103  self.assertTrue(rosout_logger.handlers[0], rosgraph.roslogging.RosStreamHandler)
104 
105  default_ros_handler = rosout_logger.handlers[0]
106 
107  # Remap stdout for testing
108  lout = StringIO()
109  lerr = StringIO()
110  test_ros_handler = rosgraph.roslogging.RosStreamHandler(colorize=False, stdout=lout, stderr=lerr)
111 
112  lf = open(testlogfile, 'r')
113 
114  this_file = os.path.abspath(__file__)
115  # this is necessary to avoid test fails because of .pyc cache file
116  base, ext = os.path.splitext(this_file)
117  if ext == '.pyc':
118  this_file = base + '.py'
119 
120  try:
121  # hack to replace the stream handler with a debug version
122  rosout_logger.removeHandler(default_ros_handler)
123  rosout_logger.addHandler(test_ros_handler)
124 
125  # trip wire tests
126  rospy.core.logdebug('debug')
127  rospy.core.loginfo('info')
128  rospy.core.logwarn('warn')
129  rospy.core.logerror('error')
130  rospy.core.logfatal('fatal')
131 
132  lf.seek(0, 2)
133  lout.seek(0, 2)
134  lerr.seek(0, 2)
135 
136  # because we just cant do lvl.upper()...
137  def lvl2loglvl(lvl):
138  return {
139  'debug': 'DEBUG',
140  'info': 'INFO',
141  'warn': 'WARNING',
142  'err': 'ERROR',
143  'fatal': 'CRITICAL',
144  }.get(lvl)
145 
146  # and because it s not consistent
147  def lvl2loglvl_stream(lvl):
148  return {
149  'debug': 'DEBUG',
150  'info': 'INFO',
151  'warn': 'WARN',
152  'err': 'ERROR',
153  'fatal': 'FATAL',
154  }.get(lvl)
155 
156  # test that they are exposed via top-level api
157  for lvl in ['debug', 'info', 'warn', 'err', 'fatal']:
158  logmthd = getattr(rospy, 'log' + lvl)
159  logmthd(lvl)
160 
161  log_file = ' '.join([
162  '\[rosout\]\[' + lvl2loglvl(lvl) + '\]',
163  '(\d+[-\/]\d+[-\/]\d+)',
164  '(\d+[:]\d+[:]\d+[,]\d+):',
165  lvl
166  ])
167  fileline = lf.readline()
168  # print("lf.readline(): " + fileline)
169 
170  if lvl in ['debug']:
171  self.assertTrue(len(fileline) == 0) # no log in file with logdebug
172  else: # proper format
173  self.assertTrue(bool(re.match(log_file, fileline)), msg="{0} doesn't match: {1}".format(fileline, log_file))
174 
175  log_out = ' '.join([
176  lvl2loglvl_stream(lvl),
177  lvl,
178  '[0-9]*\.[0-9]*',
179  '[0-9]*',
180  'rosout',
181  this_file,
182  '[0-9]*',
183  'TestRospyCore.test_loggers',
184  '/unnamed',
185  '[0-9]*\.[0-9]*',
186  ])
187  outline = lout.getvalue().splitlines()
188  errline = lerr.getvalue().splitlines()
189 
190  if lvl in ['info']:
191  self.assertTrue(len(outline) > 0)
192  # print("lout.getvalue(): " + outline[-1])
193  self.assertTrue(bool(re.match(log_out, outline[-1])), msg="{0}\n doesn't match: {1}".format(outline[-1], log_out))
194 
195  elif lvl in ['warn', 'err', 'fatal']:
196  self.assertTrue(len(errline) > 0)
197  # print("lerr.getvalue(): " + errline[-1])
198  self.assertTrue(bool(re.match(log_out, errline[-1])), msg="{0}\n doesn't match: {1}".format(errline[-1], log_out))
199 
200  finally:
201  lf.close()
202 
203  # restoring default ros handler
204  rosout_logger.removeHandler(test_ros_handler)
205  lout.close()
206  lerr.close()
207  rosout_logger.addHandler(default_ros_handler)
208 
209  finally:
210  # restoring format
211  os.environ['ROSCONSOLE_FORMAT'] = old_format
212 
214  def handle(reason):
215  pass
216  # cannot verify functionality, just crashing
217  rospy.core.add_shutdown_hook(handle)
218  try:
219  rospy.core.add_shutdown_hook(1)
220  self.fail_("add_shutdown_hook is not protected against invalid args")
221  except TypeError: pass
222  try:
223  rospy.core.add_shutdown_hook(1)
224  self.fail_("add_shutdown_hook is not protected against invalid args")
225  except TypeError: pass
226 
228  def handle1(reason):
229  pass
230  def handle2(reason):
231  pass
232  def handle3(reason):
233  pass
234  # cannot verify functionality, just coverage as well as ordering
235  rospy.core.add_shutdown_hook(handle1)
236  rospy.core.add_shutdown_hook(handle2)
237  rospy.core.add_preshutdown_hook(handle3)
238  self.assert_(handle3 in rospy.core._preshutdown_hooks)
239  self.assert_(handle2 in rospy.core._shutdown_hooks)
240  self.assert_(handle1 in rospy.core._shutdown_hooks)
241  try:
242  rospy.core.add_preshutdown_hook(1)
243  self.fail_("add_preshutdown_hook is not protected against invalid args")
244  except TypeError: pass
245  try:
246  rospy.core.add_preshutdown_hook(1)
247  self.fail_("add_preshutdown_hook is not protected against invalid args")
248  except TypeError: pass
249 
250  def test_get_ros_root(self):
251  try:
252  rospy.core.get_ros_root(env={}, required=True)
253  except:
254  pass
255  self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
256  rr = "%s"%time.time()
257  self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
258  self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))
259 
260  self.assertEquals(os.path.normpath(os.environ['ROS_ROOT']), rospy.core.get_ros_root(required=False))
261  def test_node_uri(self):
262  uri = "http://localhost-%s:1234"%random.randint(1, 1000)
263  self.assertEquals(None, rospy.core.get_node_uri())
264  rospy.core.set_node_uri(uri)
265  self.assertEquals(uri, rospy.core.get_node_uri())
266 
267  def test_initialized(self):
268  self.failIf(rospy.core.is_initialized())
269  rospy.core.set_initialized(True)
270  self.assert_(rospy.core.is_initialized())
271 
273  rospy.core._shutdown_flag = False
274  del rospy.core._shutdown_hooks[:]
275  # add a shutdown hook that throws an exception,
276  # signal_shutdown should be robust to it
277  rospy.core.add_shutdown_hook(shutdown_hook_exception)
278  rospy.core.signal_shutdown('test_exception')
279  rospy.core._shutdown_flag = False
280  del rospy.core._shutdown_hooks[:]
281 
282  def test_shutdown(self):
283  rospy.core._shutdown_flag = False
284  del rospy.core._shutdown_hooks[:]
285  global called, called2
286  called = called2 = None
287  self.failIf(rospy.core.is_shutdown())
288  rospy.core.add_shutdown_hook(shutdown_hook1)
289  reason = "reason %s"%time.time()
290  rospy.core.signal_shutdown(reason)
291  self.assertEquals(reason, called)
292  self.assert_(rospy.core.is_shutdown())
293 
294  # verify that shutdown hook is called immediately on add if already shutdown
295  rospy.core.add_shutdown_hook(shutdown_hook2)
296  self.assert_(called2 is not None)
297  rospy.core._shutdown_flag = False
298 
299  #TODO: move to teset_rospy_names
300  def test_valid_name(self):
301  # not forcing rospy to be pedantic -- yet, just try and do sanity checks
302  tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
303  caller_id = '/me'
304  for t in tests:
305  self.assert_(rospy.core.valid_name('p')(t, caller_id))
306  failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
307  for f in failures:
308  try:
309  rospy.core.valid_name('p')(f, caller_id)
310  self.fail(f)
311  except rospy.core.ParameterInvalid:
312  pass
313 
314  def test_is_topic(self):
315  # not forcing rospy to be pedantic -- yet, just try and do sanity checks
316  caller_id = '/me'
317  tests = [
318  ('topic', '/node', '/topic'),
319  ('topic', '/ns/node', '/ns/topic'),
320  ('/topic', '/node', '/topic'),
321  ('~topic', '/node', '/node/topic'),
322  ('/topic1', '/node', '/topic1'),
323  ('top/sub', '/node', '/top/sub'),
324  ('top/sub', '/ns/node', '/ns/top/sub'),
325  ]
326 
327  for t, caller_id, v in tests:
328  self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
329  failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
330  for f in failures:
331  try:
332  rospy.core.is_topic('p')(f, caller_id)
333  self.fail(f)
334  except rospy.core.ParameterInvalid:
335  pass
336 
337  def test_xmlrpcapi(self):
338  # have to use 'is' so we don't accidentally invoke XMLRPC
339  self.assert_(rospy.core.xmlrpcapi(None) is None)
340  self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
341  self.assert_(rospy.core.xmlrpcapi('http://') is None)
342  api = rospy.core.xmlrpcapi('http://localhost:1234')
343  self.assert_(api is not None)
344  try:
345  from xmlrpc.client import ServerProxy
346  except ImportError:
347  from xmlrpclib import ServerProxy
348  self.assert_(isinstance(api, ServerProxy))
349 
350 called = None
351 called2 = None
352 def shutdown_hook1(reason):
353  global called
354  print("HOOK", reason)
355  called = reason
356 def shutdown_hook2(reason):
357  global called2
358  print("HOOK2", reason)
359  called2 = reason
361  raise Exception("gotcha")
def shutdown_hook_exception(reason)
def shutdown_hook2(reason)
def shutdown_hook1(reason)


test_rospy
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:22