$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rosservice_command_line_offline.py 6214 2009-09-18 23:38:22Z kwc $ 00035 00036 from __future__ import with_statement 00037 00038 NAME = 'test_rosservice' 00039 import roslib; roslib.load_manifest('test_rosservice') 00040 00041 import os 00042 import sys 00043 import unittest 00044 import cStringIO 00045 import time 00046 00047 import rostest 00048 00049 from subprocess import Popen, PIPE, check_call, call 00050 00051 from contextlib import contextmanager 00052 00053 @contextmanager 00054 def fakestdout(): 00055 realstdout = sys.stdout 00056 fakestdout = cStringIO.StringIO() 00057 sys.stdout = fakestdout 00058 yield fakestdout 00059 sys.stdout = realstdout 00060 00061 def todict(s): 00062 d = {} 00063 for l in s.split('\n'): 00064 key, p, val = l.partition(':') 00065 if p: 00066 d[key] = val.strip() 00067 return d 00068 00069 class TestRosservice(unittest.TestCase): 00070 00071 def setUp(self): 00072 pass 00073 00074 def test_get_service_headers(self): 00075 from ros import rosservice 00076 orig_uri = os.environ['ROS_MASTER_URI'] 00077 os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356' 00078 try: 00079 c = 'rosservice' 00080 00081 # test error conditions, integration tests cover success cases 00082 try: 00083 rosservice.get_service_headers('/add_two_ints', 'fake://localhost:1234') 00084 self.fail("should have raised") 00085 except rosservice.ROSServiceException: pass 00086 try: 00087 rosservice.get_service_headers('/add_two_ints', 'rosrpc://fake_host:1234') 00088 self.fail("should have raised IO exc") 00089 except rosservice.ROSServiceIOException: pass 00090 00091 00092 finally: 00093 os.environ['ROS_MASTER_URI'] = orig_uri 00094 00095 def test_get_service_type(self): 00096 from ros import rosservice 00097 self.assertEquals('test_ros/AddTwoInts', rosservice.get_service_type('/add_two_ints')) 00098 self.assertEquals(None, rosservice.get_service_type('/fake_add_two_ints')) 00099 00100 def test_offline(self): 00101 from ros import rosservice 00102 orig_uri = os.environ['ROS_MASTER_URI'] 00103 os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356' 00104 00105 try: 00106 c = 'rosservice' 00107 00108 try: 00109 rosservice.get_service_type('/add_two_ints') 00110 self.fail("should have raised ROSServiceIOException") 00111 except rosservice.ROSServiceIOException: 00112 pass 00113 00114 try: 00115 rosservice._rosservice_cmd_list([c, 'list']) 00116 self.fail("should have raised ROSServiceIOException") 00117 except rosservice.ROSServiceIOException: pass 00118 00119 try: 00120 rosservice._rosservice_cmd_info([c, 'info', '/add_two_ints']) 00121 self.fail("should have raised ROSServiceIOException") 00122 except rosservice.ROSServiceIOException: pass 00123 00124 try: 00125 rosservice._rosservice_cmd_type([c, 'type', '/add_two_ints']) 00126 self.fail("should have raised ROSServiceIOException") 00127 except rosservice.ROSServiceIOException: pass 00128 00129 try: 00130 rosservice._rosservice_cmd_uri([c, 'uri', '/add_two_ints']) 00131 self.fail("should have raised ROSServiceIOException") 00132 except rosservice.ROSServiceIOException: pass 00133 00134 try: 00135 rosservice._rosservice_cmd_find([c, 'find', 'test_ros/AddTwoInts']) 00136 self.fail("should have raised ROSServiceIOException") 00137 except rosservice.ROSServiceIOException: pass 00138 00139 try: 00140 rosservice._rosservice_cmd_call([c, 'call', '/add_two_ints', '1', '2']) 00141 self.fail("should have raised ROSServiceIOException") 00142 except rosservice.ROSServiceIOException: pass 00143 00144 finally: 00145 os.environ['ROS_MASTER_URI'] = orig_uri 00146 00147 def test_cmd_type(self): 00148 from ros import rosservice 00149 cmd = 'rosservice' 00150 s = '/add_two_ints' 00151 try: 00152 rosservice.rosservicemain([cmd, 'type', '/fake_service']) 00153 self.fail("should have triggered error exit") 00154 except SystemExit: 00155 pass 00156 00157 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']: 00158 with fakestdout() as b: 00159 rosservice.rosservicemain([cmd, 'type', s]) 00160 v = b.getvalue().strip() 00161 self.assertEquals('test_ros/AddTwoInts', v) 00162 00163 def test_cmd_uri(self): 00164 from ros import rosservice 00165 cmd = 'rosservice' 00166 with fakestdout() as b: 00167 try: 00168 rosservice.rosservicemain([cmd, 'uri', '/fake_service']) 00169 self.fail("should have triggered error exit") 00170 except SystemExit: 00171 pass 00172 00173 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']: 00174 with fakestdout() as b: 00175 rosservice.rosservicemain([cmd, 'uri', s]) 00176 v = b.getvalue().strip() 00177 self.assert_(v.startswith('rosrpc://'), v) 00178 00179 00180 def test_cmd_node(self): 00181 from ros import rosservice 00182 cmd = 'rosservice' 00183 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']: 00184 with fakestdout() as b: 00185 rosservice.rosservicemain([cmd, 'node', s]) 00186 v = b.getvalue().strip() 00187 if 'foo' in s: 00188 self.assertEquals('/foo/a2iserver', v) 00189 else: 00190 self.assertEquals('/a2iserver', v) 00191 try: 00192 rosservice.rosservicemain([cmd, 'node', '/fake_two_ints']) 00193 self.fail("should have exited with error") 00194 except SystemExit: pass 00195 00196 def test_full_usage(self): 00197 from ros import rosservice 00198 try: 00199 rosservice._fullusage() 00200 self.fail("should have caused system exit") 00201 except SystemExit: pass 00202 00203 def test_cmd_info(self): 00204 from ros import rosservice 00205 cmd = 'rosservice' 00206 00207 try: 00208 rosservice.rosservicemain([cmd, 'info']) 00209 self.fail("should have exited with error") 00210 except SystemExit: pass 00211 try: 00212 rosservice.rosservicemain([cmd, 'info', '/fake_service']) 00213 self.fail("should have exited with error") 00214 except SystemExit: pass 00215 try: 00216 rosservice.rosservicemain([cmd, 'info', '/add_two_ints', '/foo/add_two_ints']) 00217 self.fail("should have exited with error") 00218 except SystemExit: pass 00219 00220 for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']: 00221 with fakestdout() as b: 00222 rosservice.rosservicemain([cmd, 'info', s]) 00223 d = todict(b.getvalue()) 00224 if 'foo' in s: 00225 self.assertEquals('/foo/a2iserver', d['Node']) 00226 else: 00227 self.assertEquals('/a2iserver', d['Node'], repr(d['Node'])) 00228 self.assertEquals('test_ros/AddTwoInts', d['Type']) 00229 self.assertEquals('a b', d['Args']) 00230 self.assert_('URI' in d) 00231 00232 def test_cmd_find(self): 00233 from ros import rosservice 00234 cmd = 'rosservice' 00235 00236 try: 00237 rosservice.rosservicemain([cmd, 'find']) 00238 self.fail("arg parsing should have failed") 00239 except SystemExit: pass 00240 try: 00241 rosservice.rosservicemain([cmd, 'find', 'test_ros/AddTwoInts', 'test/AddThreeInts']) 00242 self.fail("arg parsing should have failed") 00243 except SystemExit: pass 00244 00245 v = set(['/add_two_ints', '/bar/add_two_ints', '/foo/add_two_ints']) 00246 with fakestdout() as b: 00247 rosservice.rosservicemain([cmd, 'find', 'test_ros/AddTwoInts']) 00248 d = set([x for x in b.getvalue().split('\n') if x.strip()]) 00249 self.assertEquals(v, d) 00250 00251 with fakestdout() as b: 00252 rosservice.rosservicemain([cmd, 'find', 'fake/AddTwoInts']) 00253 self.assertEquals('', b.getvalue().strip()) 00254 00255 def test_get_service_class_by_name(self): 00256 from ros import rosservice 00257 try: 00258 rosservice.get_service_class_by_name('fake') 00259 self.fail("should have raised") 00260 except rosservice.ROSServiceException, e: 00261 self.assertEquals("Service [fake] is not available.", str(e)) 00262 00263 def test_cmd_call(self): 00264 from ros import rosservice 00265 cmd = 'rosservice' 00266 00267 try: 00268 rosservice.rosservicemain([cmd, 'call']) 00269 self.fail("arg parsing should have failed") 00270 except SystemExit: pass 00271 try: 00272 rosservice.rosservicemain([cmd, 'call', 'add_two_ints', '1', '2', '3']) 00273 self.fail("should have failed with too many args") 00274 except SystemExit: pass 00275 00276 00277 def setUp(self): 00278 # wait for all services to come up 00279 00280 from ros import rosservice 00281 services = ['/add_two_ints', 00282 '/foo/add_two_ints', 00283 '/bar/add_two_ints', 00284 ] 00285 00286 import time 00287 timeout_t = time.time() + 10. 00288 while time.time() < timeout_t: 00289 with fakestdout() as b: 00290 rosservice._rosservice_cmd_list(['rosservice', 'list']) 00291 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00292 if not (set(services) - set(v) ): 00293 return 00294 self.fail("timeout") 00295 00296 def test_cmd_list(self): 00297 from ros import rosservice 00298 cmd = 'rosservice' 00299 s = '/add_two_ints' 00300 00301 # test main entry 00302 rosservice.rosservicemain([cmd, 'list']) 00303 00304 # test directly 00305 services = ['/add_two_ints', 00306 '/foo/add_two_ints', 00307 '/bar/add_two_ints', 00308 '/header_echo', 00309 ] 00310 services_nodes = ['/add_two_ints /a2iserver', 00311 '/foo/add_two_ints /foo/a2iserver', 00312 '/bar/add_two_ints /bar/a2iserver', 00313 '/header_echo /headerserver', 00314 ] 00315 00316 with fakestdout() as b: 00317 rosservice._rosservice_cmd_list([cmd, 'list']) 00318 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00319 v = [x for x in v if not x.startswith('/rosout/')] 00320 self.assertEquals(set(services), set(v)) 00321 with fakestdout() as b: 00322 rosservice._rosservice_cmd_list([cmd, 'list', '-n']) 00323 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00324 v = [x for x in v if not x.startswith('/rosout/')] 00325 self.assertEquals(set(services_nodes), set(v)) 00326 with fakestdout() as b: 00327 rosservice._rosservice_cmd_list([cmd, 'list', '--nodes']) 00328 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00329 v = [x for x in v if not x.startswith('/rosout/')] 00330 self.assertEquals(set(services_nodes), set(v)) 00331 00332 # test with multiple service names 00333 try: 00334 rosservice._rosservice_cmd_list([cmd, 'list', s, s]) 00335 self.fail("should have caused parser error") 00336 except SystemExit: 00337 pass 00338 00339 # test with resolved service names 00340 for s in services: 00341 with fakestdout() as b: 00342 rosservice._rosservice_cmd_list([cmd, 'list', s]) 00343 self.assertEquals(s, b.getvalue().strip()) 00344 00345 # test with relative service names 00346 s = 'add_two_ints' 00347 with fakestdout() as b: 00348 rosservice._rosservice_cmd_list([cmd, 'list', s]) 00349 self.assertEquals('/add_two_ints', b.getvalue().strip()) 00350 with fakestdout() as b: 00351 rosservice._rosservice_cmd_list([cmd, 'list', s, '-n']) 00352 self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip()) 00353 with fakestdout() as b: 00354 rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes']) 00355 self.assertEquals('/add_two_ints /a2iserver', b.getvalue().strip()) 00356 00357 # test with namespaces 00358 s = '/foo' 00359 rosservice._rosservice_cmd_list([cmd, 'list', s]) 00360 rosservice._rosservice_cmd_list([cmd, 'list', s, '-n']) 00361 rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes']) 00362 s = 'foo' 00363 rosservice._rosservice_cmd_list([cmd, 'list', s]) 00364 rosservice._rosservice_cmd_list([cmd, 'list', s, '-n']) 00365 rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes']) 00366 00367 00368 if __name__ == '__main__': 00369 rostest.unitrun('test_rosservice', NAME, TestRosservice, sys.argv, coverage_packages=['rosservice'])