43 from threading
import Thread
54 print(
"STARTING TASK")
56 print(
"TASK HAS COMPLETED")
66 unittest.TestCase.__init__(self, *args)
68 rospy.init_node(
'test_rospy_online')
75 real_stdout = sys.stdout
76 real_stderr = sys.stderr
80 from cStringIO
import StringIO
82 from io
import StringIO
83 sys.stdout = StringIO()
84 sys.stderr = StringIO()
87 rospy.loginfo(
"test 1")
88 self.assert_(
"test 1" in sys.stdout.getvalue())
90 rospy.logwarn(
"test 2")
91 self.assert_(
"[WARN]" in sys.stderr.getvalue())
92 self.assert_(
"test 2" in sys.stderr.getvalue())
94 sys.stderr = StringIO()
95 rospy.logerr(
"test 3")
96 self.assert_(
"[ERROR]" in sys.stderr.getvalue())
97 self.assert_(
"test 3" in sys.stderr.getvalue())
99 sys.stderr = StringIO()
100 rospy.logfatal(
"test 4")
101 self.assert_(
"[FATAL]" in sys.stderr.getvalue())
102 self.assert_(
"test 4" in sys.stderr.getvalue())
106 sys.stdout = StringIO()
107 rospy.loginfo_throttle(3,
"test 1")
109 self.assert_(
"test 1" in sys.stdout.getvalue())
110 rospy.sleep(rospy.Duration(1))
112 self.assert_(
"" == sys.stdout.getvalue())
113 rospy.sleep(rospy.Duration(2))
115 self.assert_(
"test 1" in sys.stdout.getvalue())
118 sys.stderr = StringIO()
119 rospy.logwarn_throttle(3,
"test 2")
121 self.assert_(
"test 2" in sys.stderr.getvalue())
122 rospy.sleep(rospy.Duration(1))
124 self.assert_(
"" == sys.stderr.getvalue())
125 rospy.sleep(rospy.Duration(2))
127 self.assert_(
"test 2" in sys.stderr.getvalue())
130 sys.stderr = StringIO()
131 rospy.logerr_throttle(3,
"test 3")
133 self.assert_(
"test 3" in sys.stderr.getvalue())
134 rospy.sleep(rospy.Duration(1))
136 self.assert_(
"" == sys.stderr.getvalue())
137 rospy.sleep(rospy.Duration(2))
139 self.assert_(
"test 3" in sys.stderr.getvalue())
142 sys.stderr = StringIO()
143 rospy.logfatal_throttle(3,
"test 4")
145 self.assert_(
"test 4" in sys.stderr.getvalue())
146 rospy.sleep(rospy.Duration(1))
148 self.assert_(
"" == sys.stderr.getvalue())
149 rospy.sleep(rospy.Duration(2))
151 self.assert_(
"test 4" in sys.stderr.getvalue())
153 sys.stdout = real_stdout
154 sys.stderr = real_stderr
163 rospy.wait_for_service(
'add_two_ints')
164 timeout_t = time.time() + 5.
167 while not t1.done
and time.time() < timeout_t:
169 self.assert_(t1.success)
173 rospy.wait_for_service(
'add_two_ints', timeout=1.)
174 timeout_t = time.time() + 5.
177 while not t2.done
and time.time() < timeout_t:
179 self.assert_(t2.success)
184 rospy.wait_for_service(
'fake_service', timeout=0.3)
185 timeout_t = time.time() + 2.
188 while not t3.done
and time.time() < timeout_t:
190 self.assert_(t3.done)
191 self.failIf(t3.success)
195 Test ServiceProxy.wait_for_service 200 import test_rosmaster.srv
203 proxy = rospy.ServiceProxy(
'add_two_ints', test_rosmaster.srv.AddTwoInts)
204 class ProxyTask(object):
205 def __init__(self, proxy, timeout=None):
210 self.proxy.wait_for_service()
212 self.proxy.wait_for_service(timeout=self.
timeout)
213 timeout_t = time.time() + 5.
216 while not t1.done
and time.time() < timeout_t:
218 self.assert_(t1.success)
221 timeout_t = time.time() + 5.
222 t2 =
TestTask(ProxyTask(proxy, timeout=1.))
224 while not t2.done
and time.time() < timeout_t:
226 self.assert_(t2.success)
229 fake_proxy = rospy.ServiceProxy(
'fake_service', test_rosmaster.srv.AddTwoInts)
230 timeout_t = time.time() + 2.
231 t3 =
TestTask(ProxyTask(fake_proxy, timeout=0.1))
233 while not t3.done
and time.time() < timeout_t:
235 self.assert_(t3.done)
236 self.failIf(t3.success)
243 dur = time.time() - t
247 self.assert_(abs(dur - 0.1) < 0.03, dur)
250 rospy.sleep(rospy.Duration.from_sec(0.1))
251 dur = time.time() - t
253 self.assert_(abs(dur - 0.1) < 0.03, dur)
257 rospy.sleep(rospy.Duration.from_sec(-10.))
258 dur = time.time() - t
260 self.assert_(abs(dur) < 0.1, dur)
270 dur = time.time() - t
272 self.assert_(abs(dur - 1.0) < 0.5, dur)
280 rospy.get_param(
'not_a_param')
281 self.fail(
"should have raised KeyError")
282 except KeyError:
pass 283 self.assertEquals(
'default_val', rospy.get_param(
'not_a_param',
'default_val') )
285 p = rospy.get_param(
'/param')
286 self.assertEquals(
"value", p)
287 p = rospy.get_param(
'param')
288 self.assertEquals(
"value", p)
289 p = rospy.get_param(
'/group/param')
290 self.assertEquals(
"group_value", p)
291 p = rospy.get_param(
'group/param')
292 self.assertEquals(
"group_value", p)
294 self.assertEquals(
'/param', rospy.search_param(
'param'))
296 names = rospy.get_param_names()
297 self.assert_(
'/param' in names)
298 self.assert_(
'/group/param' in names)
300 for p
in [
'/param',
'param',
'group/param',
'/group/param']:
301 self.assert_(rospy.has_param(p))
303 rospy.set_param(
'param2',
'value2')
304 self.assert_(rospy.has_param(
'param2'))
305 self.assertEquals(
'value2', rospy.get_param(
'param2'))
306 rospy.delete_param(
'param2')
307 self.failIf(rospy.has_param(
'param2'))
309 rospy.get_param(
'param2')
310 self.fail(
"should have raised KeyError")
311 except KeyError:
pass 321 return rospy.wait_for_message(
'chatter', std_msgs.msg.String)
322 timeout_t = time.time() + 5.
325 while not t1.done
and time.time() < timeout_t:
327 self.assert_(t1.success)
328 self.assert_(
'hello' in t1.value.data)
332 return rospy.wait_for_message(
'chatter', std_msgs.msg.String, timeout=2.)
333 timeout_t = time.time() + 5.
336 while not t2.done
and time.time() < timeout_t:
338 self.assert_(t2.success)
339 self.assert_(
'hello' in t2.value.data)
344 return rospy.wait_for_message(
'fake_topic', std_msgs.msg.String, timeout=.3)
345 timeout_t = time.time() + 2.
348 while not t3.done
and time.time() < timeout_t:
350 self.failIf(t3.success)
351 self.assert_(t3.done)
352 self.assert_(t3.value
is None)
354 if __name__ ==
'__main__':
355 rosunit.unitrun(
'test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=[
'rospy.client',
'rospy.msproxy'])
def test_wait_for_message(self)
def test_param_server(self)
def test_wait_for_service(self)
def test_ServiceProxy_wait_for_service(self)