41 import rosgraph.roslogging
44 from cStringIO
import StringIO
46 from io
import StringIO
50 from threading
import Thread
61 print(
"STARTING TASK")
63 print(
"TASK HAS COMPLETED")
73 unittest.TestCase.__init__(self, *args)
75 rospy.init_node(
'test_rospy_online')
78 rosout_logger = logging.getLogger(
'rosout')
81 self.assertTrue(len(rosout_logger.handlers) == 2)
82 self.assertTrue(rosout_logger.handlers[0], rosgraph.roslogging.RosStreamHandler)
83 self.assertTrue(rosout_logger.handlers[1], rospy.impl.rosout.RosOutHandler)
85 default_ros_handler = rosout_logger.handlers[0]
90 test_ros_handler = rosgraph.roslogging.RosStreamHandler(colorize=
False, stdout=lout, stderr=lerr)
94 rosout_logger.removeHandler(default_ros_handler)
95 rosout_logger.addHandler(test_ros_handler)
98 rospy.loginfo(
"test 1")
99 lout_last = lout.getvalue().splitlines()[-1]
100 self.assertTrue(
"test 1" in lout_last)
102 rospy.logwarn(
"test 2")
103 lerr_last = lerr.getvalue().splitlines()[-1]
104 self.assertTrue(
"[WARN]" in lerr_last)
105 self.assertTrue(
"test 2" in lerr_last)
107 rospy.logerr(
"test 3")
108 lerr_last = lerr.getvalue().splitlines()[-1]
109 self.assertTrue(
"[ERROR]" in lerr_last)
110 self.assertTrue(
"test 3" in lerr_last)
112 rospy.logfatal(
"test 4")
113 lerr_last = lerr.getvalue().splitlines()[-1]
114 self.assertTrue(
"[FATAL]" in lerr_last)
115 self.assertTrue(
"test 4" in lerr_last)
120 rospy.loginfo_once(
"test 1")
121 lout_last = lout.getvalue().splitlines()[-1]
123 self.assertTrue(
"test 1" in lout_last)
124 lout_len = len(lout.getvalue())
126 self.assertTrue(lout_len == len(lout.getvalue()))
130 rospy.logwarn_once(
"test 2")
131 lerr_last = lerr.getvalue().splitlines()[-1]
133 self.assertTrue(
"test 2" in lerr_last)
134 lerr_len = len(lerr.getvalue())
136 self.assertTrue(lerr_len == len(lerr.getvalue()))
140 rospy.logerr_once(
"test 3")
141 lerr_last = lerr.getvalue().splitlines()[-1]
143 self.assertTrue(
"test 3" in lerr_last)
144 lerr_len = len(lerr.getvalue())
146 self.assertTrue(lerr_len == len(lerr.getvalue()))
150 rospy.logfatal_once(
"test 4")
151 lerr_last = lerr.getvalue().splitlines()[-1]
153 self.assertTrue(
"test 4" in lerr_last)
154 lerr_len = len(lerr.getvalue())
156 self.assertTrue(lerr_len == len(lerr.getvalue()))
161 rospy.loginfo_throttle(3,
"test 1")
162 lout_last = lout.getvalue().splitlines()[-1]
164 self.assertTrue(
"test 1" in lout_last)
165 lout_len = len(lout.getvalue())
166 rospy.sleep(rospy.Duration(1))
168 self.assertTrue(lout_len == len(lout.getvalue()))
169 rospy.sleep(rospy.Duration(2))
171 self.assertTrue(
"test 1" in lout_last)
175 rospy.logwarn_throttle(3,
"test 2")
176 lerr_last = lerr.getvalue().splitlines()[-1]
178 self.assertTrue(
"test 2" in lerr_last)
179 lerr_len = len(lerr.getvalue())
180 rospy.sleep(rospy.Duration(1))
182 self.assertTrue(lerr_len == len(lerr.getvalue()))
183 rospy.sleep(rospy.Duration(2))
185 self.assertTrue(
"test 2" in lerr_last)
189 rospy.logerr_throttle(3,
"test 3")
190 lerr_last = lerr.getvalue().splitlines()[-1]
192 self.assertTrue(
"test 3" in lerr_last)
193 lerr_len = len(lerr.getvalue())
194 rospy.sleep(rospy.Duration(1))
196 self.assertTrue(lerr_len == len(lerr.getvalue()))
197 rospy.sleep(rospy.Duration(2))
199 self.assertTrue(
"test 3" in lerr_last)
203 rospy.logfatal_throttle(3,
"test 4")
204 lerr_last = lerr.getvalue().splitlines()[-1]
206 self.assertTrue(
"test 4" in lerr_last)
207 lerr_len = len(lerr.getvalue())
208 rospy.sleep(rospy.Duration(1))
210 self.assertTrue(lerr_len == len(lerr.getvalue()))
211 rospy.sleep(rospy.Duration(2))
213 self.assertTrue(
"test 4" in lerr_last)
221 test_text =
"test 1a"
222 rospy.loginfo_throttle_identical(2, test_text)
223 lout_last = lout.getvalue().splitlines()[-1]
225 self.assertTrue(test_text
in lout_last)
226 lout_len = len(lout.getvalue())
228 self.assertTrue(lout_len == len(lout.getvalue()))
230 self.assertTrue(test_text
in lout_last)
231 lout_len = len(lout.getvalue())
232 rospy.sleep(rospy.Duration(2))
234 self.assertTrue(lout_len != len(lout.getvalue()))
236 self.assertTrue(test_text
in lout_last)
243 test_text =
"test 2a"
244 rospy.logwarn_throttle_identical(2, test_text)
245 lerr_last = lerr.getvalue().splitlines()[-1]
247 self.assertTrue(test_text
in lerr_last)
248 lerr_len = len(lerr.getvalue())
250 self.assertTrue(lerr_len == len(lerr.getvalue()))
252 self.assertTrue(test_text
in lerr_last)
253 lerr_len = len(lerr.getvalue())
254 rospy.sleep(rospy.Duration(2))
256 self.assertTrue(lerr_len != len(lerr.getvalue()))
258 self.assertTrue(test_text
in lerr_last)
265 test_text =
"test 3a"
266 rospy.logerr_throttle_identical(2, test_text)
267 lerr_last = lerr.getvalue().splitlines()[-1]
269 self.assertTrue(test_text
in lerr_last)
270 lerr_len = len(lerr.getvalue())
272 self.assertTrue(lerr_len == len(lerr.getvalue()))
274 self.assertTrue(test_text
in lerr_last)
275 lerr_len = len(lerr.getvalue())
276 rospy.sleep(rospy.Duration(2))
278 self.assertTrue(lerr_len != len(lerr.getvalue()))
280 self.assertTrue(test_text
in lerr_last)
287 test_text =
"test 4a"
288 rospy.logfatal_throttle_identical(2, test_text)
289 lerr_last = lerr.getvalue().splitlines()[-1]
291 self.assertTrue(test_text
in lerr_last)
292 lerr_len = len(lerr.getvalue())
294 self.assertTrue(lerr_len == len(lerr.getvalue()))
296 self.assertTrue(test_text
in lerr_last)
297 lerr_len = len(lerr.getvalue())
298 rospy.sleep(rospy.Duration(2))
300 self.assertTrue(lerr_len != len(lerr.getvalue()))
302 self.assertTrue(test_text
in lerr_last)
304 rospy.loginfo(
"test child logger 1", logger_name=
"log1")
305 lout_last = lout.getvalue().splitlines()[-1]
306 self.assertTrue(
"test child logger 1" in lout_last)
308 rospy.logwarn(
"test child logger 2", logger_name=
"log2")
309 lerr_last = lerr.getvalue().splitlines()[-1]
310 self.assertTrue(
"[WARN]" in lerr_last)
311 self.assertTrue(
"test child logger 2" in lerr_last)
313 rospy.logerr(
"test child logger 3", logger_name=
"log3")
314 lerr_last = lerr.getvalue().splitlines()[-1]
315 self.assertTrue(
"[ERROR]" in lerr_last)
316 self.assertTrue(
"test child logger 3" in lerr_last)
318 rospy.logfatal(
"test child logger 4", logger_name=
"log4")
319 lerr_last = lerr.getvalue().splitlines()[-1]
320 self.assertTrue(
"[FATAL]" in lerr_last)
321 self.assertTrue(
"test child logger 4" in lerr_last)
325 rosout_logger.removeHandler(test_ros_handler)
328 rosout_logger.addHandler(default_ros_handler)
337 rospy.wait_for_service(
'add_two_ints')
338 timeout_t = time.time() + 5.
341 while not t1.done
and time.time() < timeout_t:
343 self.assertTrue(t1.success)
347 rospy.wait_for_service(
'add_two_ints', timeout=1.)
348 timeout_t = time.time() + 5.
351 while not t2.done
and time.time() < timeout_t:
353 self.assertTrue(t2.success)
358 rospy.wait_for_service(
'fake_service', timeout=0.3)
359 timeout_t = time.time() + 2.
362 while not t3.done
and time.time() < timeout_t:
364 self.assertTrue(t3.done)
365 self.assertFalse(t3.success)
374 rospy.wait_for_service(
'add_two_ints',
375 timeout=rospy.Duration.from_sec(1.0))
376 timeout_t = time.time() + 5.
379 while not t2.done
and time.time() < timeout_t:
381 self.assertTrue(t2.success)
386 rospy.wait_for_service(
'fake_service',
387 timeout=rospy.Duration.from_sec(0.3))
388 timeout_t = time.time() + 2.
391 while not t3.done
and time.time() < timeout_t:
393 self.assertTrue(t3.done)
394 self.assertFalse(t3.success)
398 Test ServiceProxy.wait_for_service
403 import test_rosmaster.srv
406 proxy = rospy.ServiceProxy(
'add_two_ints', test_rosmaster.srv.AddTwoInts)
407 class ProxyTask(object):
408 def __init__(self, proxy, timeout=None):
413 self.
proxy.wait_for_service()
416 timeout_t = time.time() + 5.
419 while not t1.done
and time.time() < timeout_t:
421 self.assertTrue(t1.success)
424 timeout_t = time.time() + 5.
425 t2 =
TestTask(ProxyTask(proxy, timeout=1.))
427 while not t2.done
and time.time() < timeout_t:
429 self.assertTrue(t2.success)
432 fake_proxy = rospy.ServiceProxy(
'fake_service', test_rosmaster.srv.AddTwoInts)
433 timeout_t = time.time() + 2.
434 t3 =
TestTask(ProxyTask(fake_proxy, timeout=0.1))
436 while not t3.done
and time.time() < timeout_t:
438 self.assertTrue(t3.done)
439 self.assertFalse(t3.success)
443 Test ServiceProxy.wait_for_service
448 import test_rosmaster.srv
451 proxy = rospy.ServiceProxy(
'add_two_ints', test_rosmaster.srv.AddTwoInts)
452 class ProxyTask(object):
453 def __init__(self, proxy, timeout=None):
458 self.
proxy.wait_for_service()
463 timeout_t = time.time() + 5.
464 t2 =
TestTask(ProxyTask(proxy, timeout=rospy.Duration.from_sec(1.)))
466 while not t2.done
and time.time() < timeout_t:
468 self.assertTrue(t2.success)
471 fake_proxy = rospy.ServiceProxy(
'fake_service', test_rosmaster.srv.AddTwoInts)
472 timeout_t = time.time() + 2.
473 t3 =
TestTask(ProxyTask(fake_proxy, timeout=rospy.Duration.from_sec(0.1)))
475 while not t3.done
and time.time() < timeout_t:
477 self.assertTrue(t3.done)
478 self.assertFalse(t3.success)
485 dur = time.time() - t
489 self.assertTrue(abs(dur - 0.1) < 0.03, dur)
492 rospy.sleep(rospy.Duration.from_sec(0.1))
493 dur = time.time() - t
495 self.assertTrue(abs(dur - 0.1) < 0.03, dur)
499 rospy.sleep(rospy.Duration.from_sec(-10.))
500 dur = time.time() - t
502 self.assertTrue(abs(dur) < 0.1, dur)
512 dur = time.time() - t
514 self.assertTrue(abs(dur - 1.0) < 0.5, dur)
522 rospy.get_param(
'not_a_param')
523 self.fail(
"should have raised KeyError")
524 except KeyError:
pass
525 self.assertEqual(
'default_val', rospy.get_param(
'not_a_param',
'default_val') )
527 p = rospy.get_param(
'/param')
528 self.assertEqual(
"value", p)
529 p = rospy.get_param(
'param')
530 self.assertEqual(
"value", p)
531 p = rospy.get_param(
'/group/param')
532 self.assertEqual(
"group_value", p)
533 p = rospy.get_param(
'group/param')
534 self.assertEqual(
"group_value", p)
536 self.assertEqual(
'/param', rospy.search_param(
'param'))
538 names = rospy.get_param_names()
539 self.assertTrue(
'/param' in names)
540 self.assertTrue(
'/group/param' in names)
542 for p
in [
'/param',
'param',
'group/param',
'/group/param']:
543 self.assertTrue(rospy.has_param(p))
545 rospy.set_param(
'param2',
'value2')
546 self.assertTrue(rospy.has_param(
'param2'))
547 self.assertEqual(
'value2', rospy.get_param(
'param2'))
548 rospy.delete_param(
'param2')
549 self.assertFalse(rospy.has_param(
'param2'))
551 rospy.get_param(
'param2')
552 self.fail(
"should have raised KeyError")
553 except KeyError:
pass
563 return rospy.wait_for_message(
'chatter', std_msgs.msg.String)
564 timeout_t = time.time() + 5.
567 while not t1.done
and time.time() < timeout_t:
569 self.assertTrue(t1.success)
570 self.assertTrue(
'hello' in t1.value.data)
574 return rospy.wait_for_message(
'chatter', std_msgs.msg.String, timeout=2.)
575 timeout_t = time.time() + 5.
578 while not t2.done
and time.time() < timeout_t:
580 self.assertTrue(t2.success)
581 self.assertTrue(
'hello' in t2.value.data)
586 return rospy.wait_for_message(
'fake_topic', std_msgs.msg.String, timeout=.3)
587 timeout_t = time.time() + 2.
590 while not t3.done
and time.time() < timeout_t:
592 self.assertFalse(t3.success)
593 self.assertTrue(t3.done)
594 self.assertTrue(t3.value
is None)
604 return rospy.wait_for_message(
'chatter', std_msgs.msg.String,
605 timeout=rospy.Duration.from_sec(2.))
606 timeout_t = time.time() + 5.
609 while not t2.done
and time.time() < timeout_t:
611 self.assertTrue(t2.success)
612 self.assertTrue(
'hello' in t2.value.data)
617 return rospy.wait_for_message(
'fake_topic', std_msgs.msg.String,
618 timeout=rospy.Duration.from_sec(.3))
619 timeout_t = time.time() + 2.
622 while not t3.done
and time.time() < timeout_t:
624 self.assertFalse(t3.success)
625 self.assertTrue(t3.done)
626 self.assertTrue(t3.value
is None)
628 if __name__ ==
'__main__':
629 rosunit.unitrun(
'test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=[
'rospy.client',
'rospy.msproxy'])