$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 PKG = 'test_rosservice' 00037 NAME = 'test_rosservice_command_line_online' 00038 import roslib; roslib.load_manifest(PKG) 00039 00040 import os 00041 import sys 00042 import time 00043 import unittest 00044 00045 import rospy 00046 import rostest 00047 00048 import test_ros.srv 00049 00050 from subprocess import Popen, PIPE, check_call, call 00051 00052 from threading import Thread 00053 class TestTask(Thread): 00054 def __init__(self, task): 00055 Thread.__init__(self) 00056 self.task = task 00057 self.success = False 00058 self.done = False 00059 self.value = None 00060 00061 def run(self): 00062 try: 00063 print "STARTING TASK" 00064 self.value = self.task() 00065 print "TASK HAS COMPLETED" 00066 self.success = True 00067 except: 00068 import traceback 00069 traceback.print_exc() 00070 self.done = True 00071 00072 class TestRosserviceOnline(unittest.TestCase): 00073 00074 def setUp(self): 00075 pass 00076 00077 def test_rosservice(self): 00078 # wait for network to initialize 00079 services = ['/add_two_ints', '/foo/add_two_ints', '/bar/add_two_ints'] 00080 for s in services: 00081 rospy.wait_for_service(s) 00082 00083 cmd = 'rosservice' 00084 names = ['add_two_ints', '/add_two_ints', 'foo/add_two_ints', '/bar/add_two_ints'] 00085 00086 # list 00087 # - hard to exact match as we are still adding builtin services to nodes (e.g. set_logger_level) 00088 output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0] 00089 l = set(output.split()) 00090 for s in services: 00091 self.assert_(s in l) 00092 00093 for name in names: 00094 # args 00095 output = Popen([cmd, 'args', name], stdout=PIPE).communicate()[0] 00096 self.assertEquals('a b', output.strip()) 00097 00098 # type 00099 output = Popen([cmd, 'type', name], stdout=PIPE).communicate()[0] 00100 self.assertEquals('test_ros/AddTwoInts', output.strip()) 00101 00102 # find 00103 output = Popen([cmd, 'find', 'test_ros/AddTwoInts'], stdout=PIPE).communicate()[0] 00104 values = [v.strip() for v in output.split('\n') if v.strip()] 00105 self.assertEquals(set(values), set(services)) 00106 00107 # uri 00108 output = Popen([cmd, 'uri', name], stdout=PIPE).communicate()[0] 00109 # - no exact answer 00110 self.assert_(output.startswith('rosrpc://'), output) 00111 00112 # call 00113 output = Popen([cmd, 'call', '--wait', name, '1', '2'], stdout=PIPE).communicate()[0] 00114 self.assertEquals('sum: 3', output.strip()) 00115 00116 output = Popen([cmd, 'call', name, '1', '2'], stdout=PIPE).communicate()[0] 00117 self.assertEquals('sum: 3', output.strip()) 00118 00119 name = 'header_echo' 00120 # test with a Header so we can validate keyword args 00121 import yaml 00122 import time 00123 t = time.time() 00124 00125 # test with empty headers 00126 for v in ['{}', '{header: {}}', '{header: {seq: 0}}']: 00127 output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0] 00128 val = yaml.load(output.strip())['header'] 00129 self.assertEquals('', val['frame_id']) 00130 self.assert_(val['seq'] >= 0) 00131 self.assertEquals(0, val['stamp']['secs']) 00132 self.assertEquals(0, val['stamp']['nsecs']) 00133 00134 # test with auto headers 00135 for v in ['{header: auto}', '{header: {stamp: now}}']: 00136 output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0] 00137 val = yaml.load(output.strip())['header'] 00138 self.assertEquals('', val['frame_id']) 00139 self.assert_(val['seq'] >= 0) 00140 self.assert_(val['stamp']['secs'] >= int(t)) 00141 00142 00143 # verify that it respects ROS_NS 00144 # - the uris should be different as the names should resolve differently 00145 env = os.environ.copy() 00146 env['ROS_NAMESPACE'] = 'foo' 00147 uri1 = Popen([cmd, 'uri', 'add_two_ints'], stdout=PIPE).communicate()[0] 00148 uri2 = Popen([cmd, 'uri', 'add_two_ints'], env=env, stdout=PIPE).communicate()[0] 00149 self.assert_(uri2.startswith('rosrpc://')) 00150 self.assertNotEquals(uri1, uri2) 00151 00152 # test_call_wait 00153 def task1(): 00154 output = Popen([cmd, 'call', '--wait', 'wait_two_ints', '1', '2'], stdout=PIPE).communicate()[0] 00155 self.assertEquals('sum: 3', output.strip()) 00156 timeout_t = time.time() + 5. 00157 t1 = TestTask(task1) 00158 t1.start() 00159 00160 rospy.init_node('test_call_wait') 00161 rospy.Service("wait_two_ints", test_ros.srv.AddTwoInts, lambda x: x.a + x.b) 00162 while not t1.done and time.time() < timeout_t: 00163 time.sleep(0.5) 00164 self.assert_(t1.success) 00165 00166 00167 if __name__ == '__main__': 00168 rostest.run(PKG, NAME, TestRosserviceOnline, sys.argv)