test_rosservice_command_line_online.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys 
00036 import time
00037 import unittest
00038 
00039 import rospy
00040 import rostest
00041 
00042 import test_rosmaster.srv
00043 
00044 from subprocess import Popen, PIPE, check_call, call
00045 
00046 from threading import Thread
00047 class TestTask(Thread):
00048     def __init__(self, task):
00049         Thread.__init__(self)
00050         self.task = task
00051         self.success = False
00052         self.done = False
00053         self.value = None
00054         
00055     def run(self):
00056         try:
00057             print "STARTING TASK"
00058             self.value = self.task()
00059             print "TASK HAS COMPLETED"
00060             self.success = True
00061         except:
00062             import traceback
00063             traceback.print_exc()
00064         self.done = True
00065 
00066 class TestRosserviceOnline(unittest.TestCase):
00067 
00068     def setUp(self):
00069         pass
00070         
00071     def test_rosservice(self):
00072         # wait for network to initialize
00073         services = ['/add_two_ints', '/foo/add_two_ints', '/bar/add_two_ints']
00074         for s in services:
00075             rospy.wait_for_service(s)
00076 
00077         cmd = 'rosservice'
00078         names = ['add_two_ints', '/add_two_ints', 'foo/add_two_ints', '/bar/add_two_ints']
00079 
00080         # list
00081         # - hard to exact match as we are still adding builtin services to nodes (e.g. set_logger_level)
00082         output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0]
00083         l = set(output.split())
00084         for s in services:
00085             self.assert_(s in l)
00086 
00087         for name in names:
00088             # args
00089             output = Popen([cmd, 'args', name], stdout=PIPE).communicate()[0]
00090             self.assertEquals('a b', output.strip())
00091 
00092             # type
00093             output = Popen([cmd, 'type', name], stdout=PIPE).communicate()[0]
00094             self.assertEquals('test_rosmaster/AddTwoInts', output.strip())
00095 
00096             # find
00097             output = Popen([cmd, 'find', 'test_rosmaster/AddTwoInts'], stdout=PIPE).communicate()[0]
00098             values = [v.strip() for v in output.split('\n') if v.strip()]
00099             self.assertEquals(set(values), set(services))
00100 
00101             # uri
00102             output = Popen([cmd, 'uri', name], stdout=PIPE).communicate()[0]
00103             # - no exact answer
00104             self.assert_(output.startswith('rosrpc://'), output)
00105 
00106             # call
00107             output = Popen([cmd, 'call', '--wait', name, '1', '2'], stdout=PIPE).communicate()[0]
00108             self.assertEquals('sum: 3', output.strip())
00109 
00110             output = Popen([cmd, 'call', name, '1', '2'], stdout=PIPE).communicate()[0]
00111             self.assertEquals('sum: 3', output.strip())
00112 
00113         name = 'header_echo'
00114         # test with a Header so we can validate keyword args
00115         import yaml
00116         import time
00117         t = time.time()
00118         
00119         # test with empty headers
00120         for v in ['{}', '{header: {}}', '{header: {seq: 0}}']:
00121             output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
00122             output = output.strip()
00123             self.assert_(output, output)
00124             val = yaml.load(output)['header']
00125             self.assertEquals('', val['frame_id'])
00126             self.assert_(val['seq'] >= 0)
00127             self.assertEquals(0, val['stamp']['secs'])
00128             self.assertEquals(0, val['stamp']['nsecs'])
00129 
00130         # test with auto headers
00131         for v in ['{header: auto}', '{header: {stamp: now}}']:
00132             output = Popen([cmd, 'call', name, v], stdout=PIPE).communicate()[0]
00133             val = yaml.load(output.strip())['header']
00134             self.assertEquals('', val['frame_id'])
00135             self.assert_(val['seq'] >= 0)
00136             self.assert_(val['stamp']['secs'] >= int(t))
00137         
00138 
00139         # verify that it respects ROS_NS
00140         # - the uris should be different as the names should resolve differently
00141         env = os.environ.copy()
00142         env['ROS_NAMESPACE'] = 'foo'
00143         uri1 = Popen([cmd, 'uri', 'add_two_ints'], stdout=PIPE).communicate()[0]
00144         uri2 = Popen([cmd, 'uri', 'add_two_ints'], env=env, stdout=PIPE).communicate()[0]
00145         self.assert_(uri2.startswith('rosrpc://'))
00146         self.assertNotEquals(uri1, uri2)
00147 
00148         # test_call_wait
00149         def task1():
00150             output = Popen([cmd, 'call', '--wait', 'wait_two_ints', '1', '2'], stdout=PIPE).communicate()[0]
00151             self.assertEquals('sum: 3', output.strip())
00152         timeout_t = time.time() + 5.
00153         t1 = TestTask(task1)
00154         t1.start()
00155         
00156         rospy.init_node('test_call_wait')
00157         rospy.Service("wait_two_ints", test_rosmaster.srv.AddTwoInts, lambda x: x.a + x.b)
00158         while not t1.done and time.time() < timeout_t:
00159             time.sleep(0.5)
00160         self.assert_(t1.success)
00161         
00162 
00163 PKG = 'test_rosservice'
00164 NAME = 'test_rosservice_command_line_online'
00165 if __name__ == '__main__':
00166     rostest.run(PKG, NAME, TestRosserviceOnline, sys.argv)


test_rosservice
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:53:51