00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 import time
00043 import random
00044
00045 import rostest
00046
00047 from threading import Thread
00048 class TestTask(Thread):
00049 def __init__(self, task):
00050 Thread.__init__(self)
00051 self.task = task
00052 self.success = False
00053 self.done = False
00054 self.value = None
00055
00056 def run(self):
00057 try:
00058 print "STARTING TASK"
00059 self.value = self.task()
00060 print "TASK HAS COMPLETED"
00061 self.success = True
00062 except:
00063 import traceback
00064 traceback.print_exc()
00065 self.done = True
00066
00067 class TestRospyClientOnline(unittest.TestCase):
00068
00069 def __init__(self, *args):
00070 unittest.TestCase.__init__(self, *args)
00071 import rospy
00072 rospy.init_node('test_rospy_online')
00073
00074 def test_log(self):
00075
00076
00077
00078
00079 real_stdout = sys.stdout
00080 real_stderr = sys.stderr
00081
00082 try:
00083 from cStringIO import StringIO
00084 sys.stdout = StringIO()
00085 sys.stderr = StringIO()
00086
00087 import rospy
00088 rospy.loginfo("test 1")
00089 self.assert_("test 1" in sys.stdout.getvalue())
00090
00091 rospy.logwarn("test 2")
00092 self.assert_("[WARN]" in sys.stderr.getvalue())
00093 self.assert_("test 2" in sys.stderr.getvalue())
00094
00095 sys.stderr = StringIO()
00096 rospy.logerr("test 3")
00097 self.assert_("[ERROR]" in sys.stderr.getvalue())
00098 self.assert_("test 3" in sys.stderr.getvalue())
00099
00100 sys.stderr = StringIO()
00101 rospy.logfatal("test 4")
00102 self.assert_("[FATAL]" in sys.stderr.getvalue())
00103 self.assert_("test 4" in sys.stderr.getvalue())
00104 finally:
00105 sys.stdout = real_stdout
00106 sys.stderr = real_stderr
00107
00108 def test_wait_for_service(self):
00109
00110 import rospy
00111 import time
00112
00113
00114 def task1():
00115 rospy.wait_for_service('add_two_ints')
00116 timeout_t = time.time() + 5.
00117 t1 = TestTask(task1)
00118 t1.start()
00119 while not t1.done and time.time() < timeout_t:
00120 time.sleep(0.5)
00121 self.assert_(t1.success)
00122
00123
00124 def task2():
00125 rospy.wait_for_service('add_two_ints', timeout=1.)
00126 timeout_t = time.time() + 5.
00127 t2 = TestTask(task2)
00128 t2.start()
00129 while not t2.done and time.time() < timeout_t:
00130 time.sleep(0.5)
00131 self.assert_(t2.success)
00132
00133
00134 def task3():
00135
00136 rospy.wait_for_service('fake_service', timeout=0.3)
00137 timeout_t = time.time() + 2.
00138 t3 = TestTask(task3)
00139 t3.start()
00140 while not t3.done and time.time() < timeout_t:
00141 time.sleep(0.5)
00142 self.assert_(t3.done)
00143 self.failIf(t3.success)
00144
00145 def test_ServiceProxy_wait_for_service(self):
00146 """
00147 Test ServiceProxy.wait_for_service
00148 """
00149
00150 import rospy
00151 import time
00152 import test_ros.srv
00153
00154
00155 proxy = rospy.ServiceProxy('add_two_ints', test_ros.srv.AddTwoInts)
00156 class ProxyTask(object):
00157 def __init__(self, proxy, timeout=None):
00158 self.proxy = proxy
00159 self.timeout = timeout
00160 def __call__(self):
00161 if self.timeout is None:
00162 self.proxy.wait_for_service()
00163 else:
00164 self.proxy.wait_for_service(timeout=self.timeout)
00165 timeout_t = time.time() + 5.
00166 t1 = TestTask(ProxyTask(proxy))
00167 t1.start()
00168 while not t1.done and time.time() < timeout_t:
00169 time.sleep(0.5)
00170 self.assert_(t1.success)
00171
00172
00173 timeout_t = time.time() + 5.
00174 t2 = TestTask(ProxyTask(proxy, timeout=1.))
00175 t2.start()
00176 while not t2.done and time.time() < timeout_t:
00177 time.sleep(0.5)
00178 self.assert_(t2.success)
00179
00180
00181 fake_proxy = rospy.ServiceProxy('fake_service', test_ros.srv.AddTwoInts)
00182 timeout_t = time.time() + 2.
00183 t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))
00184 t3.start()
00185 while not t3.done and time.time() < timeout_t:
00186 time.sleep(0.5)
00187 self.assert_(t3.done)
00188 self.failIf(t3.success)
00189
00190 def test_sleep(self):
00191 import rospy
00192 import time
00193 t = time.time()
00194 rospy.sleep(0.1)
00195 dur = time.time() - t
00196
00197
00198
00199 self.assert_(abs(dur - 0.1) < 0.03, dur)
00200
00201 t = time.time()
00202 rospy.sleep(rospy.Duration.from_sec(0.1))
00203 dur = time.time() - t
00204
00205 self.assert_(abs(dur - 0.1) < 0.03, dur)
00206
00207
00208 t = time.time()
00209 rospy.sleep(rospy.Duration.from_sec(-10.))
00210 dur = time.time() - t
00211
00212 self.assert_(abs(dur) < 0.1, dur)
00213
00214 def test_Rate(self):
00215 import rospy
00216 import time
00217 t = time.time()
00218 count = 0
00219 r = rospy.Rate(10)
00220 for x in xrange(10):
00221 r.sleep()
00222 dur = time.time() - t
00223
00224 self.assert_(abs(dur - 1.0) < 0.5, dur)
00225
00226
00227 def test_param_server(self):
00228
00229 import rospy
00230
00231 try:
00232 rospy.get_param('not_a_param')
00233 self.fail("should have raised KeyError")
00234 except KeyError: pass
00235 self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') )
00236
00237 p = rospy.get_param('/param')
00238 self.assertEquals("value", p)
00239 p = rospy.get_param('param')
00240 self.assertEquals("value", p)
00241 p = rospy.get_param('/group/param')
00242 self.assertEquals("group_value", p)
00243 p = rospy.get_param('group/param')
00244 self.assertEquals("group_value", p)
00245
00246 self.assertEquals('/param', rospy.search_param('param'))
00247
00248 names = rospy.get_param_names()
00249 self.assert_('/param' in names)
00250 self.assert_('/group/param' in names)
00251
00252 for p in ['/param', 'param', 'group/param', '/group/param']:
00253 self.assert_(rospy.has_param(p))
00254
00255 rospy.set_param('param2', 'value2')
00256 self.assert_(rospy.has_param('param2'))
00257 self.assertEquals('value2', rospy.get_param('param2'))
00258 rospy.delete_param('param2')
00259 self.failIf(rospy.has_param('param2'))
00260 try:
00261 rospy.get_param('param2')
00262 self.fail("should have raised KeyError")
00263 except KeyError: pass
00264
00265 def test_wait_for_message(self):
00266
00267 import rospy
00268 import std_msgs.msg
00269 import time
00270
00271
00272 def task1():
00273 return rospy.wait_for_message('chatter', std_msgs.msg.String)
00274 timeout_t = time.time() + 5.
00275 t1 = TestTask(task1)
00276 t1.start()
00277 while not t1.done and time.time() < timeout_t:
00278 time.sleep(0.5)
00279 self.assert_(t1.success)
00280 self.assert_('hello' in t1.value.data)
00281
00282
00283 def task2():
00284 return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
00285 timeout_t = time.time() + 5.
00286 t2 = TestTask(task2)
00287 t2.start()
00288 while not t2.done and time.time() < timeout_t:
00289 time.sleep(0.5)
00290 self.assert_(t2.success)
00291 self.assert_('hello' in t2.value.data)
00292
00293
00294 def task3():
00295
00296 return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
00297 timeout_t = time.time() + 2.
00298 t3 = TestTask(task3)
00299 t3.start()
00300 while not t3.done and time.time() < timeout_t:
00301 time.sleep(0.5)
00302 self.failIf(t3.success)
00303 self.assert_(t3.done)
00304 self.assert_(t3.value is None)
00305
00306 if __name__ == '__main__':
00307 rostest.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])