check_rosservice.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys 
00036 import unittest
00037 try:
00038     from cStringIO import StringIO
00039 except ImportError:
00040     from io import StringIO
00041 import time
00042         
00043 import rosunit
00044 
00045 from subprocess import Popen, PIPE, check_call, call
00046 
00047 from contextlib import contextmanager
00048 
@contextmanager
def fakestdout():
    """Temporarily redirect ``sys.stdout`` into an in-memory buffer.

    Yields:
        The ``StringIO`` object that receives everything written to
        ``sys.stdout`` while the ``with`` body runs; read it with
        ``getvalue()``.

    The previous ``sys.stdout`` is restored when the ``with`` block exits,
    including when the body raises (a failed assertion, ``SystemExit``, ...),
    so one failing test cannot leave stdout captured for later tests.
    """
    realstdout = sys.stdout
    captured = StringIO()
    sys.stdout = captured
    try:
        yield captured
    finally:
        # Always undo the redirection, even if the body raised.
        sys.stdout = realstdout
00056     
def todict(s):
    """Parse ``key: value`` lines of *s* into a dictionary.

    Each line (split on ``'\\n'``) is cut at its first ``':'``.  Lines
    without a colon are skipped.  Keys are kept exactly as written, values
    have surrounding whitespace stripped, and a later duplicate key
    overwrites an earlier one.
    """
    fields = (line.partition(':') for line in s.split('\n'))
    return dict((name, value.strip()) for name, colon, value in fields if colon)
00064         
class TestRosservice(unittest.TestCase):
    """Integration tests for the ``rosservice`` library and command line.

    The tests talk to the ROS master named by ``ROS_MASTER_URI`` and expect
    the services checked in ``test_cmd_list`` (``/add_two_ints``,
    ``/foo/add_two_ints``, ``/bar/add_two_ints``, ``/header_echo``) to be
    advertised by the surrounding test launch.
    """

    def setUp(self):
        """Wait (up to 10 seconds) for the required services to come up."""
        import rosservice

        required = set(['/add_two_ints',
                        '/foo/add_two_ints',
                        '/bar/add_two_ints',
                        ])

        timeout_t = time.time() + 10.
        while time.time() < timeout_t:
            with fakestdout() as b:
                rosservice._rosservice_cmd_list(['rosservice', 'list'])
                listed = set([x.strip() for x in b.getvalue().split('\n') if x.strip()])
            if not (required - listed):
                return
            # Pause briefly instead of hammering the master in a busy loop.
            time.sleep(0.1)
        self.fail("timeout")

    def test_get_service_headers(self):
        """get_service_headers() raises for bad and unreachable service URIs."""
        import rosservice
        orig_uri = os.environ['ROS_MASTER_URI']
        os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
        try:
            # test error conditions, integration tests cover success cases
            self.assertRaises(rosservice.ROSServiceException,
                              rosservice.get_service_headers,
                              '/add_two_ints', 'fake://localhost:1234')
            self.assertRaises(rosservice.ROSServiceIOException,
                              rosservice.get_service_headers,
                              '/add_two_ints', 'rosrpc://fake_host:1234')
        finally:
            os.environ['ROS_MASTER_URI'] = orig_uri

    def test_get_service_type(self):
        """get_service_type() returns the type, or None for unknown services."""
        import rosservice
        self.assertEqual('test_rosmaster/AddTwoInts', rosservice.get_service_type('/add_two_ints'))
        self.assertEqual(None, rosservice.get_service_type('/fake_add_two_ints'))

    def test_offline(self):
        """Every master-dependent call raises ROSServiceIOException when offline."""
        import rosservice
        orig_uri = os.environ['ROS_MASTER_URI']
        # Point at a master that does not exist for the duration of the test.
        os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'

        try:
            c = 'rosservice'
            io_exc = rosservice.ROSServiceIOException

            self.assertRaises(io_exc, rosservice.get_service_type, '/add_two_ints')
            self.assertRaises(io_exc, rosservice._rosservice_cmd_list, [c, 'list'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_info,
                              [c, 'info', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_type,
                              [c, 'type', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_uri,
                              [c, 'uri', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_find,
                              [c, 'find', 'test_ros/AddTwoInts'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_call,
                              [c, 'call', '/add_two_ints', '1', '2'])
        finally:
            os.environ['ROS_MASTER_URI'] = orig_uri

    def test_cmd_type(self):
        """'rosservice type' prints the service type; unknown services exit."""
        import rosservice
        cmd = 'rosservice'
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'type', '/fake_service'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'type', s])
                v = b.getvalue().strip()
                self.assertEqual('test_rosmaster/AddTwoInts', v)

    def test_cmd_uri(self):
        """'rosservice uri' prints a rosrpc:// URI; unknown services exit."""
        import rosservice
        cmd = 'rosservice'
        with fakestdout() as b:
            self.assertRaises(SystemExit, rosservice.rosservicemain,
                              [cmd, 'uri', '/fake_service'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'uri', s])
                v = b.getvalue().strip()
                self.assertTrue(v.startswith('rosrpc://'), v)

    def test_cmd_node(self):
        """'rosservice node' prints the providing node; unknown services exit."""
        import rosservice
        cmd = 'rosservice'
        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'node', s])
                v = b.getvalue().strip()
                if 'foo' in s:
                    self.assertEqual('/foo/a2iserver', v)
                else:
                    self.assertEqual('/a2iserver', v)
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'node', '/fake_two_ints'])

    def test_full_usage(self):
        """_fullusage() terminates via SystemExit."""
        import rosservice
        self.assertRaises(SystemExit, rosservice._fullusage)

    def test_cmd_info(self):
        """'rosservice info' prints Node/URI/Type/Args; bad arguments exit."""
        import rosservice
        cmd = 'rosservice'

        # missing argument, unknown service, too many arguments
        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'info'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'info', '/fake_service'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'info', '/add_two_ints', '/foo/add_two_ints'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'info', s])
                d = todict(b.getvalue())
                if 'foo' in s:
                    self.assertEqual('/foo/a2iserver', d['Node'])
                else:
                    self.assertEqual('/a2iserver', d['Node'], repr(d['Node']))
                self.assertEqual('test_rosmaster/AddTwoInts', d['Type'])
                self.assertEqual('a b', d['Args'])
                self.assertTrue('URI' in d)

    def test_cmd_find(self):
        """'rosservice find' lists services of a type; bad arguments exit."""
        import rosservice
        cmd = 'rosservice'

        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'find'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'find', 'test_ros/AddTwoInts', 'test/AddThreeInts'])

        v = set(['/add_two_ints', '/bar/add_two_ints', '/foo/add_two_ints'])
        with fakestdout() as b:
            rosservice.rosservicemain([cmd, 'find', 'test_rosmaster/AddTwoInts'])
            d = set([x for x in b.getvalue().split('\n') if x.strip()])
            self.assertEqual(v, d)

        with fakestdout() as b:
            rosservice.rosservicemain([cmd, 'find', 'fake/AddTwoInts'])
            self.assertEqual('', b.getvalue().strip())

    def test_get_service_class_by_name(self):
        """get_service_class_by_name() reports unavailable services by name."""
        import rosservice
        try:
            rosservice.get_service_class_by_name('fake')
        except rosservice.ROSServiceException as e:
            self.assertEqual("Service [fake] is not available.", str(e))
        else:
            self.fail("should have raised")

    def test_cmd_call(self):
        """'rosservice call' exits on missing or surplus arguments."""
        import rosservice
        cmd = 'rosservice'

        self.assertRaises(SystemExit, rosservice.rosservicemain, [cmd, 'call'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'call', 'add_two_ints', '1', '2', '3'])

    def test_cmd_list(self):
        """'rosservice list' with and without -n/--nodes, names and namespaces."""
        import rosservice
        cmd = 'rosservice'
        s = '/add_two_ints'

        def output_lines(buf):
            # Non-empty, stripped output lines, ignoring the /rosout/ services.
            lines = [x.strip() for x in buf.getvalue().split('\n') if x.strip()]
            return [x for x in lines if not x.startswith('/rosout/')]

        # test main entry
        rosservice.rosservicemain([cmd, 'list'])

        # test directly
        services = ['/add_two_ints',
                    '/foo/add_two_ints',
                    '/bar/add_two_ints',
                    '/header_echo',
                    ]
        services_nodes = ['/add_two_ints /a2iserver',
                          '/foo/add_two_ints /foo/a2iserver',
                          '/bar/add_two_ints /bar/a2iserver',
                          '/header_echo /headerserver',
                          ]

        with fakestdout() as b:
            rosservice._rosservice_cmd_list([cmd, 'list'])
            v = output_lines(b)
            # drop the per-node logger services
            v = [x for x in v if not x.endswith('/get_loggers') and not x.endswith('/set_logger_level')]
            self.assertEqual(set(services), set(v))
        for flag in ['-n', '--nodes']:
            with fakestdout() as b:
                rosservice._rosservice_cmd_list([cmd, 'list', flag])
                v = output_lines(b)
                # with node names appended, the logger services sit mid-line
                v = [x for x in v if x.find('/get_loggers ') == -1 and x.find('/set_logger_level ') == -1]
                self.assertEqual(set(services_nodes), set(v))

        # test with multiple service names
        self.assertRaises(SystemExit, rosservice._rosservice_cmd_list,
                          [cmd, 'list', s, s])

        # test with resolved service names
        for s in services:
            with fakestdout() as b:
                rosservice._rosservice_cmd_list([cmd, 'list', s])
                self.assertEqual(s, b.getvalue().strip())

        # test with relative service names
        s = 'add_two_ints'
        with fakestdout() as b:
            rosservice._rosservice_cmd_list([cmd, 'list', s])
            self.assertEqual('/add_two_ints', b.getvalue().strip())
        with fakestdout() as b:
            rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
            self.assertEqual('/add_two_ints /a2iserver', b.getvalue().strip())
        with fakestdout() as b:
            rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
            self.assertEqual('/add_two_ints /a2iserver', b.getvalue().strip())

        # test with namespaces (only checks that the calls do not raise)
        s = '/foo'
        rosservice._rosservice_cmd_list([cmd, 'list', s])
        rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
        rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
        s = 'foo'
        rosservice._rosservice_cmd_list([cmd, 'list', s])
        rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
        rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
00365         
00366 
# Test name handed to rosunit.unitrun() below.
NAME = 'test_rosservice'

if __name__ == '__main__':
    # Run the suite through rosunit, requesting coverage for 'rosservice'.
    rosunit.unitrun(
        'test_rosservice',
        NAME,
        TestRosservice,
        sys.argv,
        coverage_packages=['rosservice'],
    )


test_rosservice
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:58