test_basic_services.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 
00035 ## Integration test for empty services to test serializers
00036 ## and transport
00037 
00038 PKG = 'test_rospy'
00039 NAME = 'basic_services'
00040 
00041 import sys 
00042 import time
00043 import math
00044 import unittest
00045 
00046 import rospy
00047 import rostest
00048 from test_rospy.srv import *
00049 
00050 CONSTANTS_SERVICE_NAKED = 'constants_service_naked'
00051 CONSTANTS_SERVICE_WRAPPED = 'constants_service_wrapped'
00052 
00053 ADD_TWO_INTS_SERVICE_NAKED = 'a2i_naked'
00054 ADD_TWO_INTS_SERVICE_WRAPPED = 'a2i_wrapped'
00055 
00056 STRING_CAT_SERVICE_NAKED = 'string_lower_naked'
00057 STRING_CAT_SERVICE_WRAPPED = 'string_lower_wrapped'
00058 
00059 FAULTY_SERVICE = 'faulty_service'
00060 FAULTY_SERVICE_RESULT = 'faulty_service_result'
00061 
00062 #TODO:
00063 
00064 STRING_SERVICE       = 'string_service'
00065 EMBEDDED_MSG_SERVICE = 'embedded_msg_service'
00066 
00067 WAIT_TIMEOUT = 10.0 #s
00068 
00069 def add_two_ints_wrapped(req):
00070     from test_rosmaster.srv import AddTwoIntsResponse
00071     return AddTwoIntsResponse(req.a + req.b)
00072 def add_two_ints_naked(req):
00073     return req.a + req.b
00074 
00075 def string_cat_naked(req):
00076     from std_msgs.msg import String
00077     return String(req.str.data + req.str2.val)
00078 def string_cat_wrapped(req):
00079     from test_rospy.srv import StringStringResponse
00080     from std_msgs.msg import String
00081     return StringStringResponse(String(req.str.data + req.str2.val))
00082 
00083 def handle_constants_wrapped(req):
00084     cmr = ConstantsMultiplexRequest
00085     Resp = ConstantsMultiplexResponse
00086     if req.selection == cmr.SELECT_X:
00087         # test w/o wrapper
00088         return Resp(req.selection,
00089                     cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X)
00090     elif req.selection == cmr.SELECT_Y:
00091         return Resp(req.selection,
00092             cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y)
00093     elif req.selection == cmr.SELECT_Z:
00094         return Resp(req.selection,
00095             cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z)
00096     else:
00097         print("test failed, req.selection not in (X,Y,Z)", req.selection)
00098 
00099 def handle_constants_naked(req):
00100     cmr = ConstantsMultiplexRequest
00101     if req.selection == cmr.SELECT_X:
00102         return req.selection,cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X
00103     elif req.selection == cmr.SELECT_Y:
00104         return req.selection, cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y
00105     elif req.selection == cmr.SELECT_Z:
00106         return req.selection, cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z
00107     else:
00108         print("test failed, req.selection not in (X,Y,Z)", req.selection)
00109 
00110 class UnexpectedException(Exception):
00111     pass
00112 
00113 
00114 class FaultyHandler(object):
00115     def __init__(self):
00116         self.test_call = False
00117 
00118     def custom_error_handler(self, e, exc_type, exc_value, tb):
00119         self.test_call = True
00120 
00121     def call_handler(self, req):
00122         raise UnexpectedException('Call raised an exception')
00123 
00124     def result_handler(self, req):
00125         resp = EmptyReqSrvResponse()
00126         resp.fake_secret = 1 if self.test_call else 0
00127         return resp
00128 
00129 
00130 def services():
00131     from test_rosmaster.srv import AddTwoInts
00132     rospy.init_node(NAME)
00133     s1 = rospy.Service(CONSTANTS_SERVICE_NAKED, ConstantsMultiplex, handle_constants_naked)
00134     s2 = rospy.Service(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex, handle_constants_wrapped)
00135     
00136     s3 = rospy.Service(ADD_TWO_INTS_SERVICE_NAKED, AddTwoInts, add_two_ints_naked)
00137     s4 = rospy.Service(ADD_TWO_INTS_SERVICE_WRAPPED, AddTwoInts, add_two_ints_wrapped)
00138 
00139     s5 = rospy.Service(STRING_CAT_SERVICE_NAKED, StringString, string_cat_naked)
00140     s6 = rospy.Service(STRING_CAT_SERVICE_WRAPPED, StringString, string_cat_wrapped)
00141 
00142     faulty_handler = FaultyHandler()
00143     s7 = rospy.Service(FAULTY_SERVICE, EmptySrv, faulty_handler.call_handler, error_handler=faulty_handler.custom_error_handler)
00144     s8 = rospy.Service(FAULTY_SERVICE_RESULT, EmptyReqSrv, faulty_handler.result_handler)
00145 
00146     rospy.spin()
00147 
00148 class TestBasicServicesClient(unittest.TestCase):
00149 
00150     # call with request instance style
00151     def _test(self, name, srv, req):
00152         rospy.wait_for_service(name, WAIT_TIMEOUT)        
00153         s = rospy.ServiceProxy(name, srv)
00154         resp = s.call(req)
00155         self.assert_(resp is not None)
00156         return resp
00157 
00158     # call with args style    
00159     def _test_req_naked(self, name, srv, args):
00160         rospy.wait_for_service(name, WAIT_TIMEOUT)        
00161         s = rospy.ServiceProxy(name, srv)
00162         resp = s.call(*args)
00163         self.assert_(resp is not None)
00164         return resp
00165 
00166     # call with keyword style
00167     def _test_req_kwds(self, name, srv, kwds):
00168         rospy.wait_for_service(name, WAIT_TIMEOUT)        
00169         s = rospy.ServiceProxy(name, srv)
00170         resp = s.call(**kwds)
00171         self.assert_(resp is not None)
00172         return resp
00173     
00174     def test_calltype_mismatch(self):
00175         try:
00176             s = rospy.ServiceProxy(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex)
00177             s.call(EmptySrvRequest())
00178             self.fail("rospy failed to raise TypeError when request type does not match service type")
00179         except TypeError:
00180             pass
00181         
00182     def test_type_mismatch(self):
00183         try:
00184             self._test(CONSTANTS_SERVICE_WRAPPED, EmptySrv, EmptySrvRequest())
00185             self.fail("Service failed to throw exception on type mismatch [sent EmptySrvRequest to ConstantsMultiplex service")
00186         except rospy.ServiceException:
00187             pass 
00188 
00189     def test_add_two_ints(self):
00190         # add two ints checks single, integer return value, which is
00191         # an interesting in the naked case
00192         from test_rosmaster.srv import AddTwoInts, AddTwoIntsRequest
00193         Cls = AddTwoInts
00194         Req = AddTwoIntsRequest
00195 
00196         for name in [ADD_TWO_INTS_SERVICE_NAKED, ADD_TWO_INTS_SERVICE_WRAPPED]:
00197             resp_req = self._test(name, Cls, Req(1, 2))
00198             resp_req_naked = self._test_req_naked(name, Cls, (1, 2))
00199             resp_req_kwds = self._test_req_kwds(name, Cls, {'a': 3})
00200             for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00201                 self.assertEquals(3, resp.sum)
00202 
00203     def test_String_String(self):
00204         from std_msgs.msg import String
00205         from test_rospy.srv import StringString, StringStringRequest
00206         from test_rospy.msg import Val
00207         Cls = StringString
00208         Req = StringStringRequest
00209 
00210         for name in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
00211             resp_req = self._test(name, Cls, Req(String('FOO'), Val('bar')))
00212             resp_req_naked = self._test_req_naked(name, Cls, (String('FOO'), Val('bar'),))
00213             resp_req_kwds = self._test_req_kwds(name, Cls, {'str': String('FOO'), 'str2': Val('bar')})
00214             for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00215                 self.assertEquals('FOObar', resp.str.data)
00216         
00217     def test_constants(self):
00218         Cls = ConstantsMultiplex
00219         Req = ConstantsMultiplexRequest
00220 
00221         for name in [CONSTANTS_SERVICE_NAKED, CONSTANTS_SERVICE_WRAPPED]:
00222             # test all three call forms
00223             resp_req = self._test(name, Cls, Req(Req.SELECT_X))
00224             resp_req_naked = self._test_req_naked(name, Cls, (Req.SELECT_X,))
00225             resp_req_kwds = self._test_req_kwds(name, Cls, {'selection': Req.SELECT_X})
00226 
00227             for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00228                 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_X,
00229                                   resp.select_confirm)
00230                 self.assertEquals(Req.BYTE_X, resp.ret_byte)
00231                 self.assertEquals(Req.INT32_X, resp.ret_int32)
00232                 self.assertEquals(Req.UINT32_X, resp.ret_uint32)
00233                 self.assert_(math.fabs(Req.FLOAT32_X - resp.ret_float32) < 0.001)
00234 
00235             resp = self._test(name, Cls,
00236                               ConstantsMultiplexRequest(Req.SELECT_Y))
00237             self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Y,
00238                               resp.select_confirm)
00239             self.assertEquals(Req.BYTE_Y, resp.ret_byte)
00240             self.assertEquals(Req.INT32_Y, resp.ret_int32)
00241             self.assertEquals(Req.UINT32_Y, resp.ret_uint32)
00242             self.assert_(math.fabs(Req.FLOAT32_Y - resp.ret_float32) < 0.001)
00243 
00244             resp = self._test(name, Cls,
00245                               ConstantsMultiplexRequest(Req.SELECT_Z))
00246             self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Z,
00247                               resp.select_confirm)
00248             self.assertEquals(Req.BYTE_Z, resp.ret_byte)
00249             self.assertEquals(Req.INT32_Z, resp.ret_int32)
00250             self.assertEquals(Req.UINT32_Z, resp.ret_uint32)
00251             self.assert_(math.fabs(Req.FLOAT32_Z - resp.ret_float32) < 0.001)
00252 
00253     def test_faulty_service(self):
00254         rospy.wait_for_service(FAULTY_SERVICE, WAIT_TIMEOUT)
00255         sproxy = rospy.ServiceProxy(FAULTY_SERVICE, EmptySrv)
00256 
00257         rospy.wait_for_service(FAULTY_SERVICE_RESULT, WAIT_TIMEOUT)
00258         sproxy_result = rospy.ServiceProxy(FAULTY_SERVICE_RESULT, EmptyReqSrv)
00259 
00260         resp = sproxy_result.call(EmptyReqSrvRequest())
00261         self.assertEquals(resp.fake_secret, 0)
00262         try:
00263             resp = sproxy.call(EmptySrvRequest())
00264             self.assert_(False)
00265         except rospy.ServiceException:
00266             pass
00267         resp = sproxy_result.call(EmptyReqSrvRequest())
00268         self.assertEquals(resp.fake_secret, 1)
00269 
00270 if __name__ == '__main__':
00271     if '--service' in sys.argv:
00272         services()
00273     else:
00274         rostest.run(PKG, 'rospy_basic_services', TestBasicServicesClient, sys.argv)


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57