test_basic_services.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2008, Willow Garage, Inc.
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11 #
12 # * Redistributions of source code must retain the above copyright
13 # notice, this list of conditions and the following disclaimer.
14 # * Redistributions in binary form must reproduce the above
15 # copyright notice, this list of conditions and the following
16 # disclaimer in the documentation and/or other materials provided
17 # with the distribution.
18 # * Neither the name of Willow Garage, Inc. nor the names of its
19 # contributors may be used to endorse or promote products derived
20 # from this software without specific prior written permission.
21 #
22 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 # POSSIBILITY OF SUCH DAMAGE.
34 
35 
36 ## Integration test for empty services to test serializers
37 ## and transport
38 
39 PKG = 'test_rospy'
40 NAME = 'basic_services'
41 
42 import sys
43 import time
44 import math
45 import unittest
46 
47 import rospy
48 import rostest
49 from test_rospy.srv import *
50 
51 CONSTANTS_SERVICE_NAKED = 'constants_service_naked'
52 CONSTANTS_SERVICE_WRAPPED = 'constants_service_wrapped'
53 
54 ADD_TWO_INTS_SERVICE_NAKED = 'a2i_naked'
55 ADD_TWO_INTS_SERVICE_WRAPPED = 'a2i_wrapped'
56 
57 STRING_CAT_SERVICE_NAKED = 'string_lower_naked'
58 STRING_CAT_SERVICE_WRAPPED = 'string_lower_wrapped'
59 
60 FAULTY_SERVICE = 'faulty_service'
61 FAULTY_SERVICE_RESULT = 'faulty_service_result'
62 
63 #TODO:
64 
65 STRING_SERVICE = 'string_service'
66 EMBEDDED_MSG_SERVICE = 'embedded_msg_service'
67 
68 WAIT_TIMEOUT = 10.0 #s
69 
71  from test_rosmaster.srv import AddTwoIntsResponse
72  return AddTwoIntsResponse(req.a + req.b)
74  return req.a + req.b
75 
77  from std_msgs.msg import String
78  return String(req.str.data + req.str2.val)
80  from test_rospy.srv import StringStringResponse
81  from std_msgs.msg import String
82  return StringStringResponse(String(req.str.data + req.str2.val))
83 
85  cmr = ConstantsMultiplexRequest
86  Resp = ConstantsMultiplexResponse
87  if req.selection == cmr.SELECT_X:
88  # test w/o wrapper
89  return Resp(req.selection,
90  cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X)
91  elif req.selection == cmr.SELECT_Y:
92  return Resp(req.selection,
93  cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y)
94  elif req.selection == cmr.SELECT_Z:
95  return Resp(req.selection,
96  cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z)
97  else:
98  print("test failed, req.selection not in (X,Y,Z)", req.selection)
99 
101  cmr = ConstantsMultiplexRequest
102  if req.selection == cmr.SELECT_X:
103  return req.selection,cmr.BYTE_X, cmr.INT32_X, cmr.UINT32_X, cmr.FLOAT32_X
104  elif req.selection == cmr.SELECT_Y:
105  return req.selection, cmr.BYTE_Y, cmr.INT32_Y, cmr.UINT32_Y, cmr.FLOAT32_Y
106  elif req.selection == cmr.SELECT_Z:
107  return req.selection, cmr.BYTE_Z, cmr.INT32_Z, cmr.UINT32_Z, cmr.FLOAT32_Z
108  else:
109  print("test failed, req.selection not in (X,Y,Z)", req.selection)
110 
111 class UnexpectedException(Exception):
112  pass
113 
114 
115 class FaultyHandler(object):
116  def __init__(self):
117  self.test_call = False
118 
119  def custom_error_handler(self, e, exc_type, exc_value, tb):
120  self.test_call = True
121 
122  def call_handler(self, req):
123  raise UnexpectedException('Call raised an exception')
124 
125  def result_handler(self, req):
126  resp = EmptyReqSrvResponse()
127  resp.fake_secret = 1 if self.test_call else 0
128  return resp
129 
130 
131 def services():
132  from test_rosmaster.srv import AddTwoInts
133  rospy.init_node(NAME)
134  s1 = rospy.Service(CONSTANTS_SERVICE_NAKED, ConstantsMultiplex, handle_constants_naked)
135  s2 = rospy.Service(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex, handle_constants_wrapped)
136 
137  s3 = rospy.Service(ADD_TWO_INTS_SERVICE_NAKED, AddTwoInts, add_two_ints_naked)
138  s4 = rospy.Service(ADD_TWO_INTS_SERVICE_WRAPPED, AddTwoInts, add_two_ints_wrapped)
139 
140  s5 = rospy.Service(STRING_CAT_SERVICE_NAKED, StringString, string_cat_naked)
141  s6 = rospy.Service(STRING_CAT_SERVICE_WRAPPED, StringString, string_cat_wrapped)
142 
143  faulty_handler = FaultyHandler()
144  s7 = rospy.Service(FAULTY_SERVICE, EmptySrv, faulty_handler.call_handler, error_handler=faulty_handler.custom_error_handler)
145  s8 = rospy.Service(FAULTY_SERVICE_RESULT, EmptyReqSrv, faulty_handler.result_handler)
146 
147  rospy.spin()
148 
149 class TestBasicServicesClient(unittest.TestCase):
150 
151  # call with request instance style
152  def _test(self, name, srv, req):
153  rospy.wait_for_service(name, WAIT_TIMEOUT)
154  s = rospy.ServiceProxy(name, srv)
155  resp = s.call(req)
156  self.assert_(resp is not None)
157  return resp
158 
159  # call with args style
160  def _test_req_naked(self, name, srv, args):
161  rospy.wait_for_service(name, WAIT_TIMEOUT)
162  s = rospy.ServiceProxy(name, srv)
163  resp = s.call(*args)
164  self.assert_(resp is not None)
165  return resp
166 
167  # call with keyword style
168  def _test_req_kwds(self, name, srv, kwds):
169  rospy.wait_for_service(name, WAIT_TIMEOUT)
170  s = rospy.ServiceProxy(name, srv)
171  resp = s.call(**kwds)
172  self.assert_(resp is not None)
173  return resp
174 
176  try:
177  s = rospy.ServiceProxy(CONSTANTS_SERVICE_WRAPPED, ConstantsMultiplex)
178  s.call(EmptySrvRequest())
179  self.fail("rospy failed to raise TypeError when request type does not match service type")
180  except TypeError:
181  pass
182 
184  try:
185  self._test(CONSTANTS_SERVICE_WRAPPED, EmptySrv, EmptySrvRequest())
186  self.fail("Service failed to throw exception on type mismatch [sent EmptySrvRequest to ConstantsMultiplex service")
187  except rospy.ServiceException:
188  pass
189 
190  def test_add_two_ints(self):
191  # add two ints checks single, integer return value, which is
192  # an interesting in the naked case
193  from test_rosmaster.srv import AddTwoInts, AddTwoIntsRequest
194  Cls = AddTwoInts
195  Req = AddTwoIntsRequest
196 
197  for name in [ADD_TWO_INTS_SERVICE_NAKED, ADD_TWO_INTS_SERVICE_WRAPPED]:
198  resp_req = self._test(name, Cls, Req(1, 2))
199  resp_req_naked = self._test_req_naked(name, Cls, (1, 2))
200  resp_req_kwds = self._test_req_kwds(name, Cls, {'a': 3})
201  for resp in [resp_req, resp_req_naked, resp_req_kwds]:
202  self.assertEquals(3, resp.sum)
203 
205  from std_msgs.msg import String
206  from test_rospy.srv import StringString, StringStringRequest
207  from test_rospy.msg import Val
208  Cls = StringString
209  Req = StringStringRequest
210 
211  for name in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
212  resp_req = self._test(name, Cls, Req(String('FOO'), Val('bar')))
213  resp_req_naked = self._test_req_naked(name, Cls, (String('FOO'), Val('bar'),))
214  resp_req_kwds = self._test_req_kwds(name, Cls, {'str': String('FOO'), 'str2': Val('bar')})
215  for resp in [resp_req, resp_req_naked, resp_req_kwds]:
216  self.assertEquals('FOObar', resp.str.data)
217 
219  from std_msgs.msg import String
220  from test_rospy.srv import StringString, StringStringRequest
221  from test_rospy.msg import Val
222  Cls = StringString
223  Req = StringStringRequest
224 
225  for name in [STRING_CAT_SERVICE_NAKED, STRING_CAT_SERVICE_WRAPPED]:
226  resp_req = self._test(name, Cls, Req(String(u'ロボット'), Val(u'机器人')))
227  resp_req_naked = self._test_req_naked(name, Cls, (String(u'ロボット'), Val(u'机器人'),))
228  resp_req_kwds = self._test_req_kwds(name, Cls, {'str': String(u'ロボット'), 'str2': Val(u'机器人')})
229  for resp in [resp_req, resp_req_naked, resp_req_kwds]:
230  self.assertEquals('ロボット机器人', resp.str.data) # if you send in unicode, you'll receive in str
231 
232  def test_constants(self):
233  Cls = ConstantsMultiplex
234  Req = ConstantsMultiplexRequest
235 
236  for name in [CONSTANTS_SERVICE_NAKED, CONSTANTS_SERVICE_WRAPPED]:
237  # test all three call forms
238  resp_req = self._test(name, Cls, Req(Req.SELECT_X))
239  resp_req_naked = self._test_req_naked(name, Cls, (Req.SELECT_X,))
240  resp_req_kwds = self._test_req_kwds(name, Cls, {'selection': Req.SELECT_X})
241 
242  for resp in [resp_req, resp_req_naked, resp_req_kwds]:
243  self.assertEquals(ConstantsMultiplexResponse.CONFIRM_X,
244  resp.select_confirm)
245  self.assertEquals(Req.BYTE_X, resp.ret_byte)
246  self.assertEquals(Req.INT32_X, resp.ret_int32)
247  self.assertEquals(Req.UINT32_X, resp.ret_uint32)
248  self.assert_(math.fabs(Req.FLOAT32_X - resp.ret_float32) < 0.001)
249 
250  resp = self._test(name, Cls,
251  ConstantsMultiplexRequest(Req.SELECT_Y))
252  self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Y,
253  resp.select_confirm)
254  self.assertEquals(Req.BYTE_Y, resp.ret_byte)
255  self.assertEquals(Req.INT32_Y, resp.ret_int32)
256  self.assertEquals(Req.UINT32_Y, resp.ret_uint32)
257  self.assert_(math.fabs(Req.FLOAT32_Y - resp.ret_float32) < 0.001)
258 
259  resp = self._test(name, Cls,
260  ConstantsMultiplexRequest(Req.SELECT_Z))
261  self.assertEquals(ConstantsMultiplexResponse.CONFIRM_Z,
262  resp.select_confirm)
263  self.assertEquals(Req.BYTE_Z, resp.ret_byte)
264  self.assertEquals(Req.INT32_Z, resp.ret_int32)
265  self.assertEquals(Req.UINT32_Z, resp.ret_uint32)
266  self.assert_(math.fabs(Req.FLOAT32_Z - resp.ret_float32) < 0.001)
267 
269  rospy.wait_for_service(FAULTY_SERVICE, WAIT_TIMEOUT)
270  sproxy = rospy.ServiceProxy(FAULTY_SERVICE, EmptySrv)
271 
272  rospy.wait_for_service(FAULTY_SERVICE_RESULT, WAIT_TIMEOUT)
273  sproxy_result = rospy.ServiceProxy(FAULTY_SERVICE_RESULT, EmptyReqSrv)
274 
275  resp = sproxy_result.call(EmptyReqSrvRequest())
276  self.assertEquals(resp.fake_secret, 0)
277  try:
278  resp = sproxy.call(EmptySrvRequest())
279  self.assert_(False)
280  except rospy.ServiceException:
281  pass
282  resp = sproxy_result.call(EmptyReqSrvRequest())
283  self.assertEquals(resp.fake_secret, 1)
284 
285 if __name__ == '__main__':
286  if '--service' in sys.argv:
287  services()
288  else:
289  rostest.run(PKG, 'rospy_basic_services', TestBasicServicesClient, sys.argv)
def custom_error_handler(self, e, exc_type, exc_value, tb)


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56