00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 PKG = 'test_rospy'
00039 NAME = 'basic_services'
00040
00041 import sys
00042 import time
00043 import math
00044 import unittest
00045
00046 import rospy
00047 import rostest
00048 from test_rospy.srv import *
00049
00050 CONSTANTS_SERVICE_NAKED = 'constants_service_naked'
00051 CONSTANTS_SERVICE_WRAPPED = 'constants_service_wrapped'
00052
00053 ADD_TWO_INTS_SERVICE_NAKED = 'a2i_naked'
00054 ADD_TWO_INTS_SERVICE_WRAPPED = 'a2i_wrapped'
00055
00056 STRING_CAT_SERVICE_NAKED = 'string_lower_naked'
00057 STRING_CAT_SERVICE_WRAPPED = 'string_lower_wrapped'
00058
00059
00060
00061 STRING_SERVICE = 'string_service'
00062 EMBEDDED_MSG_SERVICE = 'embedded_msg_service'
00063
00064 WAIT_TIMEOUT = 10.0
00065
00066 def add_two_ints_wrapped(req):
00067 from test_ros.srv import AddTwoIntsResponse
00068 return AddTwoIntsResponse(req.a + req.b)
00069 def add_two_ints_naked(req):
00070 return req.a + req.b
00071
00072 def string_cat_naked(req):
00073 from std_msgs.msg import String
00074 return String(req.str.data + req.str2.val)
00075 def string_cat_wrapped(req):
00076 from test_rospy.srv import StringStringResponse
00077 from std_msgs.msg import String
00078 return StringStringResponse(String(req.str.data + req.str2.val))
00079
00080 def handle_constants_wrapped(req):
00081 cmr = ConstantsMultiplexRequest
00082 Resp = ConstantsMultiplexResponse
00083 if req.selection == cmr.SELECT_X:
00084
00085 return Resp(req.selection,
00086 cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X)
00087 elif req.selection == cmr.SELECT_Y:
00088 return Resp(req.selection,
00089 cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y)
00090 elif req.selection == cmr.SELECT_Z:
00091 return Resp(req.selection,
00092 cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z)
00093 else:
00094 print "test failed, req.selection not in (X,Y,Z)", req.selection
00095
00096 def handle_constants_naked(req):
00097 cmr = ConstantsMultiplexRequest
00098 if req.selection == cmr.SELECT_X:
00099 return req.selection,cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X
00100 elif req.selection == cmr.SELECT_Y:
00101 return req.selection, cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y
00102 elif req.selection == cmr.SELECT_Z:
00103 return req.selection, cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z
00104 else:
00105 print "test failed, req.selection not in (X,Y,Z)", req.selection
00106
00107 def services():
00108 from test_ros.srv import AddTwoInts
00109 rospy.init_node(NAME)
00110 s1 = rospy.Service(CONSTANTS_SERVICE_NAKED, ConstantsMultiplex, handle_constants_naked)
00111 s2 = rospy.Service(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex, handle_constants_wrapped)
00112
00113 s3 = rospy.Service(ADD_TWO_INTS_SERVICE_NAKED, AddTwoInts, add_two_ints_naked)
00114 s4 = rospy.Service(ADD_TWO_INTS_SERVICE_WRAPPED, AddTwoInts, add_two_ints_wrapped)
00115
00116 s5 = rospy.Service(STRING_CAT_SERVICE_NAKED, StringString, string_cat_naked)
00117 s6 = rospy.Service(STRING_CAT_SERVICE_WRAPPED, StringString, string_cat_wrapped)
00118
00119 rospy.spin()
00120
00121 class TestBasicServicesClient(unittest.TestCase):
00122
00123
00124 def _test(self, name, srv, req):
00125 rospy.wait_for_service(name, WAIT_TIMEOUT)
00126 s = rospy.ServiceProxy(name, srv)
00127 resp = s.call(req)
00128 self.assert_(resp is not None)
00129 return resp
00130
00131
00132 def _test_req_naked(self, name, srv, args):
00133 rospy.wait_for_service(name, WAIT_TIMEOUT)
00134 s = rospy.ServiceProxy(name, srv)
00135 resp = s.call(*args)
00136 self.assert_(resp is not None)
00137 return resp
00138
00139
00140 def _test_req_kwds(self, name, srv, kwds):
00141 rospy.wait_for_service(name, WAIT_TIMEOUT)
00142 s = rospy.ServiceProxy(name, srv)
00143 resp = s.call(**kwds)
00144 self.assert_(resp is not None)
00145 return resp
00146
00147 def test_calltype_mismatch(self):
00148 try:
00149 s = rospy.ServiceProxy(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex)
00150 s.call(EmptySrvRequest())
00151 self.fail("rospy failed to raise TypeError when request type does not match service type")
00152 except TypeError:
00153 pass
00154
00155 def test_type_mismatch(self):
00156 try:
00157 self._test(CONSTANTS_SERVICE_WRAPPED, EmptySrv, EmptySrvRequest())
00158 self.fail("Service failed to throw exception on type mismatch [sent EmptySrvRequest to ConstantsMultiplex service")
00159 except rospy.ServiceException:
00160 pass
00161
00162 def test_add_two_ints(self):
00163
00164
00165 from test_ros.srv import AddTwoInts, AddTwoIntsRequest
00166 Cls = AddTwoInts
00167 Req = AddTwoIntsRequest
00168
00169 for name in [ADD_TWO_INTS_SERVICE_NAKED, ADD_TWO_INTS_SERVICE_WRAPPED]:
00170 resp_req = self._test(name, Cls, Req(1, 2))
00171 resp_req_naked = self._test_req_naked(name, Cls, (1, 2))
00172 resp_req_kwds = self._test_req_kwds(name, Cls, {'a': 3})
00173 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00174 self.assertEquals(3, resp.sum)
00175
00176 def test_String_String(self):
00177 from std_msgs.msg import String
00178 from test_rospy.srv import StringString, StringStringRequest
00179 from test_rospy.msg import Val
00180 Cls = StringString
00181 Req = StringStringRequest
00182
00183 for name in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
00184 resp_req = self._test(name, Cls, Req(String('FOO'), Val('bar')))
00185 resp_req_naked = self._test_req_naked(name, Cls, (String('FOO'), Val('bar'),))
00186 resp_req_kwds = self._test_req_kwds(name, Cls, {'str': String('FOO'), 'str2': Val('bar')})
00187 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00188 self.assertEquals('FOObar', resp.str.data)
00189
00190 def test_constants(self):
00191 Cls = ConstantsMultiplex
00192 Req = ConstantsMultiplexRequest
00193
00194 for name in [CONSTANTS_SERVICE_NAKED, CONSTANTS_SERVICE_WRAPPED]:
00195
00196 resp_req = self._test(name, Cls, Req(Req.SELECT_X))
00197 resp_req_naked = self._test_req_naked(name, Cls, (Req.SELECT_X,))
00198 resp_req_kwds = self._test_req_kwds(name, Cls, {'selection': Req.SELECT_X})
00199
00200 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00201 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_X,
00202 resp.select_confirm)
00203 self.assertEquals(Req.BYTE_X, resp.ret_byte)
00204 self.assertEquals(Req.INT32_X, resp.ret_int32)
00205 self.assertEquals(Req.UINT32_X, resp.ret_uint32)
00206 self.assert_(math.fabs(Req.FLOAT32_X - resp.ret_float32) < 0.001)
00207
00208 resp = self._test(name, Cls,
00209 ConstantsMultiplexRequest(Req.SELECT_Y))
00210 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Y,
00211 resp.select_confirm)
00212 self.assertEquals(Req.BYTE_Y, resp.ret_byte)
00213 self.assertEquals(Req.INT32_Y, resp.ret_int32)
00214 self.assertEquals(Req.UINT32_Y, resp.ret_uint32)
00215 self.assert_(math.fabs(Req.FLOAT32_Y - resp.ret_float32) < 0.001)
00216
00217 resp = self._test(name, Cls,
00218 ConstantsMultiplexRequest(Req.SELECT_Z))
00219 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Z,
00220 resp.select_confirm)
00221 self.assertEquals(Req.BYTE_Z, resp.ret_byte)
00222 self.assertEquals(Req.INT32_Z, resp.ret_int32)
00223 self.assertEquals(Req.UINT32_Z, resp.ret_uint32)
00224 self.assert_(math.fabs(Req.FLOAT32_Z - resp.ret_float32) < 0.001)
00225
00226 if __name__ == '__main__':
00227 if '--service' in sys.argv:
00228 services()
00229 else:
00230 rostest.run(PKG, 'rospy_basic_services', TestBasicServicesClient, sys.argv)