00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 PKG = 'test_rospy'
00039 NAME = 'basic_services'
00040
00041 import sys
00042 import time
00043 import math
00044 import unittest
00045
00046 import rospy
00047 import rostest
00048 from test_rospy.srv import *
00049
00050 CONSTANTS_SERVICE_NAKED = 'constants_service_naked'
00051 CONSTANTS_SERVICE_WRAPPED = 'constants_service_wrapped'
00052
00053 ADD_TWO_INTS_SERVICE_NAKED = 'a2i_naked'
00054 ADD_TWO_INTS_SERVICE_WRAPPED = 'a2i_wrapped'
00055
00056 STRING_CAT_SERVICE_NAKED = 'string_lower_naked'
00057 STRING_CAT_SERVICE_WRAPPED = 'string_lower_wrapped'
00058
00059 FAULTY_SERVICE = 'faulty_service'
00060 FAULTY_SERVICE_RESULT = 'faulty_service_result'
00061
00062
00063
00064 STRING_SERVICE = 'string_service'
00065 EMBEDDED_MSG_SERVICE = 'embedded_msg_service'
00066
00067 WAIT_TIMEOUT = 10.0
00068
00069 def add_two_ints_wrapped(req):
00070 from test_rosmaster.srv import AddTwoIntsResponse
00071 return AddTwoIntsResponse(req.a + req.b)
00072 def add_two_ints_naked(req):
00073 return req.a + req.b
00074
00075 def string_cat_naked(req):
00076 from std_msgs.msg import String
00077 return String(req.str.data + req.str2.val)
00078 def string_cat_wrapped(req):
00079 from test_rospy.srv import StringStringResponse
00080 from std_msgs.msg import String
00081 return StringStringResponse(String(req.str.data + req.str2.val))
00082
00083 def handle_constants_wrapped(req):
00084 cmr = ConstantsMultiplexRequest
00085 Resp = ConstantsMultiplexResponse
00086 if req.selection == cmr.SELECT_X:
00087
00088 return Resp(req.selection,
00089 cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X)
00090 elif req.selection == cmr.SELECT_Y:
00091 return Resp(req.selection,
00092 cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y)
00093 elif req.selection == cmr.SELECT_Z:
00094 return Resp(req.selection,
00095 cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z)
00096 else:
00097 print "test failed, req.selection not in (X,Y,Z)", req.selection
00098
00099 def handle_constants_naked(req):
00100 cmr = ConstantsMultiplexRequest
00101 if req.selection == cmr.SELECT_X:
00102 return req.selection,cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X
00103 elif req.selection == cmr.SELECT_Y:
00104 return req.selection, cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y
00105 elif req.selection == cmr.SELECT_Z:
00106 return req.selection, cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z
00107 else:
00108 print "test failed, req.selection not in (X,Y,Z)", req.selection
00109
00110 class UnexpectedException(Exception):
00111 pass
00112
00113
00114 class FaultyHandler(object):
00115 def __init__(self):
00116 self.test_call = False
00117
00118 def custom_error_handler(self, e, exc_type, exc_value, tb):
00119 self.test_call = True
00120
00121 def call_handler(self, req):
00122 raise UnexpectedException('Call raised an exception')
00123
00124 def result_handler(self, req):
00125 resp = EmptyReqSrvResponse()
00126 resp.fake_secret = 1 if self.test_call else 0
00127 return resp
00128
00129
00130 def services():
00131 from test_rosmaster.srv import AddTwoInts
00132 rospy.init_node(NAME)
00133 s1 = rospy.Service(CONSTANTS_SERVICE_NAKED, ConstantsMultiplex, handle_constants_naked)
00134 s2 = rospy.Service(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex, handle_constants_wrapped)
00135
00136 s3 = rospy.Service(ADD_TWO_INTS_SERVICE_NAKED, AddTwoInts, add_two_ints_naked)
00137 s4 = rospy.Service(ADD_TWO_INTS_SERVICE_WRAPPED, AddTwoInts, add_two_ints_wrapped)
00138
00139 s5 = rospy.Service(STRING_CAT_SERVICE_NAKED, StringString, string_cat_naked)
00140 s6 = rospy.Service(STRING_CAT_SERVICE_WRAPPED, StringString, string_cat_wrapped)
00141
00142 faulty_handler = FaultyHandler()
00143 s7 = rospy.Service(FAULTY_SERVICE, EmptySrv, faulty_handler.call_handler, error_handler=faulty_handler.custom_error_handler)
00144 s8 = rospy.Service(FAULTY_SERVICE_RESULT, EmptyReqSrv, faulty_handler.result_handler)
00145
00146 rospy.spin()
00147
00148 class TestBasicServicesClient(unittest.TestCase):
00149
00150
00151 def _test(self, name, srv, req):
00152 rospy.wait_for_service(name, WAIT_TIMEOUT)
00153 s = rospy.ServiceProxy(name, srv)
00154 resp = s.call(req)
00155 self.assert_(resp is not None)
00156 return resp
00157
00158
00159 def _test_req_naked(self, name, srv, args):
00160 rospy.wait_for_service(name, WAIT_TIMEOUT)
00161 s = rospy.ServiceProxy(name, srv)
00162 resp = s.call(*args)
00163 self.assert_(resp is not None)
00164 return resp
00165
00166
00167 def _test_req_kwds(self, name, srv, kwds):
00168 rospy.wait_for_service(name, WAIT_TIMEOUT)
00169 s = rospy.ServiceProxy(name, srv)
00170 resp = s.call(**kwds)
00171 self.assert_(resp is not None)
00172 return resp
00173
00174 def test_calltype_mismatch(self):
00175 try:
00176 s = rospy.ServiceProxy(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex)
00177 s.call(EmptySrvRequest())
00178 self.fail("rospy failed to raise TypeError when request type does not match service type")
00179 except TypeError:
00180 pass
00181
00182 def test_type_mismatch(self):
00183 try:
00184 self._test(CONSTANTS_SERVICE_WRAPPED, EmptySrv, EmptySrvRequest())
00185 self.fail("Service failed to throw exception on type mismatch [sent EmptySrvRequest to ConstantsMultiplex service")
00186 except rospy.ServiceException:
00187 pass
00188
00189 def test_add_two_ints(self):
00190
00191
00192 from test_rosmaster.srv import AddTwoInts, AddTwoIntsRequest
00193 Cls = AddTwoInts
00194 Req = AddTwoIntsRequest
00195
00196 for name in [ADD_TWO_INTS_SERVICE_NAKED, ADD_TWO_INTS_SERVICE_WRAPPED]:
00197 resp_req = self._test(name, Cls, Req(1, 2))
00198 resp_req_naked = self._test_req_naked(name, Cls, (1, 2))
00199 resp_req_kwds = self._test_req_kwds(name, Cls, {'a': 3})
00200 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00201 self.assertEquals(3, resp.sum)
00202
00203 def test_String_String(self):
00204 from std_msgs.msg import String
00205 from test_rospy.srv import StringString, StringStringRequest
00206 from test_rospy.msg import Val
00207 Cls = StringString
00208 Req = StringStringRequest
00209
00210 for name in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
00211 resp_req = self._test(name, Cls, Req(String('FOO'), Val('bar')))
00212 resp_req_naked = self._test_req_naked(name, Cls, (String('FOO'), Val('bar'),))
00213 resp_req_kwds = self._test_req_kwds(name, Cls, {'str': String('FOO'), 'str2': Val('bar')})
00214 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00215 self.assertEquals('FOObar', resp.str.data)
00216
00217 def test_constants(self):
00218 Cls = ConstantsMultiplex
00219 Req = ConstantsMultiplexRequest
00220
00221 for name in [CONSTANTS_SERVICE_NAKED, CONSTANTS_SERVICE_WRAPPED]:
00222
00223 resp_req = self._test(name, Cls, Req(Req.SELECT_X))
00224 resp_req_naked = self._test_req_naked(name, Cls, (Req.SELECT_X,))
00225 resp_req_kwds = self._test_req_kwds(name, Cls, {'selection': Req.SELECT_X})
00226
00227 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00228 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_X,
00229 resp.select_confirm)
00230 self.assertEquals(Req.BYTE_X, resp.ret_byte)
00231 self.assertEquals(Req.INT32_X, resp.ret_int32)
00232 self.assertEquals(Req.UINT32_X, resp.ret_uint32)
00233 self.assert_(math.fabs(Req.FLOAT32_X - resp.ret_float32) < 0.001)
00234
00235 resp = self._test(name, Cls,
00236 ConstantsMultiplexRequest(Req.SELECT_Y))
00237 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Y,
00238 resp.select_confirm)
00239 self.assertEquals(Req.BYTE_Y, resp.ret_byte)
00240 self.assertEquals(Req.INT32_Y, resp.ret_int32)
00241 self.assertEquals(Req.UINT32_Y, resp.ret_uint32)
00242 self.assert_(math.fabs(Req.FLOAT32_Y - resp.ret_float32) < 0.001)
00243
00244 resp = self._test(name, Cls,
00245 ConstantsMultiplexRequest(Req.SELECT_Z))
00246 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Z,
00247 resp.select_confirm)
00248 self.assertEquals(Req.BYTE_Z, resp.ret_byte)
00249 self.assertEquals(Req.INT32_Z, resp.ret_int32)
00250 self.assertEquals(Req.UINT32_Z, resp.ret_uint32)
00251 self.assert_(math.fabs(Req.FLOAT32_Z - resp.ret_float32) < 0.001)
00252
00253 def test_faulty_service(self):
00254 rospy.wait_for_service(FAULTY_SERVICE, WAIT_TIMEOUT)
00255 sproxy = rospy.ServiceProxy(FAULTY_SERVICE, EmptySrv)
00256
00257 rospy.wait_for_service(FAULTY_SERVICE_RESULT, WAIT_TIMEOUT)
00258 sproxy_result = rospy.ServiceProxy(FAULTY_SERVICE_RESULT, EmptyReqSrv)
00259
00260 resp = sproxy_result.call(EmptyReqSrvRequest())
00261 self.assertEquals(resp.fake_secret, 0)
00262 try:
00263 resp = sproxy.call(EmptySrvRequest())
00264 self.assert_(False)
00265 except rospy.ServiceException:
00266 pass
00267 resp = sproxy_result.call(EmptyReqSrvRequest())
00268 self.assertEquals(resp.fake_secret, 1)
00269
00270 if __name__ == '__main__':
00271 if '--service' in sys.argv:
00272 services()
00273 else:
00274 rostest.run(PKG, 'rospy_basic_services', TestBasicServicesClient, sys.argv)