00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 PKG = 'test_rospy'
00040 NAME = 'basic_services'
00041 import roslib; roslib.load_manifest(PKG)
00042
00043 import sys
00044 import time
00045 import math
00046 import unittest
00047
00048 import rospy
00049 import rostest
00050 from test_rospy.srv import *
00051
00052 CONSTANTS_SERVICE_NAKED = 'constants_service_naked'
00053 CONSTANTS_SERVICE_WRAPPED = 'constants_service_wrapped'
00054
00055 ADD_TWO_INTS_SERVICE_NAKED = 'a2i_naked'
00056 ADD_TWO_INTS_SERVICE_WRAPPED = 'a2i_wrapped'
00057
00058 STRING_CAT_SERVICE_NAKED = 'string_lower_naked'
00059 STRING_CAT_SERVICE_WRAPPED = 'string_lower_wrapped'
00060
00061
00062
00063 STRING_SERVICE = 'string_service'
00064 EMBEDDED_MSG_SERVICE = 'embedded_msg_service'
00065
00066 WAIT_TIMEOUT = 10.0
00067
00068 def add_two_ints_wrapped(req):
00069 from test_ros.srv import AddTwoIntsResponse
00070 return AddTwoIntsResponse(req.a + req.b)
00071 def add_two_ints_naked(req):
00072 return req.a + req.b
00073
00074 def string_cat_naked(req):
00075 from std_msgs.msg import String
00076 return String(req.str.data + req.str2.val)
00077 def string_cat_wrapped(req):
00078 from test_rospy.srv import StringStringResponse
00079 from std_msgs.msg import String
00080 return StringStringResponse(String(req.str.data + req.str2.val))
00081
00082 def handle_constants_wrapped(req):
00083 cmr = ConstantsMultiplexRequest
00084 Resp = ConstantsMultiplexResponse
00085 if req.selection == cmr.SELECT_X:
00086
00087 return Resp(req.selection,
00088 cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X)
00089 elif req.selection == cmr.SELECT_Y:
00090 return Resp(req.selection,
00091 cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y)
00092 elif req.selection == cmr.SELECT_Z:
00093 return Resp(req.selection,
00094 cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z)
00095 else:
00096 print "test failed, req.selection not in (X,Y,Z)", req.selection
00097
00098 def handle_constants_naked(req):
00099 cmr = ConstantsMultiplexRequest
00100 if req.selection == cmr.SELECT_X:
00101 return req.selection,cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X
00102 elif req.selection == cmr.SELECT_Y:
00103 return req.selection, cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y
00104 elif req.selection == cmr.SELECT_Z:
00105 return req.selection, cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z
00106 else:
00107 print "test failed, req.selection not in (X,Y,Z)", req.selection
00108
00109 def services():
00110 from test_ros.srv import AddTwoInts
00111 rospy.init_node(NAME)
00112 s1 = rospy.Service(CONSTANTS_SERVICE_NAKED, ConstantsMultiplex, handle_constants_naked)
00113 s2 = rospy.Service(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex, handle_constants_wrapped)
00114
00115 s3 = rospy.Service(ADD_TWO_INTS_SERVICE_NAKED, AddTwoInts, add_two_ints_naked)
00116 s4 = rospy.Service(ADD_TWO_INTS_SERVICE_WRAPPED, AddTwoInts, add_two_ints_wrapped)
00117
00118 s5 = rospy.Service(STRING_CAT_SERVICE_NAKED, StringString, string_cat_naked)
00119 s6 = rospy.Service(STRING_CAT_SERVICE_WRAPPED, StringString, string_cat_wrapped)
00120
00121 rospy.spin()
00122
00123 class TestBasicServicesClient(unittest.TestCase):
00124
00125
00126 def _test(self, name, srv, req):
00127 rospy.wait_for_service(name, WAIT_TIMEOUT)
00128 s = rospy.ServiceProxy(name, srv)
00129 resp = s.call(req)
00130 self.assert_(resp is not None)
00131 return resp
00132
00133
00134 def _test_req_naked(self, name, srv, args):
00135 rospy.wait_for_service(name, WAIT_TIMEOUT)
00136 s = rospy.ServiceProxy(name, srv)
00137 resp = s.call(*args)
00138 self.assert_(resp is not None)
00139 return resp
00140
00141
00142 def _test_req_kwds(self, name, srv, kwds):
00143 rospy.wait_for_service(name, WAIT_TIMEOUT)
00144 s = rospy.ServiceProxy(name, srv)
00145 resp = s.call(**kwds)
00146 self.assert_(resp is not None)
00147 return resp
00148
00149 def test_calltype_mismatch(self):
00150 try:
00151 s = rospy.ServiceProxy(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex)
00152 s.call(EmptySrvRequest())
00153 self.fail("rospy failed to raise TypeError when request type does not match service type")
00154 except TypeError:
00155 pass
00156
00157 def test_type_mismatch(self):
00158 try:
00159 self._test(CONSTANTS_SERVICE_WRAPPED, EmptySrv, EmptySrvRequest())
00160 self.fail("Service failed to throw exception on type mismatch [sent EmptySrvRequest to ConstantsMultiplex service")
00161 except rospy.ServiceException:
00162 pass
00163
00164 def test_add_two_ints(self):
00165
00166
00167 from test_ros.srv import AddTwoInts, AddTwoIntsRequest
00168 Cls = AddTwoInts
00169 Req = AddTwoIntsRequest
00170
00171 for name in [ADD_TWO_INTS_SERVICE_NAKED, ADD_TWO_INTS_SERVICE_WRAPPED]:
00172 resp_req = self._test(name, Cls, Req(1, 2))
00173 resp_req_naked = self._test_req_naked(name, Cls, (1, 2))
00174 resp_req_kwds = self._test_req_kwds(name, Cls, {'a': 3})
00175 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00176 self.assertEquals(3, resp.sum)
00177
00178 def test_String_String(self):
00179 from std_msgs.msg import String
00180 from test_rospy.srv import StringString, StringStringRequest
00181 from test_rospy.msg import Val
00182 Cls = StringString
00183 Req = StringStringRequest
00184
00185 for name in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
00186 resp_req = self._test(name, Cls, Req(String('FOO'), Val('bar')))
00187 resp_req_naked = self._test_req_naked(name, Cls, (String('FOO'), Val('bar'),))
00188 resp_req_kwds = self._test_req_kwds(name, Cls, {'str': String('FOO'), 'str2': Val('bar')})
00189 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00190 self.assertEquals('FOObar', resp.str.data)
00191
00192 def test_constants(self):
00193 Cls = ConstantsMultiplex
00194 Req = ConstantsMultiplexRequest
00195
00196 for name in [CONSTANTS_SERVICE_NAKED, CONSTANTS_SERVICE_WRAPPED]:
00197
00198 resp_req = self._test(name, Cls, Req(Req.SELECT_X))
00199 resp_req_naked = self._test_req_naked(name, Cls, (Req.SELECT_X,))
00200 resp_req_kwds = self._test_req_kwds(name, Cls, {'selection': Req.SELECT_X})
00201
00202 for resp in [resp_req, resp_req_naked, resp_req_kwds]:
00203 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_X,
00204 resp.select_confirm)
00205 self.assertEquals(Req.BYTE_X, resp.ret_byte)
00206 self.assertEquals(Req.INT32_X, resp.ret_int32)
00207 self.assertEquals(Req.UINT32_X, resp.ret_uint32)
00208 self.assert_(math.fabs(Req.FLOAT32_X - resp.ret_float32) < 0.001)
00209
00210 resp = self._test(name, Cls,
00211 ConstantsMultiplexRequest(Req.SELECT_Y))
00212 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Y,
00213 resp.select_confirm)
00214 self.assertEquals(Req.BYTE_Y, resp.ret_byte)
00215 self.assertEquals(Req.INT32_Y, resp.ret_int32)
00216 self.assertEquals(Req.UINT32_Y, resp.ret_uint32)
00217 self.assert_(math.fabs(Req.FLOAT32_Y - resp.ret_float32) < 0.001)
00218
00219 resp = self._test(name, Cls,
00220 ConstantsMultiplexRequest(Req.SELECT_Z))
00221 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Z,
00222 resp.select_confirm)
00223 self.assertEquals(Req.BYTE_Z, resp.ret_byte)
00224 self.assertEquals(Req.INT32_Z, resp.ret_int32)
00225 self.assertEquals(Req.UINT32_Z, resp.ret_uint32)
00226 self.assert_(math.fabs(Req.FLOAT32_Z - resp.ret_float32) < 0.001)
00227
00228 if __name__ == '__main__':
00229 if '--service' in sys.argv:
00230 services()
00231 else:
00232 rostest.run(PKG, 'rospy_basic_services', TestBasicServicesClient, sys.argv)