Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040
00041 import rosunit
00042
00043 from threading import Thread
00044 class TestTask(Thread):
00045 def __init__(self, task):
00046 Thread.__init__(self)
00047 self.task = task
00048 self.success = False
00049 self.done = False
00050 self.value = None
00051
00052 def run(self):
00053 try:
00054 print("STARTING TASK")
00055 self.value = self.task()
00056 print("TASK HAS COMPLETED")
00057 self.success = True
00058 except:
00059 import traceback
00060 traceback.print_exc()
00061 self.done = True
00062
00063 class TestRospyClientOnline(unittest.TestCase):
00064
00065 def __init__(self, *args):
00066 unittest.TestCase.__init__(self, *args)
00067 import rospy
00068 rospy.init_node('test_rospy_online')
00069
00070 def test_log(self):
00071
00072
00073
00074
00075 real_stdout = sys.stdout
00076 real_stderr = sys.stderr
00077
00078 try:
00079 try:
00080 from cStringIO import StringIO
00081 except ImportError:
00082 from io import StringIO
00083 sys.stdout = StringIO()
00084 sys.stderr = StringIO()
00085
00086 import rospy
00087 rospy.loginfo("test 1")
00088 self.assert_("test 1" in sys.stdout.getvalue())
00089
00090 rospy.logwarn("test 2")
00091 self.assert_("[WARN]" in sys.stderr.getvalue())
00092 self.assert_("test 2" in sys.stderr.getvalue())
00093
00094 sys.stderr = StringIO()
00095 rospy.logerr("test 3")
00096 self.assert_("[ERROR]" in sys.stderr.getvalue())
00097 self.assert_("test 3" in sys.stderr.getvalue())
00098
00099 sys.stderr = StringIO()
00100 rospy.logfatal("test 4")
00101 self.assert_("[FATAL]" in sys.stderr.getvalue())
00102 self.assert_("test 4" in sys.stderr.getvalue())
00103
00104
00105 for i in range(3):
00106 sys.stdout = StringIO()
00107 rospy.loginfo_throttle(3, "test 1")
00108 if i == 0:
00109 self.assert_("test 1" in sys.stdout.getvalue())
00110 rospy.sleep(rospy.Duration(1))
00111 elif i == 1:
00112 self.assert_("" == sys.stdout.getvalue())
00113 rospy.sleep(rospy.Duration(2))
00114 else:
00115 self.assert_("test 1" in sys.stdout.getvalue())
00116
00117 for i in range(3):
00118 sys.stderr = StringIO()
00119 rospy.logwarn_throttle(3, "test 2")
00120 if i == 0:
00121 self.assert_("test 2" in sys.stderr.getvalue())
00122 rospy.sleep(rospy.Duration(1))
00123 elif i == 1:
00124 self.assert_("" == sys.stderr.getvalue())
00125 rospy.sleep(rospy.Duration(2))
00126 else:
00127 self.assert_("test 2" in sys.stderr.getvalue())
00128
00129 for i in range(3):
00130 sys.stderr = StringIO()
00131 rospy.logerr_throttle(3, "test 3")
00132 if i == 0:
00133 self.assert_("test 3" in sys.stderr.getvalue())
00134 rospy.sleep(rospy.Duration(1))
00135 elif i == 1:
00136 self.assert_("" == sys.stderr.getvalue())
00137 rospy.sleep(rospy.Duration(2))
00138 else:
00139 self.assert_("test 3" in sys.stderr.getvalue())
00140
00141 for i in range(3):
00142 sys.stderr = StringIO()
00143 rospy.logfatal_throttle(3, "test 4")
00144 if i == 0:
00145 self.assert_("test 4" in sys.stderr.getvalue())
00146 rospy.sleep(rospy.Duration(1))
00147 elif i == 1:
00148 self.assert_("" == sys.stderr.getvalue())
00149 rospy.sleep(rospy.Duration(2))
00150 else:
00151 self.assert_("test 4" in sys.stderr.getvalue())
00152 finally:
00153 sys.stdout = real_stdout
00154 sys.stderr = real_stderr
00155
00156 def test_wait_for_service(self):
00157
00158 import rospy
00159 import time
00160
00161
00162 def task1():
00163 rospy.wait_for_service('add_two_ints')
00164 timeout_t = time.time() + 5.
00165 t1 = TestTask(task1)
00166 t1.start()
00167 while not t1.done and time.time() < timeout_t:
00168 time.sleep(0.5)
00169 self.assert_(t1.success)
00170
00171
00172 def task2():
00173 rospy.wait_for_service('add_two_ints', timeout=1.)
00174 timeout_t = time.time() + 5.
00175 t2 = TestTask(task2)
00176 t2.start()
00177 while not t2.done and time.time() < timeout_t:
00178 time.sleep(0.5)
00179 self.assert_(t2.success)
00180
00181
00182 def task3():
00183
00184 rospy.wait_for_service('fake_service', timeout=0.3)
00185 timeout_t = time.time() + 2.
00186 t3 = TestTask(task3)
00187 t3.start()
00188 while not t3.done and time.time() < timeout_t:
00189 time.sleep(0.5)
00190 self.assert_(t3.done)
00191 self.failIf(t3.success)
00192
00193 def test_ServiceProxy_wait_for_service(self):
00194 """
00195 Test ServiceProxy.wait_for_service
00196 """
00197
00198 import rospy
00199 import time
00200 import test_rosmaster.srv
00201
00202
00203 proxy = rospy.ServiceProxy('add_two_ints', test_rosmaster.srv.AddTwoInts)
00204 class ProxyTask(object):
00205 def __init__(self, proxy, timeout=None):
00206 self.proxy = proxy
00207 self.timeout = timeout
00208 def __call__(self):
00209 if self.timeout is None:
00210 self.proxy.wait_for_service()
00211 else:
00212 self.proxy.wait_for_service(timeout=self.timeout)
00213 timeout_t = time.time() + 5.
00214 t1 = TestTask(ProxyTask(proxy))
00215 t1.start()
00216 while not t1.done and time.time() < timeout_t:
00217 time.sleep(0.5)
00218 self.assert_(t1.success)
00219
00220
00221 timeout_t = time.time() + 5.
00222 t2 = TestTask(ProxyTask(proxy, timeout=1.))
00223 t2.start()
00224 while not t2.done and time.time() < timeout_t:
00225 time.sleep(0.5)
00226 self.assert_(t2.success)
00227
00228
00229 fake_proxy = rospy.ServiceProxy('fake_service', test_rosmaster.srv.AddTwoInts)
00230 timeout_t = time.time() + 2.
00231 t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))
00232 t3.start()
00233 while not t3.done and time.time() < timeout_t:
00234 time.sleep(0.5)
00235 self.assert_(t3.done)
00236 self.failIf(t3.success)
00237
00238 def test_sleep(self):
00239 import rospy
00240 import time
00241 t = time.time()
00242 rospy.sleep(0.1)
00243 dur = time.time() - t
00244
00245
00246
00247 self.assert_(abs(dur - 0.1) < 0.03, dur)
00248
00249 t = time.time()
00250 rospy.sleep(rospy.Duration.from_sec(0.1))
00251 dur = time.time() - t
00252
00253 self.assert_(abs(dur - 0.1) < 0.03, dur)
00254
00255
00256 t = time.time()
00257 rospy.sleep(rospy.Duration.from_sec(-10.))
00258 dur = time.time() - t
00259
00260 self.assert_(abs(dur) < 0.1, dur)
00261
00262 def test_Rate(self):
00263 import rospy
00264 import time
00265 t = time.time()
00266 count = 0
00267 r = rospy.Rate(10)
00268 for x in range(10):
00269 r.sleep()
00270 dur = time.time() - t
00271
00272 self.assert_(abs(dur - 1.0) < 0.5, dur)
00273
00274
00275 def test_param_server(self):
00276
00277 import rospy
00278
00279 try:
00280 rospy.get_param('not_a_param')
00281 self.fail("should have raised KeyError")
00282 except KeyError: pass
00283 self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') )
00284
00285 p = rospy.get_param('/param')
00286 self.assertEquals("value", p)
00287 p = rospy.get_param('param')
00288 self.assertEquals("value", p)
00289 p = rospy.get_param('/group/param')
00290 self.assertEquals("group_value", p)
00291 p = rospy.get_param('group/param')
00292 self.assertEquals("group_value", p)
00293
00294 self.assertEquals('/param', rospy.search_param('param'))
00295
00296 names = rospy.get_param_names()
00297 self.assert_('/param' in names)
00298 self.assert_('/group/param' in names)
00299
00300 for p in ['/param', 'param', 'group/param', '/group/param']:
00301 self.assert_(rospy.has_param(p))
00302
00303 rospy.set_param('param2', 'value2')
00304 self.assert_(rospy.has_param('param2'))
00305 self.assertEquals('value2', rospy.get_param('param2'))
00306 rospy.delete_param('param2')
00307 self.failIf(rospy.has_param('param2'))
00308 try:
00309 rospy.get_param('param2')
00310 self.fail("should have raised KeyError")
00311 except KeyError: pass
00312
00313 def test_wait_for_message(self):
00314
00315 import rospy
00316 import std_msgs.msg
00317 import time
00318
00319
00320 def task1():
00321 return rospy.wait_for_message('chatter', std_msgs.msg.String)
00322 timeout_t = time.time() + 5.
00323 t1 = TestTask(task1)
00324 t1.start()
00325 while not t1.done and time.time() < timeout_t:
00326 time.sleep(0.5)
00327 self.assert_(t1.success)
00328 self.assert_('hello' in t1.value.data)
00329
00330
00331 def task2():
00332 return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
00333 timeout_t = time.time() + 5.
00334 t2 = TestTask(task2)
00335 t2.start()
00336 while not t2.done and time.time() < timeout_t:
00337 time.sleep(0.5)
00338 self.assert_(t2.success)
00339 self.assert_('hello' in t2.value.data)
00340
00341
00342 def task3():
00343
00344 return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
00345 timeout_t = time.time() + 2.
00346 t3 = TestTask(task3)
00347 t3.start()
00348 while not t3.done and time.time() < timeout_t:
00349 time.sleep(0.5)
00350 self.failIf(t3.success)
00351 self.assert_(t3.done)
00352 self.assert_(t3.value is None)
00353
00354 if __name__ == '__main__':
00355 rosunit.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])