test_basic_services.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 
00035 ## Integration test for empty services to test serializers
00036 ## and transport
00037 
00038 PKG = 'test_rospy'
00039 NAME = 'basic_services'
00040 
00041 import sys 
00042 import time
00043 import math
00044 import unittest
00045 
00046 import rospy
00047 import rostest
00048 from test_rospy.srv import *
00049 
00050 CONSTANTS_SERVICE_NAKED = 'constants_service_naked'
00051 CONSTANTS_SERVICE_WRAPPED = 'constants_service_wrapped'
00052 
00053 ADD_TWO_INTS_SERVICE_NAKED = 'a2i_naked'
00054 ADD_TWO_INTS_SERVICE_WRAPPED = 'a2i_wrapped'
00055 
00056 STRING_CAT_SERVICE_NAKED = 'string_lower_naked'
00057 STRING_CAT_SERVICE_WRAPPED = 'string_lower_wrapped'
00058 
00059 #TODO:
00060 
00061 STRING_SERVICE       = 'string_service'
00062 EMBEDDED_MSG_SERVICE = 'embedded_msg_service'
00063 
00064 WAIT_TIMEOUT = 10.0 #s
00065 
00066 def add_two_ints_wrapped(req):
00067     from test_ros.srv import AddTwoIntsResponse
00068     return AddTwoIntsResponse(req.a + req.b)
00069 def add_two_ints_naked(req):
00070     return req.a + req.b
00071 
00072 def string_cat_naked(req):
00073     from std_msgs.msg import String
00074     return String(req.str.data + req.str2.val)
00075 def string_cat_wrapped(req):
00076     from test_rospy.srv import StringStringResponse
00077     from std_msgs.msg import String
00078     return StringStringResponse(String(req.str.data + req.str2.val))
00079 
00080 def handle_constants_wrapped(req):
00081     cmr = ConstantsMultiplexRequest
00082     Resp = ConstantsMultiplexResponse
00083     if req.selection == cmr.SELECT_X:
00084         # test w/o wrapper
00085         return Resp(req.selection,
00086                     cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X)
00087     elif req.selection == cmr.SELECT_Y:
00088         return Resp(req.selection,
00089             cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y)
00090     elif req.selection == cmr.SELECT_Z:
00091         return Resp(req.selection,
00092             cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z)
00093     else:
00094         print "test failed, req.selection not in (X,Y,Z)", req.selection
00095 
00096 def handle_constants_naked(req):
00097     cmr = ConstantsMultiplexRequest
00098     if req.selection == cmr.SELECT_X:
00099         return req.selection,cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X
00100     elif req.selection == cmr.SELECT_Y:
00101         return req.selection, cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y
00102     elif req.selection == cmr.SELECT_Z:
00103         return req.selection, cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z
00104     else:
00105         print "test failed, req.selection not in (X,Y,Z)", req.selection
00106         
00107 def services():
00108     from test_ros.srv import AddTwoInts
00109     rospy.init_node(NAME)
00110     s1 = rospy.Service(CONSTANTS_SERVICE_NAKED, ConstantsMultiplex, handle_constants_naked)
00111     s2 = rospy.Service(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex, handle_constants_wrapped)
00112     
00113     s3 = rospy.Service(ADD_TWO_INTS_SERVICE_NAKED, AddTwoInts, add_two_ints_naked)
00114     s4 = rospy.Service(ADD_TWO_INTS_SERVICE_WRAPPED, AddTwoInts, add_two_ints_wrapped)
00115 
00116     s5 = rospy.Service(STRING_CAT_SERVICE_NAKED, StringString, string_cat_naked)
00117     s6 = rospy.Service(STRING_CAT_SERVICE_WRAPPED, StringString, string_cat_wrapped)
00118     
00119     rospy.spin()
00120 
00121 class TestBasicServicesClient(unittest.TestCase):
00122 
00123     # call with request instance style
00124     def _test(self, name, srv, req):
00125         rospy.wait_for_service(name, WAIT_TIMEOUT)        
00126         s = rospy.ServiceProxy(name, srv)
00127         resp = s.call(req)
00128         self.assert_(resp is not None)
00129         return resp
00130 
00131     # call with args style    
00132     def _test_req_naked(self, name, srv, args):
00133         rospy.wait_for_service(name, WAIT_TIMEOUT)        
00134         s = rospy.ServiceProxy(name, srv)
00135         resp = s.call(*args)
00136         self.assert_(resp is not None)
00137         return resp
00138 
00139     # call with keyword style
00140     def _test_req_kwds(self, name, srv, kwds):
00141         rospy.wait_for_service(name, WAIT_TIMEOUT)        
00142         s = rospy.ServiceProxy(name, srv)
00143         resp = s.call(**kwds)
00144         self.assert_(resp is not None)
00145         return resp
00146     
00147     def test_calltype_mismatch(self):
00148         try:
00149             s = rospy.ServiceProxy(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex)
00150             s.call(EmptySrvRequest())
00151             self.fail("rospy failed to raise TypeError when request type does not match service type")
00152         except TypeError:
00153             pass
00154         
00155     def test_type_mismatch(self):
00156         try:
00157             self._test(CONSTANTS_SERVICE_WRAPPED, EmptySrv, EmptySrvRequest())
00158             self.fail("Service failed to throw exception on type mismatch [sent EmptySrvRequest to ConstantsMultiplex service")
00159         except rospy.ServiceException:
00160             pass 
00161 
00162     def test_add_two_ints(self):
00163         # add two ints checks single, integer return value, which is
00164         # an interesting in the naked case
00165         from test_ros.srv import AddTwoInts, AddTwoIntsRequest
00166         Cls = AddTwoInts
00167         Req = AddTwoIntsRequest
00168 
00169         for name in [ADD_TWO_INTS_SERVICE_NAKED, ADD_TWO_INTS_SERVICE_WRAPPED]:
00170             resp_req = self._test(name, Cls, Req(1, 2))
00171             resp_req_naked = self._test_req_naked(name, Cls, (1, 2))
00172             resp_req_kwds = self._test_req_kwds(name, Cls, {'a': 3})
00173             for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00174                 self.assertEquals(3, resp.sum)
00175 
00176     def test_String_String(self):
00177         from std_msgs.msg import String
00178         from test_rospy.srv import StringString, StringStringRequest
00179         from test_rospy.msg import Val
00180         Cls = StringString
00181         Req = StringStringRequest
00182 
00183         for name in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
00184             resp_req = self._test(name, Cls, Req(String('FOO'), Val('bar')))
00185             resp_req_naked = self._test_req_naked(name, Cls, (String('FOO'), Val('bar'),))
00186             resp_req_kwds = self._test_req_kwds(name, Cls, {'str': String('FOO'), 'str2': Val('bar')})
00187             for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00188                 self.assertEquals('FOObar', resp.str.data)
00189         
00190     def test_constants(self):
00191         Cls = ConstantsMultiplex
00192         Req = ConstantsMultiplexRequest
00193 
00194         for name in [CONSTANTS_SERVICE_NAKED, CONSTANTS_SERVICE_WRAPPED]:
00195             # test all three call forms
00196             resp_req = self._test(name, Cls, Req(Req.SELECT_X))
00197             resp_req_naked = self._test_req_naked(name, Cls, (Req.SELECT_X,))
00198             resp_req_kwds = self._test_req_kwds(name, Cls, {'selection': Req.SELECT_X})
00199 
00200             for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00201                 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_X,
00202                                   resp.select_confirm)
00203                 self.assertEquals(Req.BYTE_X, resp.ret_byte)
00204                 self.assertEquals(Req.INT32_X, resp.ret_int32)
00205                 self.assertEquals(Req.UINT32_X, resp.ret_uint32)
00206                 self.assert_(math.fabs(Req.FLOAT32_X - resp.ret_float32) < 0.001)
00207 
00208             resp = self._test(name, Cls,
00209                               ConstantsMultiplexRequest(Req.SELECT_Y))
00210             self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Y,
00211                               resp.select_confirm)
00212             self.assertEquals(Req.BYTE_Y, resp.ret_byte)
00213             self.assertEquals(Req.INT32_Y, resp.ret_int32)
00214             self.assertEquals(Req.UINT32_Y, resp.ret_uint32)
00215             self.assert_(math.fabs(Req.FLOAT32_Y - resp.ret_float32) < 0.001)
00216 
00217             resp = self._test(name, Cls,
00218                               ConstantsMultiplexRequest(Req.SELECT_Z))
00219             self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Z,
00220                               resp.select_confirm)
00221             self.assertEquals(Req.BYTE_Z, resp.ret_byte)
00222             self.assertEquals(Req.INT32_Z, resp.ret_int32)
00223             self.assertEquals(Req.UINT32_Z, resp.ret_uint32)
00224             self.assert_(math.fabs(Req.FLOAT32_Z - resp.ret_float32) < 0.001)
00225         
00226 if __name__ == '__main__':
00227     if '--service' in sys.argv:
00228         services()
00229     else:
00230         rostest.run(PKG, 'rospy_basic_services', TestBasicServicesClient, sys.argv)


test_rospy
Author(s): Ken Conley/kwc@willowgarage.com
autogenerated on Sat Dec 28 2013 17:36:20