41 import rosgraph.roslogging
44 from cStringIO
import StringIO
46 from io
import StringIO
50 from threading
import Thread
61 print(
"STARTING TASK")
63 print(
"TASK HAS COMPLETED")
73 unittest.TestCase.__init__(self, *args)
75 rospy.init_node(
'test_rospy_online')
78 rosout_logger = logging.getLogger(
'rosout')
81 self.assertTrue(len(rosout_logger.handlers) == 2)
82 self.assertTrue(rosout_logger.handlers[0], rosgraph.roslogging.RosStreamHandler)
83 self.assertTrue(rosout_logger.handlers[1], rospy.impl.rosout.RosOutHandler)
85 default_ros_handler = rosout_logger.handlers[0]
90 test_ros_handler = rosgraph.roslogging.RosStreamHandler(colorize=
False, stdout=lout, stderr=lerr)
94 rosout_logger.removeHandler(default_ros_handler)
95 rosout_logger.addHandler(test_ros_handler)
98 rospy.loginfo(
"test 1")
99 lout_last = lout.getvalue().splitlines()[-1]
100 self.assert_(
"test 1" in lout_last)
102 rospy.logwarn(
"test 2")
103 lerr_last = lerr.getvalue().splitlines()[-1]
104 self.assert_(
"[WARN]" in lerr_last)
105 self.assert_(
"test 2" in lerr_last)
107 rospy.logerr(
"test 3")
108 lerr_last = lerr.getvalue().splitlines()[-1]
109 self.assert_(
"[ERROR]" in lerr_last)
110 self.assert_(
"test 3" in lerr_last)
112 rospy.logfatal(
"test 4")
113 lerr_last = lerr.getvalue().splitlines()[-1]
114 self.assert_(
"[FATAL]" in lerr_last)
115 self.assert_(
"test 4" in lerr_last)
120 rospy.loginfo_once(
"test 1")
121 lout_last = lout.getvalue().splitlines()[-1]
123 self.assert_(
"test 1" in lout_last)
124 lout_len = len(lout.getvalue())
126 self.assert_(lout_len == len(lout.getvalue()))
130 rospy.logwarn_once(
"test 2")
131 lerr_last = lerr.getvalue().splitlines()[-1]
133 self.assert_(
"test 2" in lerr_last)
134 lerr_len = len(lerr.getvalue())
136 self.assert_(lerr_len == len(lerr.getvalue()))
140 rospy.logerr_once(
"test 3")
141 lerr_last = lerr.getvalue().splitlines()[-1]
143 self.assert_(
"test 3" in lerr_last)
144 lerr_len = len(lerr.getvalue())
146 self.assert_(lerr_len == len(lerr.getvalue()))
150 rospy.logfatal_once(
"test 4")
151 lerr_last = lerr.getvalue().splitlines()[-1]
153 self.assert_(
"test 4" in lerr_last)
154 lerr_len = len(lerr.getvalue())
156 self.assert_(lerr_len == len(lerr.getvalue()))
161 rospy.loginfo_throttle(3,
"test 1")
162 lout_last = lout.getvalue().splitlines()[-1]
164 self.assert_(
"test 1" in lout_last)
165 lout_len = len(lout.getvalue())
166 rospy.sleep(rospy.Duration(1))
168 self.assert_(lout_len == len(lout.getvalue()))
169 rospy.sleep(rospy.Duration(2))
171 self.assert_(
"test 1" in lout_last)
175 rospy.logwarn_throttle(3,
"test 2")
176 lerr_last = lerr.getvalue().splitlines()[-1]
178 self.assert_(
"test 2" in lerr_last)
179 lerr_len = len(lerr.getvalue())
180 rospy.sleep(rospy.Duration(1))
182 self.assert_(lerr_len == len(lerr.getvalue()))
183 rospy.sleep(rospy.Duration(2))
185 self.assert_(
"test 2" in lerr_last)
189 rospy.logerr_throttle(3,
"test 3")
190 lerr_last = lerr.getvalue().splitlines()[-1]
192 self.assert_(
"test 3" in lerr_last)
193 lerr_len = len(lerr.getvalue())
194 rospy.sleep(rospy.Duration(1))
196 self.assert_(lerr_len == len(lerr.getvalue()))
197 rospy.sleep(rospy.Duration(2))
199 self.assert_(
"test 3" in lerr_last)
203 rospy.logfatal_throttle(3,
"test 4")
204 lerr_last = lerr.getvalue().splitlines()[-1]
206 self.assert_(
"test 4" in lerr_last)
207 lerr_len = len(lerr.getvalue())
208 rospy.sleep(rospy.Duration(1))
210 self.assert_(lerr_len == len(lerr.getvalue()))
211 rospy.sleep(rospy.Duration(2))
213 self.assert_(
"test 4" in lerr_last)
215 rospy.loginfo(
"test child logger 1", logger_name=
"log1")
216 lout_last = lout.getvalue().splitlines()[-1]
217 self.assert_(
"test child logger 1" in lout_last)
219 rospy.logwarn(
"test child logger 2", logger_name=
"log2")
220 lerr_last = lerr.getvalue().splitlines()[-1]
221 self.assert_(
"[WARN]" in lerr_last)
222 self.assert_(
"test child logger 2" in lerr_last)
224 rospy.logerr(
"test child logger 3", logger_name=
"log3")
225 lerr_last = lerr.getvalue().splitlines()[-1]
226 self.assert_(
"[ERROR]" in lerr_last)
227 self.assert_(
"test child logger 3" in lerr_last)
229 rospy.logfatal(
"test child logger 4", logger_name=
"log4")
230 lerr_last = lerr.getvalue().splitlines()[-1]
231 self.assert_(
"[FATAL]" in lerr_last)
232 self.assert_(
"test child logger 4" in lerr_last)
236 rosout_logger.removeHandler(test_ros_handler)
239 rosout_logger.addHandler(default_ros_handler)
248 rospy.wait_for_service(
'add_two_ints')
249 timeout_t = time.time() + 5.
252 while not t1.done
and time.time() < timeout_t:
254 self.assert_(t1.success)
258 rospy.wait_for_service(
'add_two_ints', timeout=1.)
259 timeout_t = time.time() + 5.
262 while not t2.done
and time.time() < timeout_t:
264 self.assert_(t2.success)
269 rospy.wait_for_service(
'fake_service', timeout=0.3)
270 timeout_t = time.time() + 2.
273 while not t3.done
and time.time() < timeout_t:
275 self.assert_(t3.done)
276 self.failIf(t3.success)
280 Test ServiceProxy.wait_for_service 285 import test_rosmaster.srv
288 proxy = rospy.ServiceProxy(
'add_two_ints', test_rosmaster.srv.AddTwoInts)
289 class ProxyTask(object):
290 def __init__(self, proxy, timeout=None):
295 self.proxy.wait_for_service()
297 self.proxy.wait_for_service(timeout=self.
timeout)
298 timeout_t = time.time() + 5.
301 while not t1.done
and time.time() < timeout_t:
303 self.assert_(t1.success)
306 timeout_t = time.time() + 5.
307 t2 =
TestTask(ProxyTask(proxy, timeout=1.))
309 while not t2.done
and time.time() < timeout_t:
311 self.assert_(t2.success)
314 fake_proxy = rospy.ServiceProxy(
'fake_service', test_rosmaster.srv.AddTwoInts)
315 timeout_t = time.time() + 2.
316 t3 =
TestTask(ProxyTask(fake_proxy, timeout=0.1))
318 while not t3.done
and time.time() < timeout_t:
320 self.assert_(t3.done)
321 self.failIf(t3.success)
328 dur = time.time() - t
332 self.assert_(abs(dur - 0.1) < 0.03, dur)
335 rospy.sleep(rospy.Duration.from_sec(0.1))
336 dur = time.time() - t
338 self.assert_(abs(dur - 0.1) < 0.03, dur)
342 rospy.sleep(rospy.Duration.from_sec(-10.))
343 dur = time.time() - t
345 self.assert_(abs(dur) < 0.1, dur)
355 dur = time.time() - t
357 self.assert_(abs(dur - 1.0) < 0.5, dur)
365 rospy.get_param(
'not_a_param')
366 self.fail(
"should have raised KeyError")
367 except KeyError:
pass 368 self.assertEquals(
'default_val', rospy.get_param(
'not_a_param',
'default_val') )
370 p = rospy.get_param(
'/param')
371 self.assertEquals(
"value", p)
372 p = rospy.get_param(
'param')
373 self.assertEquals(
"value", p)
374 p = rospy.get_param(
'/group/param')
375 self.assertEquals(
"group_value", p)
376 p = rospy.get_param(
'group/param')
377 self.assertEquals(
"group_value", p)
379 self.assertEquals(
'/param', rospy.search_param(
'param'))
381 names = rospy.get_param_names()
382 self.assert_(
'/param' in names)
383 self.assert_(
'/group/param' in names)
385 for p
in [
'/param',
'param',
'group/param',
'/group/param']:
386 self.assert_(rospy.has_param(p))
388 rospy.set_param(
'param2',
'value2')
389 self.assert_(rospy.has_param(
'param2'))
390 self.assertEquals(
'value2', rospy.get_param(
'param2'))
391 rospy.delete_param(
'param2')
392 self.failIf(rospy.has_param(
'param2'))
394 rospy.get_param(
'param2')
395 self.fail(
"should have raised KeyError")
396 except KeyError:
pass 406 return rospy.wait_for_message(
'chatter', std_msgs.msg.String)
407 timeout_t = time.time() + 5.
410 while not t1.done
and time.time() < timeout_t:
412 self.assert_(t1.success)
413 self.assert_(
'hello' in t1.value.data)
417 return rospy.wait_for_message(
'chatter', std_msgs.msg.String, timeout=2.)
418 timeout_t = time.time() + 5.
421 while not t2.done
and time.time() < timeout_t:
423 self.assert_(t2.success)
424 self.assert_(
'hello' in t2.value.data)
429 return rospy.wait_for_message(
'fake_topic', std_msgs.msg.String, timeout=.3)
430 timeout_t = time.time() + 2.
433 while not t3.done
and time.time() < timeout_t:
435 self.failIf(t3.success)
436 self.assert_(t3.done)
437 self.assert_(t3.value
is None)
439 if __name__ ==
'__main__':
440 rosunit.unitrun(
'test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=[
'rospy.client',
'rospy.msproxy'])
def test_wait_for_message(self)
def test_param_server(self)
def test_wait_for_service(self)
def test_ServiceProxy_wait_for_service(self)