test_rosservice.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys 
00036 import unittest
00037 import cStringIO
00038 import time
00039         
00040 import rosunit
00041 
00042 from subprocess import Popen, PIPE, check_call, call
00043 
00044 from contextlib import contextmanager
00045 
@contextmanager
def fakestdout():
    """Temporarily replace ``sys.stdout`` with an in-memory buffer.

    Yields the buffer so the caller can read whatever was printed inside the
    ``with`` block via ``getvalue()``.  The previous ``sys.stdout`` is put
    back when the block exits, including when the block raises (a failed
    assertion or ``SystemExit``), so one failing test cannot leave stdout
    redirected for the tests that follow.
    """
    # Imported locally so the helper is self-contained: cStringIO only exists
    # on Python 2, io.StringIO is the equivalent elsewhere.
    try:
        from cStringIO import StringIO
    except ImportError:
        from io import StringIO

    realstdout = sys.stdout
    buf = StringIO()
    sys.stdout = buf
    try:
        yield buf
    finally:
        # Always restore, even if the with-body raised.
        sys.stdout = realstdout
00053     
def todict(s):
    """Parse ``key: value`` lines of *s* into a dictionary.

    Each line is split at its first ``':'``.  Lines without a colon are
    ignored.  Values are stripped of surrounding whitespace; keys are kept
    exactly as they appear.  When a key occurs more than once, the last
    occurrence wins.
    """
    parts = (line.partition(':') for line in s.split('\n'))
    return dict((name, value.strip()) for name, sep, value in parts if sep)
00061         
class TestRosservice(unittest.TestCase):
    """Tests for the ``rosservice`` library functions and command-line entry points.

    The assertions expect the services ``/add_two_ints``,
    ``/foo/add_two_ints``, ``/bar/add_two_ints`` and ``/header_echo`` to be
    listed by ``rosservice list``; ``setUp`` waits for the first three
    before each test.  Command output is captured with ``fakestdout`` and
    checked after the capture block has closed, so a failing assertion never
    runs while ``sys.stdout`` is redirected.
    """

    def setUp(self):
        """Wait (up to 10 seconds) for the add_two_ints services to be listed."""
        import rosservice
        services = set(['/add_two_ints',
                        '/foo/add_two_ints',
                        '/bar/add_two_ints'])

        timeout_t = time.time() + 10.
        while time.time() < timeout_t:
            with fakestdout() as b:
                rosservice._rosservice_cmd_list(['rosservice', 'list'])
            listed = set(x.strip() for x in b.getvalue().split('\n') if x.strip())
            if services <= listed:
                return
            # Pause between polls instead of busy-spinning.
            time.sleep(0.1)
        self.fail("timeout")

    def test_get_service_headers(self):
        """get_service_headers() raises for a bad scheme and an unreachable host."""
        import rosservice
        orig_uri = os.environ['ROS_MASTER_URI']
        os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
        try:
            # test error conditions, integration tests cover success cases
            self.assertRaises(rosservice.ROSServiceException,
                              rosservice.get_service_headers,
                              '/add_two_ints', 'fake://localhost:1234')
            self.assertRaises(rosservice.ROSServiceIOException,
                              rosservice.get_service_headers,
                              '/add_two_ints', 'rosrpc://fake_host:1234')
        finally:
            os.environ['ROS_MASTER_URI'] = orig_uri

    def test_get_service_type(self):
        """get_service_type() returns the type name, or None for an unknown service."""
        import rosservice
        self.assertEqual('test_rosmaster/AddTwoInts',
                         rosservice.get_service_type('/add_two_ints'))
        self.assertEqual(None, rosservice.get_service_type('/fake_add_two_ints'))

    def test_offline(self):
        """With ROS_MASTER_URI pointing at a fake host, every call raises ROSServiceIOException."""
        import rosservice
        orig_uri = os.environ['ROS_MASTER_URI']
        os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'

        try:
            c = 'rosservice'
            io_exc = rosservice.ROSServiceIOException

            self.assertRaises(io_exc, rosservice.get_service_type, '/add_two_ints')
            self.assertRaises(io_exc, rosservice._rosservice_cmd_list,
                              [c, 'list'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_info,
                              [c, 'info', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_type,
                              [c, 'type', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_uri,
                              [c, 'uri', '/add_two_ints'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_find,
                              [c, 'find', 'test_ros/AddTwoInts'])
            self.assertRaises(io_exc, rosservice._rosservice_cmd_call,
                              [c, 'call', '/add_two_ints', '1', '2'])
        finally:
            os.environ['ROS_MASTER_URI'] = orig_uri

    def test_cmd_type(self):
        """``rosservice type`` prints the service type and exits for unknown services."""
        import rosservice
        cmd = 'rosservice'
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'type', '/fake_service'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'type', s])
            self.assertEqual('test_rosmaster/AddTwoInts', b.getvalue().strip())

    def test_cmd_uri(self):
        """``rosservice uri`` prints a rosrpc:// URI and exits for unknown services."""
        import rosservice
        cmd = 'rosservice'
        # Captured only to keep the command's output off the real stdout.
        with fakestdout():
            self.assertRaises(SystemExit, rosservice.rosservicemain,
                              [cmd, 'uri', '/fake_service'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'uri', s])
            v = b.getvalue().strip()
            self.assertTrue(v.startswith('rosrpc://'), v)

    def test_cmd_node(self):
        """``rosservice node`` prints the providing node and exits for unknown services."""
        import rosservice
        cmd = 'rosservice'
        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'node', s])
            v = b.getvalue().strip()
            if 'foo' in s:
                self.assertEqual('/foo/a2iserver', v)
            else:
                self.assertEqual('/a2iserver', v)
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'node', '/fake_two_ints'])

    def test_full_usage(self):
        """_fullusage() terminates via SystemExit."""
        import rosservice
        self.assertRaises(SystemExit, rosservice._fullusage)

    def test_cmd_info(self):
        """``rosservice info`` rejects bad argument lists and prints Node/Type/Args/URI."""
        import rosservice
        cmd = 'rosservice'

        # missing argument, unknown service, too many arguments
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'info'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'info', '/fake_service'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'info', '/add_two_ints', '/foo/add_two_ints'])

        for s in ['/add_two_ints', 'add_two_ints', 'foo/add_two_ints']:
            with fakestdout() as b:
                rosservice.rosservicemain([cmd, 'info', s])
            d = todict(b.getvalue())
            if 'foo' in s:
                self.assertEqual('/foo/a2iserver', d['Node'])
            else:
                self.assertEqual('/a2iserver', d['Node'], repr(d['Node']))
            self.assertEqual('test_rosmaster/AddTwoInts', d['Type'])
            self.assertEqual('a b', d['Args'])
            self.assertTrue('URI' in d)

    def test_cmd_find(self):
        """``rosservice find`` lists services of a type; nothing for an unknown type."""
        import rosservice
        cmd = 'rosservice'

        # missing argument, too many arguments
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'find'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'find', 'test_ros/AddTwoInts', 'test/AddThreeInts'])

        expected = set(['/add_two_ints', '/bar/add_two_ints', '/foo/add_two_ints'])
        with fakestdout() as b:
            rosservice.rosservicemain([cmd, 'find', 'test_rosmaster/AddTwoInts'])
        found = set([x for x in b.getvalue().split('\n') if x.strip()])
        self.assertEqual(expected, found)

        with fakestdout() as b:
            rosservice.rosservicemain([cmd, 'find', 'fake/AddTwoInts'])
        self.assertEqual('', b.getvalue().strip())

    def test_get_service_class_by_name(self):
        """get_service_class_by_name() reports an unavailable service by name."""
        import rosservice
        try:
            rosservice.get_service_class_by_name('fake')
        except rosservice.ROSServiceException as e:
            self.assertEqual("Service [fake] is not available.", str(e))
        else:
            self.fail("should have raised")

    def test_cmd_call(self):
        """``rosservice call`` exits on a missing service name or too many arguments."""
        import rosservice
        cmd = 'rosservice'

        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'call'])
        self.assertRaises(SystemExit, rosservice.rosservicemain,
                          [cmd, 'call', 'add_two_ints', '1', '2', '3'])

    def test_cmd_list(self):
        """``rosservice list`` with and without -n/--nodes, names and namespaces."""
        import rosservice
        cmd = 'rosservice'

        # test main entry
        rosservice.rosservicemain([cmd, 'list'])

        # test directly
        services = ['/add_two_ints',
                    '/foo/add_two_ints',
                    '/bar/add_two_ints',
                    '/header_echo',
                    ]
        services_nodes = ['/add_two_ints /a2iserver',
                          '/foo/add_two_ints /foo/a2iserver',
                          '/bar/add_two_ints /bar/a2iserver',
                          '/header_echo /headerserver',
                          ]

        with fakestdout() as b:
            rosservice._rosservice_cmd_list([cmd, 'list'])
        v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
        # drop the rosout and logger services, which are not under test
        v = [x for x in v if not x.startswith('/rosout/')]
        v = [x for x in v
             if not x.endswith('/get_loggers') and not x.endswith('/set_logger_level')]
        self.assertEqual(set(services), set(v))

        for flag in ['-n', '--nodes']:
            with fakestdout() as b:
                rosservice._rosservice_cmd_list([cmd, 'list', flag])
            v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
            v = [x for x in v if not x.startswith('/rosout/')]
            # with node names appended, the service name is followed by a space
            v = [x for x in v
                 if '/get_loggers ' not in x and '/set_logger_level ' not in x]
            self.assertEqual(set(services_nodes), set(v))

        # test with multiple service names
        self.assertRaises(SystemExit, rosservice._rosservice_cmd_list,
                          [cmd, 'list', '/add_two_ints', '/add_two_ints'])

        # test with resolved service names
        for s in services:
            with fakestdout() as b:
                rosservice._rosservice_cmd_list([cmd, 'list', s])
            self.assertEqual(s, b.getvalue().strip())

        # test with relative service names
        s = 'add_two_ints'
        with fakestdout() as b:
            rosservice._rosservice_cmd_list([cmd, 'list', s])
        self.assertEqual('/add_two_ints', b.getvalue().strip())
        for flag in ['-n', '--nodes']:
            with fakestdout() as b:
                rosservice._rosservice_cmd_list([cmd, 'list', s, flag])
            self.assertEqual('/add_two_ints /a2iserver', b.getvalue().strip())

        # test with namespaces (only checks that the calls complete)
        for s in ['/foo', 'foo']:
            rosservice._rosservice_cmd_list([cmd, 'list', s])
            rosservice._rosservice_cmd_list([cmd, 'list', s, '-n'])
            rosservice._rosservice_cmd_list([cmd, 'list', s, '--nodes'])
00362         
00363 
# Test name passed to rosunit.unitrun() when this file is run as a script.
NAME = 'test_rosservice'

if __name__ == '__main__':
    rosunit.unitrun(
        'test_rosservice',
        NAME,
        TestRosservice,
        sys.argv,
        coverage_packages=['rosservice'])


test_rosservice
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:53:51