test_rospy_core.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 from __future__ import print_function
35 
36 import os
37 import sys
38 import struct
39 import unittest
40 import time
41 import random
42 
43 import rospy
44 
45 class TestRospyCore(unittest.TestCase):
46 
48  from rospy.core import parse_rosrpc_uri
49  valid = [('rosrpc://localhost:1234/', 'localhost', 1234),
50  ('rosrpc://localhost2:1234', 'localhost2', 1234),
51  ('rosrpc://third:1234/path/bar', 'third', 1234),
52  ('rosrpc://foo.com:1/', 'foo.com', 1),
53  ('rosrpc://foo.com:1/', 'foo.com', 1)]
54  for t, addr, port in valid:
55  paddr, pport = rospy.core.parse_rosrpc_uri(t)
56  self.assertEquals(addr, paddr)
57  self.assertEquals(port, pport)
58  # validate that it's a top-level API method
59  self.assertEquals(rospy.core.parse_rosrpc_uri(t), rospy.parse_rosrpc_uri(t))
60  invalid = ['rosrpc://:1234/', 'rosrpc://localhost', 'http://localhost:1234/']
61  for i in invalid:
62  try:
63  parse_rosrpc_uri(i)
64  self.fail("%s was an invalid rosrpc uri"%i)
65  except: pass
66 
67  def test_loggers(self):
68  # trip wire tests
69  import rospy.core
70  rospy.core.logdebug('debug')
71  rospy.core.logwarn('warn')
72  rospy.core.logout('out')
73  rospy.core.logerr('err')
74  rospy.core.logfatal('fatal')
75  # test that they are exposed via top-level api
76  import rospy
77  rospy.logdebug('debug')
78  rospy.logwarn('warn')
79  rospy.logout('out')
80  rospy.logerr('err')
81  rospy.logfatal('fatal')
82 
84  def handle(reason):
85  pass
86  # cannot verify functionality, just crashing
87  rospy.core.add_shutdown_hook(handle)
88  try:
89  rospy.core.add_shutdown_hook(1)
90  self.fail_("add_shutdown_hook is not protected against invalid args")
91  except TypeError: pass
92  try:
93  rospy.core.add_shutdown_hook(1)
94  self.fail_("add_shutdown_hook is not protected against invalid args")
95  except TypeError: pass
96 
98  def handle1(reason):
99  pass
100  def handle2(reason):
101  pass
102  def handle3(reason):
103  pass
104  # cannot verify functionality, just coverage as well as ordering
105  rospy.core.add_shutdown_hook(handle1)
106  rospy.core.add_shutdown_hook(handle2)
107  rospy.core.add_preshutdown_hook(handle3)
108  self.assert_(handle3 in rospy.core._preshutdown_hooks)
109  self.assert_(handle2 in rospy.core._shutdown_hooks)
110  self.assert_(handle1 in rospy.core._shutdown_hooks)
111  try:
112  rospy.core.add_preshutdown_hook(1)
113  self.fail_("add_preshutdown_hook is not protected against invalid args")
114  except TypeError: pass
115  try:
116  rospy.core.add_preshutdown_hook(1)
117  self.fail_("add_preshutdown_hook is not protected against invalid args")
118  except TypeError: pass
119 
120  def test_get_ros_root(self):
121  try:
122  rospy.core.get_ros_root(env={}, required=True)
123  except:
124  pass
125  self.assertEquals(None, rospy.core.get_ros_root(env={}, required=False))
126  rr = "%s"%time.time()
127  self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=False))
128  self.assertEquals(rr, rospy.core.get_ros_root(env={'ROS_ROOT': rr}, required=True))
129 
130  self.assertEquals(os.path.normpath(os.environ['ROS_ROOT']), rospy.core.get_ros_root(required=False))
131  def test_node_uri(self):
132  uri = "http://localhost-%s:1234"%random.randint(1, 1000)
133  self.assertEquals(None, rospy.core.get_node_uri())
134  rospy.core.set_node_uri(uri)
135  self.assertEquals(uri, rospy.core.get_node_uri())
136 
137  def test_initialized(self):
138  self.failIf(rospy.core.is_initialized())
139  rospy.core.set_initialized(True)
140  self.assert_(rospy.core.is_initialized())
141 
143  rospy.core._shutdown_flag = False
144  del rospy.core._shutdown_hooks[:]
145  # add a shutdown hook that throws an exception,
146  # signal_shutdown should be robust to it
147  rospy.core.add_shutdown_hook(shutdown_hook_exception)
148  rospy.core.signal_shutdown('test_exception')
149  rospy.core._shutdown_flag = False
150  del rospy.core._shutdown_hooks[:]
151 
152  def test_shutdown(self):
153  rospy.core._shutdown_flag = False
154  del rospy.core._shutdown_hooks[:]
155  global called, called2
156  called = called2 = None
157  self.failIf(rospy.core.is_shutdown())
158  rospy.core.add_shutdown_hook(shutdown_hook1)
159  reason = "reason %s"%time.time()
160  rospy.core.signal_shutdown(reason)
161  self.assertEquals(reason, called)
162  self.assert_(rospy.core.is_shutdown())
163 
164  # verify that shutdown hook is called immediately on add if already shutdown
165  rospy.core.add_shutdown_hook(shutdown_hook2)
166  self.assert_(called2 is not None)
167  rospy.core._shutdown_flag = False
168 
169  #TODO: move to teset_rospy_names
170  def test_valid_name(self):
171  # not forcing rospy to be pedantic -- yet, just try and do sanity checks
172  tests = ['/', 'srv', '/service', '/service1', 'serv/subserv']
173  caller_id = '/me'
174  for t in tests:
175  self.assert_(rospy.core.valid_name('p')(t, caller_id))
176  failures = ['ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
177  for f in failures:
178  try:
179  rospy.core.valid_name('p')(f, caller_id)
180  self.fail(f)
181  except rospy.core.ParameterInvalid:
182  pass
183 
184  def test_is_topic(self):
185  # not forcing rospy to be pedantic -- yet, just try and do sanity checks
186  caller_id = '/me'
187  tests = [
188  ('topic', '/node', '/topic'),
189  ('topic', '/ns/node', '/ns/topic'),
190  ('/topic', '/node', '/topic'),
191  ('~topic', '/node', '/node/topic'),
192  ('/topic1', '/node', '/topic1'),
193  ('top/sub', '/node', '/top/sub'),
194  ('top/sub', '/ns/node', '/ns/top/sub'),
195  ]
196 
197  for t, caller_id, v in tests:
198  self.assertEquals(v, rospy.core.is_topic('p')(t, caller_id))
199  failures = ['/', 'ftp://foo', '', None, 1, True, 'http:', ' spaced ', ' ']
200  for f in failures:
201  try:
202  rospy.core.is_topic('p')(f, caller_id)
203  self.fail(f)
204  except rospy.core.ParameterInvalid:
205  pass
206 
208  # can't test actual functionality
209  try:
210  rospy.core.configure_logging("/")
211  self.fail("configure_logging should not accept a the root namespace as the node_name param")
212  except: pass
213  rospy.core.configure_logging("/node/name")
214 
215  def test_xmlrpcapi(self):
216  # have to use 'is' so we don't accidentally invoke XMLRPC
217  self.assert_(rospy.core.xmlrpcapi(None) is None)
218  self.assert_(rospy.core.xmlrpcapi('localhost:1234') is None)
219  self.assert_(rospy.core.xmlrpcapi('http://') is None)
220  api = rospy.core.xmlrpcapi('http://localhost:1234')
221  self.assert_(api is not None)
222  try:
223  from xmlrpc.client import ServerProxy
224  except ImportError:
225  from xmlrpclib import ServerProxy
226  self.assert_(isinstance(api, ServerProxy))
227 
228 called = None
229 called2 = None
230 def shutdown_hook1(reason):
231  global called
232  print("HOOK", reason)
233  called = reason
234 def shutdown_hook2(reason):
235  global called2
236  print("HOOK2", reason)
237  called2 = reason
239  raise Exception("gotcha")
def shutdown_hook_exception(reason)
def shutdown_hook2(reason)
def shutdown_hook1(reason)


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56