check_rosservice_command_line_online.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys 
00036 import time
00037 import unittest
00038 
00039 import rospy
00040 import rostest
00041 
00042 import test_rosmaster.srv
00043 
00044 from subprocess import Popen, PIPE, check_call, call
00045 
00046 from threading import Thread
00047 class TestTask(Thread):
00048     def __init__(self, task):
00049         Thread.__init__(self)
00050         self.task = task
00051         self.success = False
00052         self.done = False
00053         self.value = None
00054         
00055     def run(self):
00056         try:
00057             print("STARTING TASK")
00058             self.value = self.task()
00059             print("TASK HAS COMPLETED")
00060             self.success = True
00061         except:
00062             import traceback
00063             traceback.print_exc()
00064         self.done = True
00065 
00066 class TestRosserviceOnline(unittest.TestCase):
00067 
00068     def setUp(self):
00069         pass
00070         
00071     def test_rosservice(self):
00072         # wait for network to initialize
00073         services = ['/add_two_ints', '/foo/add_two_ints', '/bar/add_two_ints']
00074         for s in services:
00075             rospy.wait_for_service(s)
00076 
00077         cmd = 'rosservice'
00078         names = ['add_two_ints', '/add_two_ints', 'foo/add_two_ints', '/bar/add_two_ints']
00079 
00080         # list
00081         # - hard to exact match as we are still adding builtin services to nodes (e.g. set_logger_level)
00082         output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0]
00083         output = output.decode()
00084         l = set(output.split())
00085         for s in services:
00086             self.assert_(s in l)
00087 
00088         for name in names:
00089             # args
00090             output = Popen([cmd, 'args', name], stdout=PIPE).communicate()[0]
00091             self.assertEquals('a b', output.strip())
00092 
00093             # type
00094             output = Popen([cmd, 'type', name], stdout=PIPE).communicate()[0]
00095             self.assertEquals('test_rosmaster/AddTwoInts', output.strip())
00096 
00097             # find
00098             output = Popen([cmd, 'find', 'test_rosmaster/AddTwoInts'], stdout=PIPE).communicate()[0]
00099             values = [v.strip() for v in output.split('\n') if v.strip()]
00100             self.assertEquals(set(values), set(services))
00101 
00102             # uri
00103             output = Popen([cmd, 'uri', name], stdout=PIPE).communicate()[0]
00104             # - no exact answer
00105             self.assert_(output.startswith('rosrpc://'), output)
00106 
00107             # call
00108             output = Popen([cmd, 'call', '--wait', name, '1', '2'], stdout=PIPE).communicate()[0]
00109             self.assertEquals('sum: 3', output.strip())
00110 
00111             output = Popen([cmd, 'call', name, '1', '2'], stdout=PIPE).communicate()[0]
00112             self.assertEquals('sum: 3', output.strip())
00113 
00114         name = 'header_echo'
00115         # test with a Header so we can validate keyword args
00116         import yaml
00117         import time
00118         t = time.time()
00119         
00120         # test with empty headers
00121         for v in ['{}', '{header: {}}', '{header: {seq: 0}}']:
00122             output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
00123             output = output.strip()
00124             self.assert_(output, output)
00125             val = yaml.load(output)['header']
00126             self.assertEquals('', val['frame_id'])
00127             self.assert_(val['seq'] >= 0)
00128             self.assertEquals(0, val['stamp']['secs'])
00129             self.assertEquals(0, val['stamp']['nsecs'])
00130 
00131         # test with auto headers
00132         for v in ['{header: auto}', '{header: {stamp: now}}']:
00133             output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
00134             val = yaml.load(output.strip())['header']
00135             self.assertEquals('', val['frame_id'])
00136             self.assert_(val['seq'] >= 0)
00137             self.assert_(val['stamp']['secs'] >= int(t))
00138         
00139 
00140         # verify that it respects ROS_NS
00141         # - the uris should be different as the names should resolve differently
00142         env = os.environ.copy()
00143         env['ROS_NAMESPACE'] = 'foo'
00144         uri1 = Popen([cmd, 'uri', 'add_two_ints'], stdout=PIPE).communicate()[0]
00145         uri2 = Popen([cmd, 'uri', 'add_two_ints'], env=env, stdout=PIPE).communicate()[0]
00146         self.assert_(uri2.startswith('rosrpc://'))
00147         self.assertNotEquals(uri1, uri2)
00148 
00149         # test_call_wait
00150         def task1():
00151             output = Popen([cmd, 'call', '--wait', 'wait_two_ints', '1', '2'], stdout=PIPE).communicate()[0]
00152             self.assertEquals('sum: 3', output.strip())
00153         timeout_t = time.time() + 5.
00154         t1 = TestTask(task1)
00155         t1.start()
00156         
00157         rospy.init_node('test_call_wait')
00158         rospy.Service("wait_two_ints", test_rosmaster.srv.AddTwoInts, lambda x: x.a + x.b)
00159         while not t1.done and time.time() < timeout_t:
00160             time.sleep(0.5)
00161         self.assert_(t1.success)
00162         
00163 
00164 PKG = 'test_rosservice'
00165 NAME = 'test_rosservice_command_line_online'
00166 if __name__ == '__main__':
00167     rostest.run(PKG, NAME, TestRosserviceOnline, sys.argv)


test_rosservice
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:58