40 NAME =
'basic_services' 51 CONSTANTS_SERVICE_NAKED =
'constants_service_naked' 52 CONSTANTS_SERVICE_WRAPPED =
'constants_service_wrapped' 54 ADD_TWO_INTS_SERVICE_NAKED =
'a2i_naked' 55 ADD_TWO_INTS_SERVICE_WRAPPED =
'a2i_wrapped' 57 STRING_CAT_SERVICE_NAKED =
'string_lower_naked' 58 STRING_CAT_SERVICE_WRAPPED =
'string_lower_wrapped' 60 FAULTY_SERVICE =
'faulty_service' 61 FAULTY_SERVICE_RESULT =
'faulty_service_result' 65 STRING_SERVICE =
'string_service' 66 EMBEDDED_MSG_SERVICE =
'embedded_msg_service' 71 from test_rosmaster.srv
import AddTwoIntsResponse
72 return AddTwoIntsResponse(req.a + req.b)
78 return String(req.str.data + req.str2.val)
82 return StringStringResponse(String(req.str.data + req.str2.val))
85 cmr = ConstantsMultiplexRequest
86 Resp = ConstantsMultiplexResponse
87 if req.selection == cmr.SELECT_X:
89 return Resp(req.selection,
90 cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X)
91 elif req.selection == cmr.SELECT_Y:
92 return Resp(req.selection,
93 cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y)
94 elif req.selection == cmr.SELECT_Z:
95 return Resp(req.selection,
96 cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z)
98 print(
"test failed, req.selection not in (X,Y,Z)", req.selection)
101 cmr = ConstantsMultiplexRequest
102 if req.selection == cmr.SELECT_X:
103 return req.selection,cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X
104 elif req.selection == cmr.SELECT_Y:
105 return req.selection, cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y
106 elif req.selection == cmr.SELECT_Z:
107 return req.selection, cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z
109 print(
"test failed, req.selection not in (X,Y,Z)", req.selection)
115 class FaultyHandler(object):
126 resp = EmptyReqSrvResponse()
127 resp.fake_secret = 1
if self.
test_call else 0
132 from test_rosmaster.srv
import AddTwoInts
133 rospy.init_node(NAME)
134 s1 = rospy.Service(CONSTANTS_SERVICE_NAKED, ConstantsMultiplex, handle_constants_naked)
135 s2 = rospy.Service(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex, handle_constants_wrapped)
137 s3 = rospy.Service(ADD_TWO_INTS_SERVICE_NAKED, AddTwoInts, add_two_ints_naked)
138 s4 = rospy.Service(ADD_TWO_INTS_SERVICE_WRAPPED, AddTwoInts, add_two_ints_wrapped)
140 s5 = rospy.Service(STRING_CAT_SERVICE_NAKED, StringString, string_cat_naked)
141 s6 = rospy.Service(STRING_CAT_SERVICE_WRAPPED, StringString, string_cat_wrapped)
144 s7 = rospy.Service(FAULTY_SERVICE, EmptySrv, faulty_handler.call_handler, error_handler=faulty_handler.custom_error_handler)
145 s8 = rospy.Service(FAULTY_SERVICE_RESULT, EmptyReqSrv, faulty_handler.result_handler)
153 rospy.wait_for_service(name, WAIT_TIMEOUT)
154 s = rospy.ServiceProxy(name, srv)
156 self.assert_(resp
is not None)
161 rospy.wait_for_service(name, WAIT_TIMEOUT)
162 s = rospy.ServiceProxy(name, srv)
164 self.assert_(resp
is not None)
169 rospy.wait_for_service(name, WAIT_TIMEOUT)
170 s = rospy.ServiceProxy(name, srv)
171 resp = s.call(**kwds)
172 self.assert_(resp
is not None)
177 s = rospy.ServiceProxy(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex)
178 s.call(EmptySrvRequest())
179 self.fail(
"rospy failed to raise TypeError when request type does not match service type")
185 self.
_test(CONSTANTS_SERVICE_WRAPPED, EmptySrv, EmptySrvRequest())
186 self.fail(
"Service failed to throw exception on type mismatch [sent EmptySrvRequest to ConstantsMultiplex service")
187 except rospy.ServiceException:
193 from test_rosmaster.srv
import AddTwoInts, AddTwoIntsRequest
195 Req = AddTwoIntsRequest
197 for name
in [ADD_TWO_INTS_SERVICE_NAKED, ADD_TWO_INTS_SERVICE_WRAPPED]:
198 resp_req = self.
_test(name, Cls, Req(1, 2))
201 for resp
in [resp_req, resp_req_naked, resp_req_kwds]:
202 self.assertEquals(3, resp.sum)
207 from test_rospy.msg
import Val
209 Req = StringStringRequest
211 for name
in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
212 resp_req = self.
_test(name, Cls, Req(String(
'FOO'), Val(
'bar')))
213 resp_req_naked = self.
_test_req_naked(name, Cls, (String(
'FOO'), Val(
'bar'),))
214 resp_req_kwds = self.
_test_req_kwds(name, Cls, {
'str': String(
'FOO'),
'str2': Val(
'bar')})
215 for resp
in [resp_req, resp_req_naked, resp_req_kwds]:
216 self.assertEquals(
'FOObar', resp.str.data)
221 from test_rospy.msg
import Val
223 Req = StringStringRequest
225 for name
in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
226 resp_req = self.
_test(name, Cls, Req(String(
u'ロボット'), Val(
u'机器人')))
227 resp_req_naked = self.
_test_req_naked(name, Cls, (String(
u'ロボット'), Val(
u'机器人'),))
228 resp_req_kwds = self.
_test_req_kwds(name, Cls, {
'str': String(
u'ロボット'),
'str2': Val(
u'机器人')})
229 for resp
in [resp_req, resp_req_naked, resp_req_kwds]:
230 self.assertEquals(
'ロボット机器人', resp.str.data)
233 Cls = ConstantsMultiplex
234 Req = ConstantsMultiplexRequest
236 for name
in [CONSTANTS_SERVICE_NAKED, CONSTANTS_SERVICE_WRAPPED]:
238 resp_req = self.
_test(name, Cls, Req(Req.SELECT_X))
240 resp_req_kwds = self.
_test_req_kwds(name, Cls, {
'selection': Req.SELECT_X})
242 for resp
in [resp_req, resp_req_naked, resp_req_kwds]:
243 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_X,
245 self.assertEquals(Req.BYTE_X, resp.ret_byte)
246 self.assertEquals(Req.INT32_X, resp.ret_int32)
247 self.assertEquals(Req.UINT32_X, resp.ret_uint32)
248 self.assert_(math.fabs(Req.FLOAT32_X - resp.ret_float32) < 0.001)
250 resp = self.
_test(name, Cls,
251 ConstantsMultiplexRequest(Req.SELECT_Y))
252 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Y,
254 self.assertEquals(Req.BYTE_Y, resp.ret_byte)
255 self.assertEquals(Req.INT32_Y, resp.ret_int32)
256 self.assertEquals(Req.UINT32_Y, resp.ret_uint32)
257 self.assert_(math.fabs(Req.FLOAT32_Y - resp.ret_float32) < 0.001)
259 resp = self.
_test(name, Cls,
260 ConstantsMultiplexRequest(Req.SELECT_Z))
261 self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Z,
263 self.assertEquals(Req.BYTE_Z, resp.ret_byte)
264 self.assertEquals(Req.INT32_Z, resp.ret_int32)
265 self.assertEquals(Req.UINT32_Z, resp.ret_uint32)
266 self.assert_(math.fabs(Req.FLOAT32_Z - resp.ret_float32) < 0.001)
269 rospy.wait_for_service(FAULTY_SERVICE, WAIT_TIMEOUT)
270 sproxy = rospy.ServiceProxy(FAULTY_SERVICE, EmptySrv)
272 rospy.wait_for_service(FAULTY_SERVICE_RESULT, WAIT_TIMEOUT)
273 sproxy_result = rospy.ServiceProxy(FAULTY_SERVICE_RESULT, EmptyReqSrv)
275 resp = sproxy_result.call(EmptyReqSrvRequest())
276 self.assertEquals(resp.fake_secret, 0)
278 resp = sproxy.call(EmptySrvRequest())
280 except rospy.ServiceException:
282 resp = sproxy_result.call(EmptyReqSrvRequest())
283 self.assertEquals(resp.fake_secret, 1)
285 if __name__ ==
'__main__':
286 if '--service' in sys.argv:
289 rostest.run(PKG,
'rospy_basic_services', TestBasicServicesClient, sys.argv)
def test_String_String_unicode(self)
def handle_constants_wrapped(req)
def test_type_mismatch(self)
def _test_req_kwds(self, name, srv, kwds)
def handle_constants_naked(req)
def result_handler(self, req)
def test_add_two_ints(self)
def add_two_ints_wrapped(req)
def test_calltype_mismatch(self)
def call_handler(self, req)
def add_two_ints_naked(req)
def test_String_String(self)
def custom_error_handler(self, e, exc_type, exc_value, tb)
def string_cat_wrapped(req)
def string_cat_naked(req)
def _test(self, name, srv, req)
def test_faulty_service(self)
def _test_req_naked(self, name, srv, args)