test_rospy_client_online.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040 
00041 import rosunit
00042 
00043 from threading import Thread
00044 class TestTask(Thread):
00045     def __init__(self, task):
00046         Thread.__init__(self)
00047         self.task = task
00048         self.success = False
00049         self.done = False
00050         self.value = None
00051         
00052     def run(self):
00053         try:
00054             print("STARTING TASK")
00055             self.value = self.task()
00056             print("TASK HAS COMPLETED")
00057             self.success = True
00058         except:
00059             import traceback
00060             traceback.print_exc()
00061         self.done = True
00062 
00063 class TestRospyClientOnline(unittest.TestCase):
00064     
00065     def __init__(self, *args):
00066         unittest.TestCase.__init__(self, *args)
00067         import rospy
00068         rospy.init_node('test_rospy_online')
00069 
00070     def test_log(self):
00071         # we can't do anything fancy here like capture stdout as rospy
00072         # grabs the streams before we do. Instead, just make sure the
00073         # routines don't crash.
00074         
00075         real_stdout = sys.stdout
00076         real_stderr = sys.stderr
00077 
00078         try:
00079             try:
00080                 from cStringIO import StringIO
00081             except ImportError:
00082                 from io import StringIO
00083             sys.stdout = StringIO()
00084             sys.stderr = StringIO()
00085 
00086             import rospy
00087             rospy.loginfo("test 1")
00088             self.assert_("test 1" in sys.stdout.getvalue())
00089             
00090             rospy.logwarn("test 2")            
00091             self.assert_("[WARN]" in sys.stderr.getvalue())
00092             self.assert_("test 2" in sys.stderr.getvalue())
00093 
00094             sys.stderr = StringIO()
00095             rospy.logerr("test 3")            
00096             self.assert_("[ERROR]" in sys.stderr.getvalue())
00097             self.assert_("test 3" in sys.stderr.getvalue())
00098             
00099             sys.stderr = StringIO()
00100             rospy.logfatal("test 4")            
00101             self.assert_("[FATAL]" in sys.stderr.getvalue())            
00102             self.assert_("test 4" in sys.stderr.getvalue())            
00103 
00104             # logXXX_throttle
00105             for i in range(3):
00106                 sys.stdout = StringIO()
00107                 rospy.loginfo_throttle(3, "test 1")
00108                 if i == 0:
00109                     self.assert_("test 1" in sys.stdout.getvalue())
00110                     rospy.sleep(rospy.Duration(1))
00111                 elif i == 1:
00112                     self.assert_("" == sys.stdout.getvalue())
00113                     rospy.sleep(rospy.Duration(2))
00114                 else:
00115                     self.assert_("test 1" in sys.stdout.getvalue())
00116 
00117             for i in range(3):
00118                 sys.stderr = StringIO()
00119                 rospy.logwarn_throttle(3, "test 2")
00120                 if i == 0:
00121                     self.assert_("test 2" in sys.stderr.getvalue())
00122                     rospy.sleep(rospy.Duration(1))
00123                 elif i == 1:
00124                     self.assert_("" == sys.stderr.getvalue())
00125                     rospy.sleep(rospy.Duration(2))
00126                 else:
00127                     self.assert_("test 2" in sys.stderr.getvalue())
00128 
00129             for i in range(3):
00130                 sys.stderr = StringIO()
00131                 rospy.logerr_throttle(3, "test 3")
00132                 if i == 0:
00133                     self.assert_("test 3" in sys.stderr.getvalue())
00134                     rospy.sleep(rospy.Duration(1))
00135                 elif i == 1:
00136                     self.assert_("" == sys.stderr.getvalue())
00137                     rospy.sleep(rospy.Duration(2))
00138                 else:
00139                     self.assert_("test 3" in sys.stderr.getvalue())
00140 
00141             for i in range(3):
00142                 sys.stderr = StringIO()
00143                 rospy.logfatal_throttle(3, "test 4")
00144                 if i == 0:
00145                     self.assert_("test 4" in sys.stderr.getvalue())
00146                     rospy.sleep(rospy.Duration(1))
00147                 elif i == 1:
00148                     self.assert_("" == sys.stderr.getvalue())
00149                     rospy.sleep(rospy.Duration(2))
00150                 else:
00151                     self.assert_("test 4" in sys.stderr.getvalue())
00152         finally:
00153             sys.stdout = real_stdout
00154             sys.stderr = real_stderr
00155         
00156     def test_wait_for_service(self):
00157         # lazy-import for coverage
00158         import rospy
00159         import time
00160 
00161         # test wait for service in success case        
00162         def task1():
00163             rospy.wait_for_service('add_two_ints')
00164         timeout_t = time.time() + 5.
00165         t1 = TestTask(task1)
00166         t1.start()
00167         while not t1.done and time.time() < timeout_t:
00168             time.sleep(0.5)
00169         self.assert_(t1.success)
00170         
00171         # test wait for service with timeout in success case
00172         def task2():
00173             rospy.wait_for_service('add_two_ints', timeout=1.)
00174         timeout_t = time.time() + 5.        
00175         t2 = TestTask(task2)        
00176         t2.start()
00177         while not t2.done and time.time() < timeout_t:
00178             time.sleep(0.5)
00179         self.assert_(t2.success)
00180 
00181         # test wait for service in failure case
00182         def task3():
00183             # #2842 raising bounds from .1 to .3 for amazon VM            
00184             rospy.wait_for_service('fake_service', timeout=0.3)
00185         timeout_t = time.time() + 2.        
00186         t3 = TestTask(task3)        
00187         t3.start()
00188         while not t3.done and time.time() < timeout_t:
00189             time.sleep(0.5)
00190         self.assert_(t3.done)
00191         self.failIf(t3.success)
00192     
00193     def test_ServiceProxy_wait_for_service(self):
00194         """
00195         Test ServiceProxy.wait_for_service
00196         """
00197         # lazy-import for coverage
00198         import rospy
00199         import time
00200         import test_rosmaster.srv
00201 
00202         # test wait for service in success case        
00203         proxy = rospy.ServiceProxy('add_two_ints', test_rosmaster.srv.AddTwoInts)
00204         class ProxyTask(object):
00205             def __init__(self, proxy, timeout=None):
00206                 self.proxy = proxy
00207                 self.timeout = timeout
00208             def __call__(self):
00209                 if self.timeout is None:
00210                     self.proxy.wait_for_service()
00211                 else:
00212                     self.proxy.wait_for_service(timeout=self.timeout)
00213         timeout_t = time.time() + 5.
00214         t1 = TestTask(ProxyTask(proxy))
00215         t1.start()
00216         while not t1.done and time.time() < timeout_t:
00217             time.sleep(0.5)
00218         self.assert_(t1.success)
00219         
00220         # test wait for service with timeout in success case
00221         timeout_t = time.time() + 5.        
00222         t2 = TestTask(ProxyTask(proxy, timeout=1.))
00223         t2.start()
00224         while not t2.done and time.time() < timeout_t:
00225             time.sleep(0.5)
00226         self.assert_(t2.success)
00227 
00228         # test wait for service in failure case
00229         fake_proxy = rospy.ServiceProxy('fake_service', test_rosmaster.srv.AddTwoInts)
00230         timeout_t = time.time() + 2.        
00231         t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))        
00232         t3.start()
00233         while not t3.done and time.time() < timeout_t:
00234             time.sleep(0.5)
00235         self.assert_(t3.done)
00236         self.failIf(t3.success)
00237 
00238     def test_sleep(self):
00239         import rospy
00240         import time
00241         t = time.time()
00242         rospy.sleep(0.1)
00243         dur = time.time() - t
00244         # #2842 raising bounds from .01 to .03 for amazon VM 
00245 
00246         # make sure sleep is approximately right
00247         self.assert_(abs(dur - 0.1) < 0.03, dur)
00248 
00249         t = time.time()
00250         rospy.sleep(rospy.Duration.from_sec(0.1))
00251         dur = time.time() - t
00252         # make sure sleep is approximately right
00253         self.assert_(abs(dur - 0.1) < 0.03, dur)
00254 
00255         # sleep for neg duration
00256         t = time.time()
00257         rospy.sleep(rospy.Duration.from_sec(-10.))
00258         dur = time.time() - t
00259         # make sure returned immediately
00260         self.assert_(abs(dur) < 0.1, dur)
00261 
00262     def test_Rate(self):
00263         import rospy
00264         import time
00265         t = time.time()
00266         count = 0
00267         r = rospy.Rate(10)
00268         for x in range(10):
00269             r.sleep()
00270         dur = time.time() - t
00271         # make sure sleep is approximately right
00272         self.assert_(abs(dur - 1.0) < 0.5, dur)
00273         
00274 
00275     def test_param_server(self):
00276         # this isn't a parameter server test, just checking that the rospy bindings work
00277         import rospy
00278 
00279         try:
00280             rospy.get_param('not_a_param')
00281             self.fail("should have raised KeyError")
00282         except KeyError: pass
00283         self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') )
00284         
00285         p = rospy.get_param('/param')
00286         self.assertEquals("value", p)
00287         p = rospy.get_param('param')
00288         self.assertEquals("value", p)
00289         p = rospy.get_param('/group/param')
00290         self.assertEquals("group_value", p)
00291         p = rospy.get_param('group/param')
00292         self.assertEquals("group_value", p)
00293 
00294         self.assertEquals('/param', rospy.search_param('param'))
00295         
00296         names = rospy.get_param_names()
00297         self.assert_('/param' in names)
00298         self.assert_('/group/param' in names)        
00299 
00300         for p in ['/param', 'param', 'group/param', '/group/param']:
00301             self.assert_(rospy.has_param(p))
00302             
00303         rospy.set_param('param2', 'value2')
00304         self.assert_(rospy.has_param('param2'))
00305         self.assertEquals('value2', rospy.get_param('param2'))
00306         rospy.delete_param('param2')
00307         self.failIf(rospy.has_param('param2'))
00308         try:
00309             rospy.get_param('param2')
00310             self.fail("should have raised KeyError")
00311         except KeyError: pass
00312 
00313     def test_wait_for_message(self):
00314         # lazy-import for coverage
00315         import rospy
00316         import std_msgs.msg
00317         import time
00318 
00319         # test standard wait for message
00320         def task1():
00321             return rospy.wait_for_message('chatter', std_msgs.msg.String)
00322         timeout_t = time.time() + 5.
00323         t1 = TestTask(task1)
00324         t1.start()
00325         while not t1.done and time.time() < timeout_t:
00326             time.sleep(0.5)
00327         self.assert_(t1.success)
00328         self.assert_('hello' in t1.value.data)
00329 
00330         # test wait for message with timeout
00331         def task2():
00332             return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
00333         timeout_t = time.time() + 5.        
00334         t2 = TestTask(task2)        
00335         t2.start()
00336         while not t2.done and time.time() < timeout_t:
00337             time.sleep(0.5)
00338         self.assert_(t2.success)
00339         self.assert_('hello' in t2.value.data)
00340         
00341         # test wait for message with timeout FAILURE
00342         def task3():
00343             # #2842 raising bounds from .1 to .3 for amazon VM
00344             return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
00345         timeout_t = time.time() + 2.        
00346         t3 = TestTask(task3)        
00347         t3.start()
00348         while not t3.done and time.time() < timeout_t:
00349             time.sleep(0.5)
00350         self.failIf(t3.success)
00351         self.assert_(t3.done)
00352         self.assert_(t3.value is None)
00353     
00354 if __name__ == '__main__':
00355     rosunit.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57