test_rospy_client_online.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 import random
40 import logging
41 import rosgraph.roslogging
42 
43 try:
44  from cStringIO import StringIO
45 except ImportError:
46  from io import StringIO
47 
48 import rosunit
49 
50 from threading import Thread
51 class TestTask(Thread):
52  def __init__(self, task):
53  Thread.__init__(self)
54  self.task = task
55  self.success = False
56  self.done = False
57  self.value = None
58 
59  def run(self):
60  try:
61  print("STARTING TASK")
62  self.value = self.task()
63  print("TASK HAS COMPLETED")
64  self.success = True
65  except:
66  import traceback
67  traceback.print_exc()
68  self.done = True
69 
70 class TestRospyClientOnline(unittest.TestCase):
71 
72  def __init__(self, *args):
73  unittest.TestCase.__init__(self, *args)
74  import rospy
75  rospy.init_node('test_rospy_online')
76 
77  def test_log(self):
78  rosout_logger = logging.getLogger('rosout')
79  import rospy
80 
81  self.assertTrue(len(rosout_logger.handlers) == 2)
82  self.assertTrue(rosout_logger.handlers[0], rosgraph.roslogging.RosStreamHandler)
83  self.assertTrue(rosout_logger.handlers[1], rospy.impl.rosout.RosOutHandler)
84 
85  default_ros_handler = rosout_logger.handlers[0]
86 
87  # Remap stdout for testing
88  lout = StringIO()
89  lerr = StringIO()
90  test_ros_handler = rosgraph.roslogging.RosStreamHandler(colorize=False, stdout=lout, stderr=lerr)
91 
92  try:
93  # hack to replace the stream handler with a debug version
94  rosout_logger.removeHandler(default_ros_handler)
95  rosout_logger.addHandler(test_ros_handler)
96 
97  import rospy
98  rospy.loginfo("test 1")
99  lout_last = lout.getvalue().splitlines()[-1]
100  self.assert_("test 1" in lout_last)
101 
102  rospy.logwarn("test 2")
103  lerr_last = lerr.getvalue().splitlines()[-1]
104  self.assert_("[WARN]" in lerr_last)
105  self.assert_("test 2" in lerr_last)
106 
107  rospy.logerr("test 3")
108  lerr_last = lerr.getvalue().splitlines()[-1]
109  self.assert_("[ERROR]" in lerr_last)
110  self.assert_("test 3" in lerr_last)
111 
112  rospy.logfatal("test 4")
113  lerr_last = lerr.getvalue().splitlines()[-1]
114  self.assert_("[FATAL]" in lerr_last)
115  self.assert_("test 4" in lerr_last)
116 
117  # logXXX_once
118  lout_len = -1
119  for i in range(3):
120  rospy.loginfo_once("test 1")
121  lout_last = lout.getvalue().splitlines()[-1]
122  if i == 0:
123  self.assert_("test 1" in lout_last)
124  lout_len = len(lout.getvalue())
125  else: # making sure the length of lout doesnt change
126  self.assert_(lout_len == len(lout.getvalue()))
127 
128  lerr_len = -1
129  for i in range(3):
130  rospy.logwarn_once("test 2")
131  lerr_last = lerr.getvalue().splitlines()[-1]
132  if i == 0:
133  self.assert_("test 2" in lerr_last)
134  lerr_len = len(lerr.getvalue())
135  else: # making sure the length of lerr doesnt change
136  self.assert_(lerr_len == len(lerr.getvalue()))
137 
138  lerr_len = -1
139  for i in range(3):
140  rospy.logerr_once("test 3")
141  lerr_last = lerr.getvalue().splitlines()[-1]
142  if i == 0:
143  self.assert_("test 3" in lerr_last)
144  lerr_len = len(lerr.getvalue())
145  else: # making sure the length of lerr doesnt change
146  self.assert_(lerr_len == len(lerr.getvalue()))
147 
148  lerr_len = -1
149  for i in range(3):
150  rospy.logfatal_once("test 4")
151  lerr_last = lerr.getvalue().splitlines()[-1]
152  if i == 0:
153  self.assert_("test 4" in lerr_last)
154  lerr_len = len(lerr.getvalue())
155  else: # making sure the length of lerr doesnt change
156  self.assert_(lerr_len == len(lerr.getvalue()))
157 
158  # logXXX_throttle
159  lout_len = -1
160  for i in range(3):
161  rospy.loginfo_throttle(3, "test 1")
162  lout_last = lout.getvalue().splitlines()[-1]
163  if i == 0:
164  self.assert_("test 1" in lout_last)
165  lout_len = len(lout.getvalue())
166  rospy.sleep(rospy.Duration(1))
167  elif i == 1: # making sure the length of lerr doesnt change
168  self.assert_(lout_len == len(lout.getvalue()))
169  rospy.sleep(rospy.Duration(2))
170  else:
171  self.assert_("test 1" in lout_last)
172 
173  lerr_len = -1
174  for i in range(3):
175  rospy.logwarn_throttle(3, "test 2")
176  lerr_last = lerr.getvalue().splitlines()[-1]
177  if i == 0:
178  self.assert_("test 2" in lerr_last)
179  lerr_len = len(lerr.getvalue())
180  rospy.sleep(rospy.Duration(1))
181  elif i == 1: # making sure the length of lerr doesnt change
182  self.assert_(lerr_len == len(lerr.getvalue()))
183  rospy.sleep(rospy.Duration(2))
184  else:
185  self.assert_("test 2" in lerr_last)
186 
187  lerr_len = -1
188  for i in range(3):
189  rospy.logerr_throttle(3, "test 3")
190  lerr_last = lerr.getvalue().splitlines()[-1]
191  if i == 0:
192  self.assert_("test 3" in lerr_last)
193  lerr_len = len(lerr.getvalue())
194  rospy.sleep(rospy.Duration(1))
195  elif i == 1: # making sure the length of lerr doesnt change
196  self.assert_(lerr_len == len(lerr.getvalue()))
197  rospy.sleep(rospy.Duration(2))
198  else:
199  self.assert_("test 3" in lerr_last)
200 
201  lerr_len = -1
202  for i in range(3):
203  rospy.logfatal_throttle(3, "test 4")
204  lerr_last = lerr.getvalue().splitlines()[-1]
205  if i == 0:
206  self.assert_("test 4" in lerr_last)
207  lerr_len = len(lerr.getvalue())
208  rospy.sleep(rospy.Duration(1))
209  elif i == 1: # making sure the length of lerr doesnt change
210  self.assert_(lerr_len == len(lerr.getvalue()))
211  rospy.sleep(rospy.Duration(2))
212  else:
213  self.assert_("test 4" in lerr_last)
214 
215  rospy.loginfo("test child logger 1", logger_name="log1")
216  lout_last = lout.getvalue().splitlines()[-1]
217  self.assert_("test child logger 1" in lout_last)
218 
219  rospy.logwarn("test child logger 2", logger_name="log2")
220  lerr_last = lerr.getvalue().splitlines()[-1]
221  self.assert_("[WARN]" in lerr_last)
222  self.assert_("test child logger 2" in lerr_last)
223 
224  rospy.logerr("test child logger 3", logger_name="log3")
225  lerr_last = lerr.getvalue().splitlines()[-1]
226  self.assert_("[ERROR]" in lerr_last)
227  self.assert_("test child logger 3" in lerr_last)
228 
229  rospy.logfatal("test child logger 4", logger_name="log4")
230  lerr_last = lerr.getvalue().splitlines()[-1]
231  self.assert_("[FATAL]" in lerr_last)
232  self.assert_("test child logger 4" in lerr_last)
233 
234  finally:
235  # restoring default ros handler
236  rosout_logger.removeHandler(test_ros_handler)
237  lout.close()
238  lerr.close()
239  rosout_logger.addHandler(default_ros_handler)
240 
242  # lazy-import for coverage
243  import rospy
244  import time
245 
246  # test wait for service in success case
247  def task1():
248  rospy.wait_for_service('add_two_ints')
249  timeout_t = time.time() + 5.
250  t1 = TestTask(task1)
251  t1.start()
252  while not t1.done and time.time() < timeout_t:
253  time.sleep(0.5)
254  self.assert_(t1.success)
255 
256  # test wait for service with timeout in success case
257  def task2():
258  rospy.wait_for_service('add_two_ints', timeout=1.)
259  timeout_t = time.time() + 5.
260  t2 = TestTask(task2)
261  t2.start()
262  while not t2.done and time.time() < timeout_t:
263  time.sleep(0.5)
264  self.assert_(t2.success)
265 
266  # test wait for service in failure case
267  def task3():
268  # #2842 raising bounds from .1 to .3 for amazon VM
269  rospy.wait_for_service('fake_service', timeout=0.3)
270  timeout_t = time.time() + 2.
271  t3 = TestTask(task3)
272  t3.start()
273  while not t3.done and time.time() < timeout_t:
274  time.sleep(0.5)
275  self.assert_(t3.done)
276  self.failIf(t3.success)
277 
279  """
280  Test ServiceProxy.wait_for_service
281  """
282  # lazy-import for coverage
283  import rospy
284  import time
285  import test_rosmaster.srv
286 
287  # test wait for service in success case
288  proxy = rospy.ServiceProxy('add_two_ints', test_rosmaster.srv.AddTwoInts)
289  class ProxyTask(object):
290  def __init__(self, proxy, timeout=None):
291  self.proxy = proxy
292  self.timeout = timeout
293  def __call__(self):
294  if self.timeout is None:
295  self.proxy.wait_for_service()
296  else:
297  self.proxy.wait_for_service(timeout=self.timeout)
298  timeout_t = time.time() + 5.
299  t1 = TestTask(ProxyTask(proxy))
300  t1.start()
301  while not t1.done and time.time() < timeout_t:
302  time.sleep(0.5)
303  self.assert_(t1.success)
304 
305  # test wait for service with timeout in success case
306  timeout_t = time.time() + 5.
307  t2 = TestTask(ProxyTask(proxy, timeout=1.))
308  t2.start()
309  while not t2.done and time.time() < timeout_t:
310  time.sleep(0.5)
311  self.assert_(t2.success)
312 
313  # test wait for service in failure case
314  fake_proxy = rospy.ServiceProxy('fake_service', test_rosmaster.srv.AddTwoInts)
315  timeout_t = time.time() + 2.
316  t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))
317  t3.start()
318  while not t3.done and time.time() < timeout_t:
319  time.sleep(0.5)
320  self.assert_(t3.done)
321  self.failIf(t3.success)
322 
323  def test_sleep(self):
324  import rospy
325  import time
326  t = time.time()
327  rospy.sleep(0.1)
328  dur = time.time() - t
329  # #2842 raising bounds from .01 to .03 for amazon VM
330 
331  # make sure sleep is approximately right
332  self.assert_(abs(dur - 0.1) < 0.03, dur)
333 
334  t = time.time()
335  rospy.sleep(rospy.Duration.from_sec(0.1))
336  dur = time.time() - t
337  # make sure sleep is approximately right
338  self.assert_(abs(dur - 0.1) < 0.03, dur)
339 
340  # sleep for neg duration
341  t = time.time()
342  rospy.sleep(rospy.Duration.from_sec(-10.))
343  dur = time.time() - t
344  # make sure returned immediately
345  self.assert_(abs(dur) < 0.1, dur)
346 
347  def test_Rate(self):
348  import rospy
349  import time
350  t = time.time()
351  count = 0
352  r = rospy.Rate(10)
353  for x in range(10):
354  r.sleep()
355  dur = time.time() - t
356  # make sure sleep is approximately right
357  self.assert_(abs(dur - 1.0) < 0.5, dur)
358 
359 
360  def test_param_server(self):
361  # this isn't a parameter server test, just checking that the rospy bindings work
362  import rospy
363 
364  try:
365  rospy.get_param('not_a_param')
366  self.fail("should have raised KeyError")
367  except KeyError: pass
368  self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') )
369 
370  p = rospy.get_param('/param')
371  self.assertEquals("value", p)
372  p = rospy.get_param('param')
373  self.assertEquals("value", p)
374  p = rospy.get_param('/group/param')
375  self.assertEquals("group_value", p)
376  p = rospy.get_param('group/param')
377  self.assertEquals("group_value", p)
378 
379  self.assertEquals('/param', rospy.search_param('param'))
380 
381  names = rospy.get_param_names()
382  self.assert_('/param' in names)
383  self.assert_('/group/param' in names)
384 
385  for p in ['/param', 'param', 'group/param', '/group/param']:
386  self.assert_(rospy.has_param(p))
387 
388  rospy.set_param('param2', 'value2')
389  self.assert_(rospy.has_param('param2'))
390  self.assertEquals('value2', rospy.get_param('param2'))
391  rospy.delete_param('param2')
392  self.failIf(rospy.has_param('param2'))
393  try:
394  rospy.get_param('param2')
395  self.fail("should have raised KeyError")
396  except KeyError: pass
397 
399  # lazy-import for coverage
400  import rospy
401  import std_msgs.msg
402  import time
403 
404  # test standard wait for message
405  def task1():
406  return rospy.wait_for_message('chatter', std_msgs.msg.String)
407  timeout_t = time.time() + 5.
408  t1 = TestTask(task1)
409  t1.start()
410  while not t1.done and time.time() < timeout_t:
411  time.sleep(0.5)
412  self.assert_(t1.success)
413  self.assert_('hello' in t1.value.data)
414 
415  # test wait for message with timeout
416  def task2():
417  return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
418  timeout_t = time.time() + 5.
419  t2 = TestTask(task2)
420  t2.start()
421  while not t2.done and time.time() < timeout_t:
422  time.sleep(0.5)
423  self.assert_(t2.success)
424  self.assert_('hello' in t2.value.data)
425 
426  # test wait for message with timeout FAILURE
427  def task3():
428  # #2842 raising bounds from .1 to .3 for amazon VM
429  return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
430  timeout_t = time.time() + 2.
431  t3 = TestTask(task3)
432  t3.start()
433  while not t3.done and time.time() < timeout_t:
434  time.sleep(0.5)
435  self.failIf(t3.success)
436  self.assert_(t3.done)
437  self.assert_(t3.value is None)
438 
439 if __name__ == '__main__':
440  rosunit.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])


test_rospy
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:22