Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040
00041 import rosunit
00042
00043 from threading import Thread
00044 class TestTask(Thread):
00045 def __init__(self, task):
00046 Thread.__init__(self)
00047 self.task = task
00048 self.success = False
00049 self.done = False
00050 self.value = None
00051
00052 def run(self):
00053 try:
00054 print "STARTING TASK"
00055 self.value = self.task()
00056 print "TASK HAS COMPLETED"
00057 self.success = True
00058 except:
00059 import traceback
00060 traceback.print_exc()
00061 self.done = True
00062
00063 class TestRospyClientOnline(unittest.TestCase):
00064
00065 def __init__(self, *args):
00066 unittest.TestCase.__init__(self, *args)
00067 import rospy
00068 rospy.init_node('test_rospy_online')
00069
00070 def test_log(self):
00071
00072
00073
00074
00075 real_stdout = sys.stdout
00076 real_stderr = sys.stderr
00077
00078 try:
00079 from cStringIO import StringIO
00080 sys.stdout = StringIO()
00081 sys.stderr = StringIO()
00082
00083 import rospy
00084 rospy.loginfo("test 1")
00085 self.assert_("test 1" in sys.stdout.getvalue())
00086
00087 rospy.logwarn("test 2")
00088 self.assert_("[WARN]" in sys.stderr.getvalue())
00089 self.assert_("test 2" in sys.stderr.getvalue())
00090
00091 sys.stderr = StringIO()
00092 rospy.logerr("test 3")
00093 self.assert_("[ERROR]" in sys.stderr.getvalue())
00094 self.assert_("test 3" in sys.stderr.getvalue())
00095
00096 sys.stderr = StringIO()
00097 rospy.logfatal("test 4")
00098 self.assert_("[FATAL]" in sys.stderr.getvalue())
00099 self.assert_("test 4" in sys.stderr.getvalue())
00100 finally:
00101 sys.stdout = real_stdout
00102 sys.stderr = real_stderr
00103
00104 def test_wait_for_service(self):
00105
00106 import rospy
00107 import time
00108
00109
00110 def task1():
00111 rospy.wait_for_service('add_two_ints')
00112 timeout_t = time.time() + 5.
00113 t1 = TestTask(task1)
00114 t1.start()
00115 while not t1.done and time.time() < timeout_t:
00116 time.sleep(0.5)
00117 self.assert_(t1.success)
00118
00119
00120 def task2():
00121 rospy.wait_for_service('add_two_ints', timeout=1.)
00122 timeout_t = time.time() + 5.
00123 t2 = TestTask(task2)
00124 t2.start()
00125 while not t2.done and time.time() < timeout_t:
00126 time.sleep(0.5)
00127 self.assert_(t2.success)
00128
00129
00130 def task3():
00131
00132 rospy.wait_for_service('fake_service', timeout=0.3)
00133 timeout_t = time.time() + 2.
00134 t3 = TestTask(task3)
00135 t3.start()
00136 while not t3.done and time.time() < timeout_t:
00137 time.sleep(0.5)
00138 self.assert_(t3.done)
00139 self.failIf(t3.success)
00140
00141 def test_ServiceProxy_wait_for_service(self):
00142 """
00143 Test ServiceProxy.wait_for_service
00144 """
00145
00146 import rospy
00147 import time
00148 import test_ros.srv
00149
00150
00151 proxy = rospy.ServiceProxy('add_two_ints', test_ros.srv.AddTwoInts)
00152 class ProxyTask(object):
00153 def __init__(self, proxy, timeout=None):
00154 self.proxy = proxy
00155 self.timeout = timeout
00156 def __call__(self):
00157 if self.timeout is None:
00158 self.proxy.wait_for_service()
00159 else:
00160 self.proxy.wait_for_service(timeout=self.timeout)
00161 timeout_t = time.time() + 5.
00162 t1 = TestTask(ProxyTask(proxy))
00163 t1.start()
00164 while not t1.done and time.time() < timeout_t:
00165 time.sleep(0.5)
00166 self.assert_(t1.success)
00167
00168
00169 timeout_t = time.time() + 5.
00170 t2 = TestTask(ProxyTask(proxy, timeout=1.))
00171 t2.start()
00172 while not t2.done and time.time() < timeout_t:
00173 time.sleep(0.5)
00174 self.assert_(t2.success)
00175
00176
00177 fake_proxy = rospy.ServiceProxy('fake_service', test_ros.srv.AddTwoInts)
00178 timeout_t = time.time() + 2.
00179 t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))
00180 t3.start()
00181 while not t3.done and time.time() < timeout_t:
00182 time.sleep(0.5)
00183 self.assert_(t3.done)
00184 self.failIf(t3.success)
00185
00186 def test_sleep(self):
00187 import rospy
00188 import time
00189 t = time.time()
00190 rospy.sleep(0.1)
00191 dur = time.time() - t
00192
00193
00194
00195 self.assert_(abs(dur - 0.1) < 0.03, dur)
00196
00197 t = time.time()
00198 rospy.sleep(rospy.Duration.from_sec(0.1))
00199 dur = time.time() - t
00200
00201 self.assert_(abs(dur - 0.1) < 0.03, dur)
00202
00203
00204 t = time.time()
00205 rospy.sleep(rospy.Duration.from_sec(-10.))
00206 dur = time.time() - t
00207
00208 self.assert_(abs(dur) < 0.1, dur)
00209
00210 def test_Rate(self):
00211 import rospy
00212 import time
00213 t = time.time()
00214 count = 0
00215 r = rospy.Rate(10)
00216 for x in xrange(10):
00217 r.sleep()
00218 dur = time.time() - t
00219
00220 self.assert_(abs(dur - 1.0) < 0.5, dur)
00221
00222
00223 def test_param_server(self):
00224
00225 import rospy
00226
00227 try:
00228 rospy.get_param('not_a_param')
00229 self.fail("should have raised KeyError")
00230 except KeyError: pass
00231 self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') )
00232
00233 p = rospy.get_param('/param')
00234 self.assertEquals("value", p)
00235 p = rospy.get_param('param')
00236 self.assertEquals("value", p)
00237 p = rospy.get_param('/group/param')
00238 self.assertEquals("group_value", p)
00239 p = rospy.get_param('group/param')
00240 self.assertEquals("group_value", p)
00241
00242 self.assertEquals('/param', rospy.search_param('param'))
00243
00244 names = rospy.get_param_names()
00245 self.assert_('/param' in names)
00246 self.assert_('/group/param' in names)
00247
00248 for p in ['/param', 'param', 'group/param', '/group/param']:
00249 self.assert_(rospy.has_param(p))
00250
00251 rospy.set_param('param2', 'value2')
00252 self.assert_(rospy.has_param('param2'))
00253 self.assertEquals('value2', rospy.get_param('param2'))
00254 rospy.delete_param('param2')
00255 self.failIf(rospy.has_param('param2'))
00256 try:
00257 rospy.get_param('param2')
00258 self.fail("should have raised KeyError")
00259 except KeyError: pass
00260
00261 def test_wait_for_message(self):
00262
00263 import rospy
00264 import std_msgs.msg
00265 import time
00266
00267
00268 def task1():
00269 return rospy.wait_for_message('chatter', std_msgs.msg.String)
00270 timeout_t = time.time() + 5.
00271 t1 = TestTask(task1)
00272 t1.start()
00273 while not t1.done and time.time() < timeout_t:
00274 time.sleep(0.5)
00275 self.assert_(t1.success)
00276 self.assert_('hello' in t1.value.data)
00277
00278
00279 def task2():
00280 return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
00281 timeout_t = time.time() + 5.
00282 t2 = TestTask(task2)
00283 t2.start()
00284 while not t2.done and time.time() < timeout_t:
00285 time.sleep(0.5)
00286 self.assert_(t2.success)
00287 self.assert_('hello' in t2.value.data)
00288
00289
00290 def task3():
00291
00292 return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
00293 timeout_t = time.time() + 2.
00294 t3 = TestTask(task3)
00295 t3.start()
00296 while not t3.done and time.time() < timeout_t:
00297 time.sleep(0.5)
00298 self.failIf(t3.success)
00299 self.assert_(t3.done)
00300 self.assert_(t3.value is None)
00301
00302 if __name__ == '__main__':
00303 rosunit.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])