test_rospy_client_online.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import random
00040 
00041 import rosunit
00042 
00043 from threading import Thread
00044 class TestTask(Thread):
00045     def __init__(self, task):
00046         Thread.__init__(self)
00047         self.task = task
00048         self.success = False
00049         self.done = False
00050         self.value = None
00051         
00052     def run(self):
00053         try:
00054             print "STARTING TASK"
00055             self.value = self.task()
00056             print "TASK HAS COMPLETED"
00057             self.success = True
00058         except:
00059             import traceback
00060             traceback.print_exc()
00061         self.done = True
00062 
00063 class TestRospyClientOnline(unittest.TestCase):
00064     
00065     def __init__(self, *args):
00066         unittest.TestCase.__init__(self, *args)
00067         import rospy
00068         rospy.init_node('test_rospy_online')
00069 
00070     def test_log(self):
00071         # we can't do anything fancy here like capture stdout as rospy
00072         # grabs the streams before we do. Instead, just make sure the
00073         # routines don't crash.
00074         
00075         real_stdout = sys.stdout
00076         real_stderr = sys.stderr
00077 
00078         try:
00079             from cStringIO import StringIO
00080             sys.stdout = StringIO()
00081             sys.stderr = StringIO()
00082 
00083             import rospy
00084             rospy.loginfo("test 1")
00085             self.assert_("test 1" in sys.stdout.getvalue())
00086             
00087             rospy.logwarn("test 2")            
00088             self.assert_("[WARN]" in sys.stderr.getvalue())
00089             self.assert_("test 2" in sys.stderr.getvalue())
00090 
00091             sys.stderr = StringIO()
00092             rospy.logerr("test 3")            
00093             self.assert_("[ERROR]" in sys.stderr.getvalue())
00094             self.assert_("test 3" in sys.stderr.getvalue())
00095             
00096             sys.stderr = StringIO()
00097             rospy.logfatal("test 4")            
00098             self.assert_("[FATAL]" in sys.stderr.getvalue())            
00099             self.assert_("test 4" in sys.stderr.getvalue())            
00100         finally:
00101             sys.stdout = real_stdout
00102             sys.stderr = real_stderr
00103         
00104     def test_wait_for_service(self):
00105         # lazy-import for coverage
00106         import rospy
00107         import time
00108 
00109         # test wait for service in success case        
00110         def task1():
00111             rospy.wait_for_service('add_two_ints')
00112         timeout_t = time.time() + 5.
00113         t1 = TestTask(task1)
00114         t1.start()
00115         while not t1.done and time.time() < timeout_t:
00116             time.sleep(0.5)
00117         self.assert_(t1.success)
00118         
00119         # test wait for service with timeout in success case
00120         def task2():
00121             rospy.wait_for_service('add_two_ints', timeout=1.)
00122         timeout_t = time.time() + 5.        
00123         t2 = TestTask(task2)        
00124         t2.start()
00125         while not t2.done and time.time() < timeout_t:
00126             time.sleep(0.5)
00127         self.assert_(t2.success)
00128 
00129         # test wait for service in failure case
00130         def task3():
00131             # #2842 raising bounds from .1 to .3 for amazon VM            
00132             rospy.wait_for_service('fake_service', timeout=0.3)
00133         timeout_t = time.time() + 2.        
00134         t3 = TestTask(task3)        
00135         t3.start()
00136         while not t3.done and time.time() < timeout_t:
00137             time.sleep(0.5)
00138         self.assert_(t3.done)
00139         self.failIf(t3.success)
00140     
00141     def test_ServiceProxy_wait_for_service(self):
00142         """
00143         Test ServiceProxy.wait_for_service
00144         """
00145         # lazy-import for coverage
00146         import rospy
00147         import time
00148         import test_rosmaster.srv
00149 
00150         # test wait for service in success case        
00151         proxy = rospy.ServiceProxy('add_two_ints', test_rosmaster.srv.AddTwoInts)
00152         class ProxyTask(object):
00153             def __init__(self, proxy, timeout=None):
00154                 self.proxy = proxy
00155                 self.timeout = timeout
00156             def __call__(self):
00157                 if self.timeout is None:
00158                     self.proxy.wait_for_service()
00159                 else:
00160                     self.proxy.wait_for_service(timeout=self.timeout)
00161         timeout_t = time.time() + 5.
00162         t1 = TestTask(ProxyTask(proxy))
00163         t1.start()
00164         while not t1.done and time.time() < timeout_t:
00165             time.sleep(0.5)
00166         self.assert_(t1.success)
00167         
00168         # test wait for service with timeout in success case
00169         timeout_t = time.time() + 5.        
00170         t2 = TestTask(ProxyTask(proxy, timeout=1.))
00171         t2.start()
00172         while not t2.done and time.time() < timeout_t:
00173             time.sleep(0.5)
00174         self.assert_(t2.success)
00175 
00176         # test wait for service in failure case
00177         fake_proxy = rospy.ServiceProxy('fake_service', test_rosmaster.srv.AddTwoInts)
00178         timeout_t = time.time() + 2.        
00179         t3 = TestTask(ProxyTask(fake_proxy, timeout=0.1))        
00180         t3.start()
00181         while not t3.done and time.time() < timeout_t:
00182             time.sleep(0.5)
00183         self.assert_(t3.done)
00184         self.failIf(t3.success)
00185 
00186     def test_sleep(self):
00187         import rospy
00188         import time
00189         t = time.time()
00190         rospy.sleep(0.1)
00191         dur = time.time() - t
00192         # #2842 raising bounds from .01 to .03 for amazon VM 
00193 
00194         # make sure sleep is approximately right
00195         self.assert_(abs(dur - 0.1) < 0.03, dur)
00196 
00197         t = time.time()
00198         rospy.sleep(rospy.Duration.from_sec(0.1))
00199         dur = time.time() - t
00200         # make sure sleep is approximately right
00201         self.assert_(abs(dur - 0.1) < 0.03, dur)
00202 
00203         # sleep for neg duration
00204         t = time.time()
00205         rospy.sleep(rospy.Duration.from_sec(-10.))
00206         dur = time.time() - t
00207         # make sure returned immediately
00208         self.assert_(abs(dur) < 0.1, dur)
00209 
00210     def test_Rate(self):
00211         import rospy
00212         import time
00213         t = time.time()
00214         count = 0
00215         r = rospy.Rate(10)
00216         for x in xrange(10):
00217             r.sleep()
00218         dur = time.time() - t
00219         # make sure sleep is approximately right
00220         self.assert_(abs(dur - 1.0) < 0.5, dur)
00221         
00222 
00223     def test_param_server(self):
00224         # this isn't a parameter server test, just checking that the rospy bindings work
00225         import rospy
00226 
00227         try:
00228             rospy.get_param('not_a_param')
00229             self.fail("should have raised KeyError")
00230         except KeyError: pass
00231         self.assertEquals('default_val', rospy.get_param('not_a_param', 'default_val') )
00232         
00233         p = rospy.get_param('/param')
00234         self.assertEquals("value", p)
00235         p = rospy.get_param('param')
00236         self.assertEquals("value", p)
00237         p = rospy.get_param('/group/param')
00238         self.assertEquals("group_value", p)
00239         p = rospy.get_param('group/param')
00240         self.assertEquals("group_value", p)
00241 
00242         self.assertEquals('/param', rospy.search_param('param'))
00243         
00244         names = rospy.get_param_names()
00245         self.assert_('/param' in names)
00246         self.assert_('/group/param' in names)        
00247 
00248         for p in ['/param', 'param', 'group/param', '/group/param']:
00249             self.assert_(rospy.has_param(p))
00250             
00251         rospy.set_param('param2', 'value2')
00252         self.assert_(rospy.has_param('param2'))
00253         self.assertEquals('value2', rospy.get_param('param2'))
00254         rospy.delete_param('param2')
00255         self.failIf(rospy.has_param('param2'))
00256         try:
00257             rospy.get_param('param2')
00258             self.fail("should have raised KeyError")
00259         except KeyError: pass
00260 
00261     def test_wait_for_message(self):
00262         # lazy-import for coverage
00263         import rospy
00264         import std_msgs.msg
00265         import time
00266 
00267         # test standard wait for message
00268         def task1():
00269             return rospy.wait_for_message('chatter', std_msgs.msg.String)
00270         timeout_t = time.time() + 5.
00271         t1 = TestTask(task1)
00272         t1.start()
00273         while not t1.done and time.time() < timeout_t:
00274             time.sleep(0.5)
00275         self.assert_(t1.success)
00276         self.assert_('hello' in t1.value.data)
00277 
00278         # test wait for message with timeout
00279         def task2():
00280             return rospy.wait_for_message('chatter', std_msgs.msg.String, timeout=2.)
00281         timeout_t = time.time() + 5.        
00282         t2 = TestTask(task2)        
00283         t2.start()
00284         while not t2.done and time.time() < timeout_t:
00285             time.sleep(0.5)
00286         self.assert_(t2.success)
00287         self.assert_('hello' in t2.value.data)
00288         
00289         # test wait for message with timeout FAILURE
00290         def task3():
00291             # #2842 raising bounds from .1 to .3 for amazon VM
00292             return rospy.wait_for_message('fake_topic', std_msgs.msg.String, timeout=.3)
00293         timeout_t = time.time() + 2.        
00294         t3 = TestTask(task3)        
00295         t3.start()
00296         while not t3.done and time.time() < timeout_t:
00297             time.sleep(0.5)
00298         self.failIf(t3.success)
00299         self.assert_(t3.done)
00300         self.assert_(t3.value is None)
00301     
00302 if __name__ == '__main__':
00303     rosunit.unitrun('test_rospy', sys.argv[0], TestRospyClientOnline, coverage_packages=['rospy.client', 'rospy.msproxy'])


test_rospy
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:47:19