test_rospy_client_online.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 import random
40 
41 import rosunit
42 
43 from threading import Thread
44 class TestTask(Thread):
45  def __init__(self, task):
46  Thread.__init__(self)
47  self.task = task
48  self.success = False
49  self.done = False
50  self.value = None
51 
52  def run(self):
53  try:
54  print("STARTING TASK")
55  self.value = self.task()
56  print("TASK HAS COMPLETED")
57  self.success = True
58  except:
59  import traceback
60  traceback.print_exc()
61  self.done = True
62 
63 class TestRospyClientOnline(unittest.TestCase):
64 
65  def __init__(self, *args):
66  unittest.TestCase.__init__(self, *args)
67  import rospy
68  rospy.init_node('test_rospy_online')
69 
70  def test_log(self):
71  # we can't do anything fancy here like capture stdout as rospy
72  # grabs the streams before we do. Instead, just make sure the
73  # routines don't crash.
74 
75  real_stdout = sys.stdout
76  real_stderr = sys.stderr
77 
78  try:
79  try:
80  from cStringIO import StringIO
81  except ImportError:
82  from io import StringIO
83  sys.stdout = StringIO()
84  sys.stderr = StringIO()
85 
86  import rospy
87  rospy.loginfo("test 1")
88  self.assert_("test 1" in sys.stdout.getvalue())
89 
90  rospy.logwarn("test 2")
91  self.assert_("[WARN]" in sys.stderr.getvalue())
92  self.assert_("test 2" in sys.stderr.getvalue())
93 
94  sys.stderr = StringIO()
95  rospy.logerr("test 3")
96  self.assert_("[ERROR]" in sys.stderr.getvalue())
97  self.assert_("test 3" in sys.stderr.getvalue())
98 
99  sys.stderr = StringIO()
100  rospy.logfatal("test 4")
101  self.assert_("[FATAL]" in sys.stderr.getvalue())
102  self.assert_("test 4" in sys.stderr.getvalue())
103 
104  # logXXX_throttle
105  for i in range(3):
106  sys.stdout = StringIO()
107  rospy.loginfo_throttle(3, "test 1")
108  if i == 0:
109  self.assert_("test 1" in sys.stdout.getvalue())
110  rospy.sleep(rospy.Duration(1))
111  elif i == 1:
112  self.assert_("" == sys.stdout.getvalue())
113  rospy.sleep(rospy.Duration(2))
114  else:
115  self.assert_("test 1" in sys.stdout.getvalue())
116 
117  for i in range(3):
118  sys.stderr = StringIO()
119  rospy.logwarn_throttle(3, "test 2")
120  if i == 0:
121  self.assert_("test 2" in sys.stderr.getvalue())
122  rospy.sleep(rospy.Duration(1))
123  elif i == 1:
124  self.assert_("" == sys.stderr.getvalue())
125  rospy.sleep(rospy.Duration(2))
126  else:
127  self.assert_("test 2" in sys.stderr.getvalue())
128 
129  for i in range(3):
130  sys.stderr = StringIO()
131  rospy.logerr_throttle(3, "test 3")
132  if i == 0:
133  self.assert_("test 3" in sys.stderr.getvalue())
134  rospy.sleep(rospy.Duration(1))
135  elif i == 1:
136  self.assert_("" == sys.stderr.getvalue())
137  rospy.sleep(rospy.Duration(2))
138  else:
139  self.assert_("test 3" in sys.stderr.getvalue())
140 
141  for i in range(3):
142  sys.stderr = StringIO()
143  rospy.logfatal_throttle(3, "test 4")
144  if i == 0:
145  self.assert_("test 4" in sys.stderr.getvalue())
146  rospy.sleep(rospy.Duration(1))
147  elif i == 1:
148  self.assert_("" == sys.stderr.getvalue())
149  rospy.sleep(rospy.Duration(2))
150  else:
151  self.assert_("test 4" in sys.stderr.getvalue())
152  finally:
153  sys.stdout = real_stdout
154  sys.stderr = real_stderr
155 
157  # lazy-import for coverage
158  import rospy
159  import time
160 
161  # test wait for service in success case
162  def task1():
163  rospy.wait_for_service('add_two_ints')
164  timeout_t = time.time() + 5.
165  t1 = TestTask(task1)
166  t1.start()
167  while not t1.done and time.time() < timeout_t:
168  time.sleep(0.5)
169  self.assert_(t1.success)
170 
171  # test wait for service with timeout in success case
172  def task2():
173  rospy.wait_for_service('add_two_ints', timeout=1.)
174  timeout_t = time.time() + 5.
175  t2 = TestTask(task2)
176  t2.start()
177  while not t2.done and time.time() < timeout_t:
178  time.sleep(0.5)
179  self.assert_(t2.success)
180 
181  # test wait for service in failure case
182  def task3():
183  # #2842 raising bounds from .1 to .3 for amazon VM
184  rospy.wait_for_service('fake_service', timeout=0.3)
185  timeout_t = time.time() + 2.
186  t3 = TestTask(task3)
187  t3.start()
188  while not t3.done and time.time() < timeout_t:
189  time.sleep(0.5)
190  self.assert_(t3.done)
191  self.failIf(t3.success)
192 
194  """
195  Test ServiceProxy.wait_for_service
196  """
197  # lazy-import for coverage
198  import rospy
199  import time
200  import test_rosmaster.srv
201 
202  # test wait for service in success case
203  proxy = rospy.ServiceProxy('add_two_ints', test_rosmaster.srv.AddTwoInts)
204  class ProxyTask(object):
205  def __init__(self, proxy, timeout=None):
206  self.proxy = proxy
207  self.timeout = timeout
208  def __call__(self):
209  if self.timeout is None:
210  self.proxy.wait_for_service()
211  else:
212  self.proxy.wait_for_service(timeout=self.timeout)
213  timeout_t = time.time() + 5.
214  t1 = TestTask(ProxyTask(proxy))
215  t1.start()
216  while not t1.done and time.time() < timeout_t:
217  time.sleep(0.5)
218  self.assert_(t1.success)
219 
220  # test wait for service with timeout in success case
221  timeout_t = time.time() + 5.
222  t2 = TestTask(ProxyTask(proxy, timeout=1.))
223  t2.start()
224  while not t2.done and time.time() < timeout_t:
225  time.sleep(0.5)
226  self.assert_(t2.success)
227 
228  # test wait for service in failure case
229  fake_proxy = rospy.ServiceProxy('fake_service', test_rosmaster.srv.AddTwoInts)
230  timeout_t = time.time() + 2.
231  t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))
232  t3.start()
233  while not t3.done and time.time() < timeout_t:
234  time.sleep(0.5)
235  self.assert_(t3.done)
236  self.failIf(t3.success)
237 
238  def test_sleep(self):
239  import rospy
240  import time
241  t = time.time()
242  rospy.sleep(0.1)
243  dur = time.time() - t
244  # #2842 raising bounds from .01 to .03 for amazon VM
245 
246  # make sure sleep is approximately right
247  self.assert_(abs(dur - 0.1) < 0.03, dur)
248 
249  t = time.time()
250  rospy.sleep(rospy.Duration.from_sec(0.1))
251  dur = time.time() - t
252  # make sure sleep is approximately right
253  self.assert_(abs(dur - 0.1) < 0.03, dur)
254 
255  # sleep for neg duration
256  t = time.time()
257  rospy.sleep(rospy.Duration.from_sec(-10.))
258  dur = time.time() - t
259  # make sure returned immediately
260  self.assert_(abs(dur) < 0.1, dur)
261 
262  def test_Rate(self):
263  import rospy
264  import time
265  t = time.time()
266  count = 0
267  r = rospy.Rate(10)
268  for x in range(10):
269  r.sleep()
270  dur = time.time() - t
271  # make sure sleep is approximately right
272  self.assert_(abs(dur - 1.0) < 0.5, dur)
273 
274 
275  def test_param_server(self):
276  # this isn't a parameter server test, just checking that the rospy bindings work
277  import rospy
278 
279  try:
280  rospy.get_param('not_a_param')
281  self.fail("should have raised KeyError")
282  except KeyError: pass
283  self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') )
284 
285  p = rospy.get_param('/param')
286  self.assertEquals("value", p)
287  p = rospy.get_param('param')
288  self.assertEquals("value", p)
289  p = rospy.get_param('/group/param')
290  self.assertEquals("group_value", p)
291  p = rospy.get_param('group/param')
292  self.assertEquals("group_value", p)
293 
294  self.assertEquals('/param', rospy.search_param('param'))
295 
296  names = rospy.get_param_names()
297  self.assert_('/param' in names)
298  self.assert_('/group/param' in names)
299 
300  for p in ['/param', 'param', 'group/param', '/group/param']:
301  self.assert_(rospy.has_param(p))
302 
303  rospy.set_param('param2', 'value2')
304  self.assert_(rospy.has_param('param2'))
305  self.assertEquals('value2', rospy.get_param('param2'))
306  rospy.delete_param('param2')
307  self.failIf(rospy.has_param('param2'))
308  try:
309  rospy.get_param('param2')
310  self.fail("should have raised KeyError")
311  except KeyError: pass
312 
314  # lazy-import for coverage
315  import rospy
316  import std_msgs.msg
317  import time
318 
319  # test standard wait for message
320  def task1():
321  return rospy.wait_for_message('chatter', std_msgs.msg.String)
322  timeout_t = time.time() + 5.
323  t1 = TestTask(task1)
324  t1.start()
325  while not t1.done and time.time() < timeout_t:
326  time.sleep(0.5)
327  self.assert_(t1.success)
328  self.assert_('hello' in t1.value.data)
329 
330  # test wait for message with timeout
331  def task2():
332  return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
333  timeout_t = time.time() + 5.
334  t2 = TestTask(task2)
335  t2.start()
336  while not t2.done and time.time() < timeout_t:
337  time.sleep(0.5)
338  self.assert_(t2.success)
339  self.assert_('hello' in t2.value.data)
340 
341  # test wait for message with timeout FAILURE
342  def task3():
343  # #2842 raising bounds from .1 to .3 for amazon VM
344  return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
345  timeout_t = time.time() + 2.
346  t3 = TestTask(task3)
347  t3.start()
348  while not t3.done and time.time() < timeout_t:
349  time.sleep(0.5)
350  self.failIf(t3.success)
351  self.assert_(t3.done)
352  self.assert_(t3.value is None)
353 
354 if __name__ == '__main__':
355  rosunit.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56