test_rospy_client_online.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 import random
40 import logging
41 import rosgraph.roslogging
42 
43 try:
44  from cStringIO import StringIO
45 except ImportError:
46  from io import StringIO
47 
48 import rosunit
49 
50 from threading import Thread
51 class TestTask(Thread):
52  def __init__(self, task):
53  Thread.__init__(self)
54  self.task = task
55  self.success = False
56  self.done = False
57  self.value = None
58 
59  def run(self):
60  try:
61  print("STARTING TASK")
62  self.value = self.task()
63  print("TASK HAS COMPLETED")
64  self.success = True
65  except:
66  import traceback
67  traceback.print_exc()
68  self.done = True
69 
70 class TestRospyClientOnline(unittest.TestCase):
71 
72  def __init__(self, *args):
73  unittest.TestCase.__init__(self, *args)
74  import rospy
75  rospy.init_node('test_rospy_online')
76 
77  def test_log(self):
78  rosout_logger = logging.getLogger('rosout')
79  import rospy
80 
81  self.assertTrue(len(rosout_logger.handlers) == 2)
82  self.assertTrue(rosout_logger.handlers[0], rosgraph.roslogging.RosStreamHandler)
83  self.assertTrue(rosout_logger.handlers[1], rospy.impl.rosout.RosOutHandler)
84 
85  default_ros_handler = rosout_logger.handlers[0]
86 
87  # Remap stdout for testing
88  lout = StringIO()
89  lerr = StringIO()
90  test_ros_handler = rosgraph.roslogging.RosStreamHandler(colorize=False, stdout=lout, stderr=lerr)
91 
92  try:
93  # hack to replace the stream handler with a debug version
94  rosout_logger.removeHandler(default_ros_handler)
95  rosout_logger.addHandler(test_ros_handler)
96 
97  import rospy
98  rospy.loginfo("test 1")
99  lout_last = lout.getvalue().splitlines()[-1]
100  self.assertTrue("test 1" in lout_last)
101 
102  rospy.logwarn("test 2")
103  lerr_last = lerr.getvalue().splitlines()[-1]
104  self.assertTrue("[WARN]" in lerr_last)
105  self.assertTrue("test 2" in lerr_last)
106 
107  rospy.logerr("test 3")
108  lerr_last = lerr.getvalue().splitlines()[-1]
109  self.assertTrue("[ERROR]" in lerr_last)
110  self.assertTrue("test 3" in lerr_last)
111 
112  rospy.logfatal("test 4")
113  lerr_last = lerr.getvalue().splitlines()[-1]
114  self.assertTrue("[FATAL]" in lerr_last)
115  self.assertTrue("test 4" in lerr_last)
116 
117  # logXXX_once
118  lout_len = -1
119  for i in range(3):
120  rospy.loginfo_once("test 1")
121  lout_last = lout.getvalue().splitlines()[-1]
122  if i == 0:
123  self.assertTrue("test 1" in lout_last)
124  lout_len = len(lout.getvalue())
125  else: # making sure the length of lout doesnt change
126  self.assertTrue(lout_len == len(lout.getvalue()))
127 
128  lerr_len = -1
129  for i in range(3):
130  rospy.logwarn_once("test 2")
131  lerr_last = lerr.getvalue().splitlines()[-1]
132  if i == 0:
133  self.assertTrue("test 2" in lerr_last)
134  lerr_len = len(lerr.getvalue())
135  else: # making sure the length of lerr doesnt change
136  self.assertTrue(lerr_len == len(lerr.getvalue()))
137 
138  lerr_len = -1
139  for i in range(3):
140  rospy.logerr_once("test 3")
141  lerr_last = lerr.getvalue().splitlines()[-1]
142  if i == 0:
143  self.assertTrue("test 3" in lerr_last)
144  lerr_len = len(lerr.getvalue())
145  else: # making sure the length of lerr doesnt change
146  self.assertTrue(lerr_len == len(lerr.getvalue()))
147 
148  lerr_len = -1
149  for i in range(3):
150  rospy.logfatal_once("test 4")
151  lerr_last = lerr.getvalue().splitlines()[-1]
152  if i == 0:
153  self.assertTrue("test 4" in lerr_last)
154  lerr_len = len(lerr.getvalue())
155  else: # making sure the length of lerr doesnt change
156  self.assertTrue(lerr_len == len(lerr.getvalue()))
157 
158  # logXXX_throttle
159  lout_len = -1
160  for i in range(3):
161  rospy.loginfo_throttle(3, "test 1")
162  lout_last = lout.getvalue().splitlines()[-1]
163  if i == 0:
164  self.assertTrue("test 1" in lout_last)
165  lout_len = len(lout.getvalue())
166  rospy.sleep(rospy.Duration(1))
167  elif i == 1: # making sure the length of lout doesnt change
168  self.assertTrue(lout_len == len(lout.getvalue()))
169  rospy.sleep(rospy.Duration(2))
170  else:
171  self.assertTrue("test 1" in lout_last)
172 
173  lerr_len = -1
174  for i in range(3):
175  rospy.logwarn_throttle(3, "test 2")
176  lerr_last = lerr.getvalue().splitlines()[-1]
177  if i == 0:
178  self.assertTrue("test 2" in lerr_last)
179  lerr_len = len(lerr.getvalue())
180  rospy.sleep(rospy.Duration(1))
181  elif i == 1: # making sure the length of lerr doesnt change
182  self.assertTrue(lerr_len == len(lerr.getvalue()))
183  rospy.sleep(rospy.Duration(2))
184  else:
185  self.assertTrue("test 2" in lerr_last)
186 
187  lerr_len = -1
188  for i in range(3):
189  rospy.logerr_throttle(3, "test 3")
190  lerr_last = lerr.getvalue().splitlines()[-1]
191  if i == 0:
192  self.assertTrue("test 3" in lerr_last)
193  lerr_len = len(lerr.getvalue())
194  rospy.sleep(rospy.Duration(1))
195  elif i == 1: # making sure the length of lerr doesnt change
196  self.assertTrue(lerr_len == len(lerr.getvalue()))
197  rospy.sleep(rospy.Duration(2))
198  else:
199  self.assertTrue("test 3" in lerr_last)
200 
201  lerr_len = -1
202  for i in range(3):
203  rospy.logfatal_throttle(3, "test 4")
204  lerr_last = lerr.getvalue().splitlines()[-1]
205  if i == 0:
206  self.assertTrue("test 4" in lerr_last)
207  lerr_len = len(lerr.getvalue())
208  rospy.sleep(rospy.Duration(1))
209  elif i == 1: # making sure the length of lerr doesnt change
210  self.assertTrue(lerr_len == len(lerr.getvalue()))
211  rospy.sleep(rospy.Duration(2))
212  else:
213  self.assertTrue("test 4" in lerr_last)
214 
215  # logXXX_throttle_identical
216  lout_len = -1
217  for i in range(5):
218  if i < 2:
219  test_text = "test 1"
220  else:
221  test_text = "test 1a"
222  rospy.loginfo_throttle_identical(2, test_text)
223  lout_last = lout.getvalue().splitlines()[-1]
224  if i == 0: # Confirm test text was logged
225  self.assertTrue(test_text in lout_last)
226  lout_len = len(lout.getvalue())
227  elif i == 1: # Confirm the length of lout hasn't changed
228  self.assertTrue(lout_len == len(lout.getvalue()))
229  elif i == 2: # Confirm the new message was logged correctly
230  self.assertTrue(test_text in lout_last)
231  lout_len = len(lout.getvalue())
232  rospy.sleep(rospy.Duration(2))
233  elif i == 3: # Confirm the length of lout has changed
234  self.assertTrue(lout_len != len(lout.getvalue()))
235  else: # Confirm new test text is in last log
236  self.assertTrue(test_text in lout_last)
237 
238  lerr_len = -1
239  for i in range(5):
240  if i < 2:
241  test_text = "test 2"
242  else:
243  test_text = "test 2a"
244  rospy.logwarn_throttle_identical(2, test_text)
245  lerr_last = lerr.getvalue().splitlines()[-1]
246  if i == 0: # Confirm test text was logged
247  self.assertTrue(test_text in lerr_last)
248  lerr_len = len(lerr.getvalue())
249  elif i == 1: # Confirm the length of lerr hasn't changed
250  self.assertTrue(lerr_len == len(lerr.getvalue()))
251  elif i == 2: # Confirm the new message was logged correctly
252  self.assertTrue(test_text in lerr_last)
253  lerr_len = len(lerr.getvalue())
254  rospy.sleep(rospy.Duration(2))
255  elif i == 3: # Confirm the length of lerr has incremented
256  self.assertTrue(lerr_len != len(lerr.getvalue()))
257  else: # Confirm new test text is in last log
258  self.assertTrue(test_text in lerr_last)
259 
260  lerr_len = -1
261  for i in range(5):
262  if i < 2:
263  test_text = "test 3"
264  else:
265  test_text = "test 3a"
266  rospy.logerr_throttle_identical(2, test_text)
267  lerr_last = lerr.getvalue().splitlines()[-1]
268  if i == 0: # Confirm test text was logged
269  self.assertTrue(test_text in lerr_last)
270  lerr_len = len(lerr.getvalue())
271  elif i == 1: # Confirm the length of lerr hasn't changed
272  self.assertTrue(lerr_len == len(lerr.getvalue()))
273  elif i == 2: # Confirm the new message was logged correctly
274  self.assertTrue(test_text in lerr_last)
275  lerr_len = len(lerr.getvalue())
276  rospy.sleep(rospy.Duration(2))
277  elif i == 3: # Confirm the length of lerr has incremented
278  self.assertTrue(lerr_len != len(lerr.getvalue()))
279  else: # Confirm new test text is in last log
280  self.assertTrue(test_text in lerr_last)
281 
282  lerr_len = -1
283  for i in range(5):
284  if i < 2:
285  test_text = "test 4"
286  else:
287  test_text = "test 4a"
288  rospy.logfatal_throttle_identical(2, test_text)
289  lerr_last = lerr.getvalue().splitlines()[-1]
290  if i == 0: # Confirm test text was logged
291  self.assertTrue(test_text in lerr_last)
292  lerr_len = len(lerr.getvalue())
293  elif i == 1: # Confirm the length of lerr hasn't changed
294  self.assertTrue(lerr_len == len(lerr.getvalue()))
295  elif i == 2: # Confirm the new message was logged correctly
296  self.assertTrue(test_text in lerr_last)
297  lerr_len = len(lerr.getvalue())
298  rospy.sleep(rospy.Duration(2))
299  elif i == 3: # Confirm the length of lerr has incremented
300  self.assertTrue(lerr_len != len(lerr.getvalue()))
301  else: # Confirm new test text is in last log
302  self.assertTrue(test_text in lerr_last)
303 
304  rospy.loginfo("test child logger 1", logger_name="log1")
305  lout_last = lout.getvalue().splitlines()[-1]
306  self.assertTrue("test child logger 1" in lout_last)
307 
308  rospy.logwarn("test child logger 2", logger_name="log2")
309  lerr_last = lerr.getvalue().splitlines()[-1]
310  self.assertTrue("[WARN]" in lerr_last)
311  self.assertTrue("test child logger 2" in lerr_last)
312 
313  rospy.logerr("test child logger 3", logger_name="log3")
314  lerr_last = lerr.getvalue().splitlines()[-1]
315  self.assertTrue("[ERROR]" in lerr_last)
316  self.assertTrue("test child logger 3" in lerr_last)
317 
318  rospy.logfatal("test child logger 4", logger_name="log4")
319  lerr_last = lerr.getvalue().splitlines()[-1]
320  self.assertTrue("[FATAL]" in lerr_last)
321  self.assertTrue("test child logger 4" in lerr_last)
322 
323  finally:
324  # restoring default ros handler
325  rosout_logger.removeHandler(test_ros_handler)
326  lout.close()
327  lerr.close()
328  rosout_logger.addHandler(default_ros_handler)
329 
331  # lazy-import for coverage
332  import rospy
333  import time
334 
335  # test wait for service in success case
336  def task1():
337  rospy.wait_for_service('add_two_ints')
338  timeout_t = time.time() + 5.
339  t1 = TestTask(task1)
340  t1.start()
341  while not t1.done and time.time() < timeout_t:
342  time.sleep(0.5)
343  self.assertTrue(t1.success)
344 
345  # test wait for service with timeout in success case
346  def task2():
347  rospy.wait_for_service('add_two_ints', timeout=1.)
348  timeout_t = time.time() + 5.
349  t2 = TestTask(task2)
350  t2.start()
351  while not t2.done and time.time() < timeout_t:
352  time.sleep(0.5)
353  self.assertTrue(t2.success)
354 
355  # test wait for service in failure case
356  def task3():
357  # #2842 raising bounds from .1 to .3 for amazon VM
358  rospy.wait_for_service('fake_service', timeout=0.3)
359  timeout_t = time.time() + 2.
360  t3 = TestTask(task3)
361  t3.start()
362  while not t3.done and time.time() < timeout_t:
363  time.sleep(0.5)
364  self.assertTrue(t3.done)
365  self.assertFalse(t3.success)
366 
368  # lazy-import for coverage
369  import rospy
370  import time
371 
372  # test wait for service with timeout in success case
373  def task2():
374  rospy.wait_for_service('add_two_ints',
375  timeout=rospy.Duration.from_sec(1.0))
376  timeout_t = time.time() + 5.
377  t2 = TestTask(task2)
378  t2.start()
379  while not t2.done and time.time() < timeout_t:
380  time.sleep(0.5)
381  self.assertTrue(t2.success)
382 
383  # test wait for service in failure case
384  def task3():
385  # #2842 raising bounds from .1 to .3 for amazon VM
386  rospy.wait_for_service('fake_service',
387  timeout=rospy.Duration.from_sec(0.3))
388  timeout_t = time.time() + 2.
389  t3 = TestTask(task3)
390  t3.start()
391  while not t3.done and time.time() < timeout_t:
392  time.sleep(0.5)
393  self.assertTrue(t3.done)
394  self.assertFalse(t3.success)
395 
397  """
398  Test ServiceProxy.wait_for_service
399  """
400  # lazy-import for coverage
401  import rospy
402  import time
403  import test_rosmaster.srv
404 
405  # test wait for service in success case
406  proxy = rospy.ServiceProxy('add_two_ints', test_rosmaster.srv.AddTwoInts)
407  class ProxyTask(object):
408  def __init__(self, proxy, timeout=None):
409  self.proxy = proxy
410  self.timeout = timeout
411  def __call__(self):
412  if self.timeout is None:
413  self.proxy.wait_for_service()
414  else:
415  self.proxy.wait_for_service(timeout=self.timeout)
416  timeout_t = time.time() + 5.
417  t1 = TestTask(ProxyTask(proxy))
418  t1.start()
419  while not t1.done and time.time() < timeout_t:
420  time.sleep(0.5)
421  self.assertTrue(t1.success)
422 
423  # test wait for service with timeout in success case
424  timeout_t = time.time() + 5.
425  t2 = TestTask(ProxyTask(proxy, timeout=1.))
426  t2.start()
427  while not t2.done and time.time() < timeout_t:
428  time.sleep(0.5)
429  self.assertTrue(t2.success)
430 
431  # test wait for service in failure case
432  fake_proxy = rospy.ServiceProxy('fake_service', test_rosmaster.srv.AddTwoInts)
433  timeout_t = time.time() + 2.
434  t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))
435  t3.start()
436  while not t3.done and time.time() < timeout_t:
437  time.sleep(0.5)
438  self.assertTrue(t3.done)
439  self.assertFalse(t3.success)
440 
442  """
443  Test ServiceProxy.wait_for_service
444  """
445  # lazy-import for coverage
446  import rospy
447  import time
448  import test_rosmaster.srv
449 
450  # test wait for service in success case
451  proxy = rospy.ServiceProxy('add_two_ints', test_rosmaster.srv.AddTwoInts)
452  class ProxyTask(object):
453  def __init__(self, proxy, timeout=None):
454  self.proxy = proxy
455  self.timeout = timeout
456  def __call__(self):
457  if self.timeout is None:
458  self.proxy.wait_for_service()
459  else:
460  self.proxy.wait_for_service(timeout=self.timeout)
461 
462  # test wait for service with timeout in success case
463  timeout_t = time.time() + 5.
464  t2 = TestTask(ProxyTask(proxy, timeout=rospy.Duration.from_sec(1.)))
465  t2.start()
466  while not t2.done and time.time() < timeout_t:
467  time.sleep(0.5)
468  self.assertTrue(t2.success)
469 
470  # test wait for service in failure case
471  fake_proxy = rospy.ServiceProxy('fake_service', test_rosmaster.srv.AddTwoInts)
472  timeout_t = time.time() + 2.
473  t3 = TestTask(ProxyTask(fake_proxy, timeout=rospy.Duration.from_sec(0.1)))
474  t3.start()
475  while not t3.done and time.time() < timeout_t:
476  time.sleep(0.5)
477  self.assertTrue(t3.done)
478  self.assertFalse(t3.success)
479 
480  def test_sleep(self):
481  import rospy
482  import time
483  t = time.time()
484  rospy.sleep(0.1)
485  dur = time.time() - t
486  # #2842 raising bounds from .01 to .03 for amazon VM
487 
488  # make sure sleep is approximately right
489  self.assertTrue(abs(dur - 0.1) < 0.03, dur)
490 
491  t = time.time()
492  rospy.sleep(rospy.Duration.from_sec(0.1))
493  dur = time.time() - t
494  # make sure sleep is approximately right
495  self.assertTrue(abs(dur - 0.1) < 0.03, dur)
496 
497  # sleep for neg duration
498  t = time.time()
499  rospy.sleep(rospy.Duration.from_sec(-10.))
500  dur = time.time() - t
501  # make sure returned immediately
502  self.assertTrue(abs(dur) < 0.1, dur)
503 
504  def test_Rate(self):
505  import rospy
506  import time
507  t = time.time()
508  count = 0
509  r = rospy.Rate(10)
510  for x in range(10):
511  r.sleep()
512  dur = time.time() - t
513  # make sure sleep is approximately right
514  self.assertTrue(abs(dur - 1.0) < 0.5, dur)
515 
516 
517  def test_param_server(self):
518  # this isn't a parameter server test, just checking that the rospy bindings work
519  import rospy
520 
521  try:
522  rospy.get_param('not_a_param')
523  self.fail("should have raised KeyError")
524  except KeyError: pass
525  self.assertEqual('default_val', rospy.get_param('not_a_param', 'default_val') )
526 
527  p = rospy.get_param('/param')
528  self.assertEqual("value", p)
529  p = rospy.get_param('param')
530  self.assertEqual("value", p)
531  p = rospy.get_param('/group/param')
532  self.assertEqual("group_value", p)
533  p = rospy.get_param('group/param')
534  self.assertEqual("group_value", p)
535 
536  self.assertEqual('/param', rospy.search_param('param'))
537 
538  names = rospy.get_param_names()
539  self.assertTrue('/param' in names)
540  self.assertTrue('/group/param' in names)
541 
542  for p in ['/param', 'param', 'group/param', '/group/param']:
543  self.assertTrue(rospy.has_param(p))
544 
545  rospy.set_param('param2', 'value2')
546  self.assertTrue(rospy.has_param('param2'))
547  self.assertEqual('value2', rospy.get_param('param2'))
548  rospy.delete_param('param2')
549  self.assertFalse(rospy.has_param('param2'))
550  try:
551  rospy.get_param('param2')
552  self.fail("should have raised KeyError")
553  except KeyError: pass
554 
556  # lazy-import for coverage
557  import rospy
558  import std_msgs.msg
559  import time
560 
561  # test standard wait for message
562  def task1():
563  return rospy.wait_for_message('chatter', std_msgs.msg.String)
564  timeout_t = time.time() + 5.
565  t1 = TestTask(task1)
566  t1.start()
567  while not t1.done and time.time() < timeout_t:
568  time.sleep(0.5)
569  self.assertTrue(t1.success)
570  self.assertTrue('hello' in t1.value.data)
571 
572  # test wait for message with timeout
573  def task2():
574  return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
575  timeout_t = time.time() + 5.
576  t2 = TestTask(task2)
577  t2.start()
578  while not t2.done and time.time() < timeout_t:
579  time.sleep(0.5)
580  self.assertTrue(t2.success)
581  self.assertTrue('hello' in t2.value.data)
582 
583  # test wait for message with timeout FAILURE
584  def task3():
585  # #2842 raising bounds from .1 to .3 for amazon VM
586  return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
587  timeout_t = time.time() + 2.
588  t3 = TestTask(task3)
589  t3.start()
590  while not t3.done and time.time() < timeout_t:
591  time.sleep(0.5)
592  self.assertFalse(t3.success)
593  self.assertTrue(t3.done)
594  self.assertTrue(t3.value is None)
595 
597  # lazy-import for coverage
598  import rospy
599  import std_msgs.msg
600  import time
601 
602  # test wait for message with timeout
603  def task2():
604  return rospy.wait_for_message('chatter', std_msgs.msg.String,
605  timeout=rospy.Duration.from_sec(2.))
606  timeout_t = time.time() + 5.
607  t2 = TestTask(task2)
608  t2.start()
609  while not t2.done and time.time() < timeout_t:
610  time.sleep(0.5)
611  self.assertTrue(t2.success)
612  self.assertTrue('hello' in t2.value.data)
613 
614  # test wait for message with timeout FAILURE
615  def task3():
616  # #2842 raising bounds from .1 to .3 for amazon VM
617  return rospy.wait_for_message('fake_topic', std_msgs.msg.String,
618  timeout=rospy.Duration.from_sec(.3))
619  timeout_t = time.time() + 2.
620  t3 = TestTask(task3)
621  t3.start()
622  while not t3.done and time.time() < timeout_t:
623  time.sleep(0.5)
624  self.assertFalse(t3.success)
625  self.assertTrue(t3.done)
626  self.assertTrue(t3.value is None)
627 
628 if __name__ == '__main__':
629  rosunit.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])
msg
test_rospy_client_online.TestRospyClientOnline.__init__
def __init__(self, *args)
Definition: test_rospy_client_online.py:72
test_rospy_client_online.TestRospyClientOnline.test_Rate
def test_Rate(self)
Definition: test_rospy_client_online.py:504
test_rospy_client_online.TestRospyClientOnline
Definition: test_rospy_client_online.py:70
test_rospy_client_online.TestRospyClientOnline.test_ServiceProxy_wait_for_service
def test_ServiceProxy_wait_for_service(self)
Definition: test_rospy_client_online.py:396
test_rospy_client_online.TestTask.run
def run(self)
Definition: test_rospy_client_online.py:59
test_rospy_client_online.TestRospyClientOnline.test_sleep
def test_sleep(self)
Definition: test_rospy_client_online.py:480
test_rospy_client_online.TestRospyClientOnline.timeout
timeout
Definition: test_rospy_client_online.py:410
test_rospy_client_online.TestRospyClientOnline.test_wait_for_message_duration
def test_wait_for_message_duration(self)
Definition: test_rospy_client_online.py:596
test_rospy_client_online.TestRospyClientOnline.test_ServiceProxy_wait_for_service_duration
def test_ServiceProxy_wait_for_service_duration(self)
Definition: test_rospy_client_online.py:441
test_rospy_client_online.TestRospyClientOnline.test_wait_for_message
def test_wait_for_message(self)
Definition: test_rospy_client_online.py:555
test_rospy_client_online.TestTask.__init__
def __init__(self, task)
Definition: test_rospy_client_online.py:52
test_rospy_client_online.TestTask
Definition: test_rospy_client_online.py:51
test_rospy_client_online.TestTask.value
value
Definition: test_rospy_client_online.py:57
test_rospy_client_online.TestRospyClientOnline.test_wait_for_service_duration
def test_wait_for_service_duration(self)
Definition: test_rospy_client_online.py:367
test_rospy_client_online.TestTask.task
task
Definition: test_rospy_client_online.py:54
test_rospy_client_online.TestRospyClientOnline.test_param_server
def test_param_server(self)
Definition: test_rospy_client_online.py:517
test_rospy_client_online.TestRospyClientOnline.test_log
def test_log(self)
Definition: test_rospy_client_online.py:77
test_rospy_client_online.TestRospyClientOnline.test_wait_for_service
def test_wait_for_service(self)
Definition: test_rospy_client_online.py:330
test_rospy_client_online.TestTask.success
success
Definition: test_rospy_client_online.py:55
test_rospy_client_online.TestTask.done
done
Definition: test_rospy_client_online.py:56
test_rospy_client_online.TestRospyClientOnline.proxy
proxy
Definition: test_rospy_client_online.py:409


test_rospy
Author(s): Ken Conley, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:44