test_rospy_tcpros_service.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import socket
37 import struct
38 import unittest
39 import time
40 
41 class FakeSocket(object):
42  def __init__(self):
43  self.data = ''
44  self.sockopt = None
45  def fileno(self):
46  # fool select logic by giving it stdout fileno
47  return 1
48  def setblocking(self, *args):
49  pass
50  def setsockopt(self, *args):
51  self.sockopt = args
52  def send(self, d):
53  self.data = self.data+d
54  return len(d)
55  def sendall(self, d):
56  self.data = self.data+d
57  def close(self):
58  pass
59 
60 # test service implementation
61 class TestRospyTcprosService(unittest.TestCase):
62 
64  import rospy
65  from rospy.impl.tcpros_service import convert_return_to_response
66  from test_rosmaster.srv import AddTwoIntsResponse
67 
68  cls = AddTwoIntsResponse
69  v = cls(3)
70 
71  # test various ways that a user could reasonable return a
72  # value for a single-arg message. This is actually our hardest
73  # case.
74  self.assertEquals(v, convert_return_to_response(v, cls))
75  self.assertEquals(v, convert_return_to_response(3, cls))
76  self.assertEquals(v, convert_return_to_response((3), cls))
77  self.assertEquals(v, convert_return_to_response([3], cls))
78  self.assertEquals(v, convert_return_to_response({'sum':3}, cls))
79  for bad in [[1, 2, 3], {'fake': 1}]:
80  try:
81  convert_return_to_response(bad, cls)
82  self.fail("should have raised: %s"%str(bad))
83  except rospy.ServiceException:
84  pass
85 
86  # test multi-arg services
87  from test_rospy.srv import MultipleAddTwoIntsResponse
88  cls = MultipleAddTwoIntsResponse
89  v = cls(1, 2)
90  self.assertEquals(v, convert_return_to_response(v, cls))
91  self.assertEquals(v, convert_return_to_response((1, 2), cls))
92  self.assertEquals(v, convert_return_to_response([1, 2], cls))
93  self.assertEquals(v, convert_return_to_response({'ab':1, 'cd': 2}, cls))
94  for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
95  try:
96  convert_return_to_response(bad, cls)
97  self.fail("should have raised: %s"%str(bad))
98  except rospy.ServiceException:
99  pass
100 
101  # test response with single, array field
102  from test_rospy.srv import ListReturnResponse
103  cls = ListReturnResponse
104  v = cls([1, 2, 3])
105  self.assertEquals(v, convert_return_to_response(v, cls))
106  self.assertEquals(v, convert_return_to_response(((1, 2, 3),), cls))
107  self.assertEquals(v, convert_return_to_response(([1, 2, 3],), cls))
108  self.assertEquals(v, convert_return_to_response([[1, 2, 3]], cls))
109  self.assertEquals(v, convert_return_to_response({'abcd':[1,2,3]}, cls))
110  for bad in [[1, 2, 3], {'fake': 1}]:
111  try:
112  convert_return_to_response(bad, cls)
113  self.fail("should have raised: %s"%str(bad))
114  except rospy.ServiceException:
115  pass
116 
117  # test with empty response
118  from test_rospy.srv import EmptySrvResponse
119  cls = EmptySrvResponse
120  v = cls()
121  # - only valid return values are empty list, empty dict and a response instance
122  self.assertEquals(v, convert_return_to_response(v, cls))
123  self.assertEquals(v, convert_return_to_response([], cls))
124  self.assertEquals(v, convert_return_to_response({}, cls))
125 
126  # #2185: currently empty does not do any checking whatsoever,
127  # disabling this test as it is not convert()s fault
128  if 0:
129  for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
130  try:
131  convert_return_to_response(bad, cls)
132  self.fail("should have raised: %s"%str(bad))
133  except rospy.ServiceException:
134  pass
135 
137  import test_rospy.srv
138  from rospy.impl.registration import get_service_manager
139  import rospy.service
140 
141  sock = FakeSocket()
142  from rospy.impl.tcpros_service import service_connection_handler
143  client_addr = '10.0.0.1'
144 
145  # check error conditions on missing headers
146  self.assert_("Missing" in service_connection_handler(sock, client_addr, {}))
147  header = { 'service' : '/service', 'md5sum': '*', 'callerid': '/bob' }
148  for k in header:
149  c = header.copy()
150  del c[k]
151  msg = service_connection_handler(sock, client_addr, c)
152  self.assert_("Missing" in msg, str(c) + msg)
153  self.assert_(k in msg, msg)
154 
155  # check error condition on invalid service
156  header['service'] = '/doesnotexist'
157  msg = service_connection_handler(sock, client_addr, header)
158  self.assert_('is not a provider' in msg, msg)
159 
160  # check invalid md5sums
161 
162  name = '/service'
163  sm = get_service_manager()
164  fake_service = \
165  rospy.service._Service(name, test_rospy.srv.EmptySrv)
166  sm.register(name, fake_service)
167 
168  header['service'] = name
169  header['md5sum'] = 'X'
170 
171  msg = service_connection_handler(sock, client_addr, header)
172  self.assert_('md5sums do not match' in msg, msg)
173 
174 
176  import rospy.impl.transport
177  from rospy.impl.tcpros_service import TCPROSServiceClient
178  import test_rospy.srv
179 
180  callerid = 'test_TCPROSServiceClient'
181  import rospy.names
182  rospy.names._set_caller_id(callerid)
183 
184  #name, pub_data_class
185  name = 'name-%s'%time.time()
186  srv_data_class = test_rospy.srv.EmptySrv
187  p = TCPROSServiceClient(name, srv_data_class)
188  self.assertEquals(name, p.resolved_name)
189  self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
190  self.assertEquals(srv_data_class, p.service_class)
191  self.assert_(p.buff_size > -1)
192 
193  p = TCPROSServiceClient(name, srv_data_class, buff_size=1)
194  self.assert_(1, p.buff_size)
195 
196  fields = p.get_header_fields()
197  self.assertEquals(name, fields['service'])
198  self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
199 
200  # test custom headers
201  headers = {'sessionid': '123456', 'persistent': '1'}
202  p = TCPROSServiceClient(name, srv_data_class, headers=headers)
203  self.assertEquals('123456', p.get_header_fields()['sessionid'])
204  self.assertEquals('1', p.get_header_fields()['persistent'])
205  # - make sure builtins are still there
206  self.assertEquals(name, fields['service'])
207  self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
208 
209 
210  def test_TCPService(self):
211  import rospy.impl.transport
212  from rospy.impl.tcpros_service import TCPService
213  import test_rospy.srv
214 
215  callerid = 'test_TCPService'
216  import rospy.names
217  rospy.names._set_caller_id(callerid)
218 
219  #name, pub_data_class
220  name = 'name-%s'%time.time()
221  srv_data_class = test_rospy.srv.EmptySrv
222  p = TCPService(name, srv_data_class)
223  self.assertEquals(name, p.resolved_name)
224  self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
225  self.assertEquals(srv_data_class, p.service_class)
226  self.assert_(p.buff_size > -1)
227 
228  p = TCPService(name, srv_data_class, buff_size=1)
229  self.assert_(1, p.buff_size)
230 
231  fields = p.get_header_fields()
232  self.assertEquals(name, fields['service'])
233  self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
234  self.assertEquals(srv_data_class._type, fields['type'])


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56