$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rospy_core.py 4218 2009-04-16 21:09:31Z sfkwc $ 00035 00036 import roslib; roslib.load_manifest('test_rospy') 00037 00038 import os 00039 import sys 00040 import struct 00041 import unittest 00042 import time 00043 import random 00044 00045 import rostest 00046 00047 from threading import Thread 00048 class TestTask(Thread): 00049 def __init__(self, task): 00050 Thread.__init__(self) 00051 self.task = task 00052 self.success = False 00053 self.done = False 00054 self.value = None 00055 00056 def run(self): 00057 try: 00058 print "STARTING TASK" 00059 self.value = self.task() 00060 print "TASK HAS COMPLETED" 00061 self.success = True 00062 except: 00063 import traceback 00064 traceback.print_exc() 00065 self.done = True 00066 00067 class TestRospyClientOnline(unittest.TestCase): 00068 00069 def __init__(self, *args): 00070 unittest.TestCase.__init__(self, *args) 00071 import rospy 00072 rospy.init_node('test_rospy_online') 00073 00074 def test_log(self): 00075 # we can't do anything fancy here like capture stdout as rospy 00076 # grabs the streams before we do. Instead, just make sure the 00077 # routines don't crash. 00078 00079 real_stdout = sys.stdout 00080 real_stderr = sys.stderr 00081 00082 try: 00083 from cStringIO import StringIO 00084 sys.stdout = StringIO() 00085 sys.stderr = StringIO() 00086 00087 import rospy 00088 rospy.loginfo("test 1") 00089 self.assert_("test 1" in sys.stdout.getvalue()) 00090 00091 rospy.logwarn("test 2") 00092 self.assert_("[WARN]" in sys.stderr.getvalue()) 00093 self.assert_("test 2" in sys.stderr.getvalue()) 00094 00095 sys.stderr = StringIO() 00096 rospy.logerr("test 3") 00097 self.assert_("[ERROR]" in sys.stderr.getvalue()) 00098 self.assert_("test 3" in sys.stderr.getvalue()) 00099 00100 sys.stderr = StringIO() 00101 rospy.logfatal("test 4") 00102 self.assert_("[FATAL]" in sys.stderr.getvalue()) 00103 self.assert_("test 4" in sys.stderr.getvalue()) 00104 finally: 00105 sys.stdout = real_stdout 00106 sys.stderr = real_stderr 00107 00108 def test_wait_for_service(self): 00109 # lazy-import for coverage 00110 import rospy 00111 import time 00112 00113 # test wait for service in success case 00114 def task1(): 00115 rospy.wait_for_service('add_two_ints') 00116 timeout_t = time.time() + 5. 00117 t1 = TestTask(task1) 00118 t1.start() 00119 while not t1.done and time.time() < timeout_t: 00120 time.sleep(0.5) 00121 self.assert_(t1.success) 00122 00123 # test wait for service with timeout in success case 00124 def task2(): 00125 rospy.wait_for_service('add_two_ints', timeout=1.) 00126 timeout_t = time.time() + 5. 00127 t2 = TestTask(task2) 00128 t2.start() 00129 while not t2.done and time.time() < timeout_t: 00130 time.sleep(0.5) 00131 self.assert_(t2.success) 00132 00133 # test wait for service in failure case 00134 def task3(): 00135 # #2842 raising bounds from .1 to .3 for amazon VM 00136 rospy.wait_for_service('fake_service', timeout=0.3) 00137 timeout_t = time.time() + 2. 00138 t3 = TestTask(task3) 00139 t3.start() 00140 while not t3.done and time.time() < timeout_t: 00141 time.sleep(0.5) 00142 self.assert_(t3.done) 00143 self.failIf(t3.success) 00144 00145 def test_ServiceProxy_wait_for_service(self): 00146 """ 00147 Test ServiceProxy.wait_for_service 00148 """ 00149 # lazy-import for coverage 00150 import rospy 00151 import time 00152 import test_ros.srv 00153 00154 # test wait for service in success case 00155 proxy = rospy.ServiceProxy('add_two_ints', test_ros.srv.AddTwoInts) 00156 class ProxyTask(object): 00157 def __init__(self, proxy, timeout=None): 00158 self.proxy = proxy 00159 self.timeout = timeout 00160 def __call__(self): 00161 if self.timeout is None: 00162 self.proxy.wait_for_service() 00163 else: 00164 self.proxy.wait_for_service(timeout=self.timeout) 00165 timeout_t = time.time() + 5. 00166 t1 = TestTask(ProxyTask(proxy)) 00167 t1.start() 00168 while not t1.done and time.time() < timeout_t: 00169 time.sleep(0.5) 00170 self.assert_(t1.success) 00171 00172 # test wait for service with timeout in success case 00173 timeout_t = time.time() + 5. 00174 t2 = TestTask(ProxyTask(proxy, timeout=1.)) 00175 t2.start() 00176 while not t2.done and time.time() < timeout_t: 00177 time.sleep(0.5) 00178 self.assert_(t2.success) 00179 00180 # test wait for service in failure case 00181 fake_proxy = rospy.ServiceProxy('fake_service', test_ros.srv.AddTwoInts) 00182 timeout_t = time.time() + 2. 00183 t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1)) 00184 t3.start() 00185 while not t3.done and time.time() < timeout_t: 00186 time.sleep(0.5) 00187 self.assert_(t3.done) 00188 self.failIf(t3.success) 00189 00190 def test_sleep(self): 00191 import rospy 00192 import time 00193 t = time.time() 00194 rospy.sleep(0.1) 00195 dur = time.time() - t 00196 # #2842 raising bounds from .01 to .03 for amazon VM 00197 00198 # make sure sleep is approximately right 00199 self.assert_(abs(dur - 0.1) < 0.03, dur) 00200 00201 t = time.time() 00202 rospy.sleep(rospy.Duration.from_sec(0.1)) 00203 dur = time.time() - t 00204 # make sure sleep is approximately right 00205 self.assert_(abs(dur - 0.1) < 0.03, dur) 00206 00207 # sleep for neg duration 00208 t = time.time() 00209 rospy.sleep(rospy.Duration.from_sec(-10.)) 00210 dur = time.time() - t 00211 # make sure returned immediately 00212 self.assert_(abs(dur) < 0.1, dur) 00213 00214 def test_Rate(self): 00215 import rospy 00216 import time 00217 t = time.time() 00218 count = 0 00219 r = rospy.Rate(10) 00220 for x in xrange(10): 00221 r.sleep() 00222 dur = time.time() - t 00223 # make sure sleep is approximately right 00224 self.assert_(abs(dur - 1.0) < 0.5, dur) 00225 00226 00227 def test_param_server(self): 00228 # this isn't a parameter server test, just checking that the rospy bindings work 00229 import rospy 00230 00231 try: 00232 rospy.get_param('not_a_param') 00233 self.fail("should have raised KeyError") 00234 except KeyError: pass 00235 self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') ) 00236 00237 p = rospy.get_param('/param') 00238 self.assertEquals("value", p) 00239 p = rospy.get_param('param') 00240 self.assertEquals("value", p) 00241 p = rospy.get_param('/group/param') 00242 self.assertEquals("group_value", p) 00243 p = rospy.get_param('group/param') 00244 self.assertEquals("group_value", p) 00245 00246 self.assertEquals('/param', rospy.search_param('param')) 00247 00248 names = rospy.get_param_names() 00249 self.assert_('/param' in names) 00250 self.assert_('/group/param' in names) 00251 00252 for p in ['/param', 'param', 'group/param', '/group/param']: 00253 self.assert_(rospy.has_param(p)) 00254 00255 rospy.set_param('param2', 'value2') 00256 self.assert_(rospy.has_param('param2')) 00257 self.assertEquals('value2', rospy.get_param('param2')) 00258 rospy.delete_param('param2') 00259 self.failIf(rospy.has_param('param2')) 00260 try: 00261 rospy.get_param('param2') 00262 self.fail("should have raised KeyError") 00263 except KeyError: pass 00264 00265 def test_wait_for_message(self): 00266 # lazy-import for coverage 00267 import rospy 00268 import std_msgs.msg 00269 import time 00270 00271 # test standard wait for message 00272 def task1(): 00273 return rospy.wait_for_message('chatter', std_msgs.msg.String) 00274 timeout_t = time.time() + 5. 00275 t1 = TestTask(task1) 00276 t1.start() 00277 while not t1.done and time.time() < timeout_t: 00278 time.sleep(0.5) 00279 self.assert_(t1.success) 00280 self.assert_('hello' in t1.value.data) 00281 00282 # test wait for message with timeout 00283 def task2(): 00284 return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.) 00285 timeout_t = time.time() + 5. 00286 t2 = TestTask(task2) 00287 t2.start() 00288 while not t2.done and time.time() < timeout_t: 00289 time.sleep(0.5) 00290 self.assert_(t2.success) 00291 self.assert_('hello' in t2.value.data) 00292 00293 # test wait for message with timeout FAILURE 00294 def task3(): 00295 # #2842 raising bounds from .1 to .3 for amazon VM 00296 return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3) 00297 timeout_t = time.time() + 2. 00298 t3 = TestTask(task3) 00299 t3.start() 00300 while not t3.done and time.time() < timeout_t: 00301 time.sleep(0.5) 00302 self.failIf(t3.success) 00303 self.assert_(t3.done) 00304 self.assert_(t3.value is None) 00305 00306 if __name__ == '__main__': 00307 rostest.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])