61 class TestRospyTcprosService(unittest.TestCase):
65 from rospy.impl.tcpros_service
import convert_return_to_response
66 from test_rosmaster.srv
import AddTwoIntsResponse
68 cls = AddTwoIntsResponse
74 self.assertEquals(v, convert_return_to_response(v, cls))
75 self.assertEquals(v, convert_return_to_response(3, cls))
76 self.assertEquals(v, convert_return_to_response((3), cls))
77 self.assertEquals(v, convert_return_to_response([3], cls))
78 self.assertEquals(v, convert_return_to_response({
'sum':3}, cls))
79 for bad
in [[1, 2, 3], {
'fake': 1}]:
81 convert_return_to_response(bad, cls)
82 self.fail(
"should have raised: %s"%str(bad))
83 except rospy.ServiceException:
88 cls = MultipleAddTwoIntsResponse
90 self.assertEquals(v, convert_return_to_response(v, cls))
91 self.assertEquals(v, convert_return_to_response((1, 2), cls))
92 self.assertEquals(v, convert_return_to_response([1, 2], cls))
93 self.assertEquals(v, convert_return_to_response({
'ab':1,
'cd': 2}, cls))
94 for bad
in [1, AddTwoIntsResponse(), [1, 2, 3], {
'fake': 1}]:
96 convert_return_to_response(bad, cls)
97 self.fail(
"should have raised: %s"%str(bad))
98 except rospy.ServiceException:
103 cls = ListReturnResponse
105 self.assertEquals(v, convert_return_to_response(v, cls))
106 self.assertEquals(v, convert_return_to_response(((1, 2, 3),), cls))
107 self.assertEquals(v, convert_return_to_response(([1, 2, 3],), cls))
108 self.assertEquals(v, convert_return_to_response([[1, 2, 3]], cls))
109 self.assertEquals(v, convert_return_to_response({
'abcd':[1,2,3]}, cls))
110 for bad
in [[1, 2, 3], {
'fake': 1}]:
112 convert_return_to_response(bad, cls)
113 self.fail(
"should have raised: %s"%str(bad))
114 except rospy.ServiceException:
119 cls = EmptySrvResponse
122 self.assertEquals(v, convert_return_to_response(v, cls))
123 self.assertEquals(v, convert_return_to_response([], cls))
124 self.assertEquals(v, convert_return_to_response({}, cls))
129 for bad
in [1, AddTwoIntsResponse(), [1, 2, 3], {
'fake': 1}]:
131 convert_return_to_response(bad, cls)
132 self.fail(
"should have raised: %s"%str(bad))
133 except rospy.ServiceException:
138 from rospy.impl.registration
import get_service_manager
142 from rospy.impl.tcpros_service
import service_connection_handler
143 client_addr =
'10.0.0.1' 146 self.assert_(
"Missing" in service_connection_handler(sock, client_addr, {}))
147 header = {
'service' :
'/service',
'md5sum':
'*',
'callerid':
'/bob' }
151 msg = service_connection_handler(sock, client_addr, c)
152 self.assert_(
"Missing" in msg, str(c) + msg)
153 self.assert_(k
in msg, msg)
156 header[
'service'] =
'/doesnotexist' 157 msg = service_connection_handler(sock, client_addr, header)
158 self.assert_(
'is not a provider' in msg, msg)
163 sm = get_service_manager()
165 rospy.service._Service(name, test_rospy.srv.EmptySrv)
166 sm.register(name, fake_service)
168 header[
'service'] = name
169 header[
'md5sum'] =
'X' 171 msg = service_connection_handler(sock, client_addr, header)
172 self.assert_(
'md5sums do not match' in msg, msg)
176 import rospy.impl.transport
177 from rospy.impl.tcpros_service
import TCPROSServiceClient
180 callerid =
'test_TCPROSServiceClient' 182 rospy.names._set_caller_id(callerid)
185 name =
'name-%s'%time.time()
186 srv_data_class = test_rospy.srv.EmptySrv
187 p = TCPROSServiceClient(name, srv_data_class)
188 self.assertEquals(name, p.resolved_name)
189 self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
190 self.assertEquals(srv_data_class, p.service_class)
191 self.assert_(p.buff_size > -1)
193 p = TCPROSServiceClient(name, srv_data_class, buff_size=1)
194 self.assert_(1, p.buff_size)
196 fields = p.get_header_fields()
197 self.assertEquals(name, fields[
'service'])
198 self.assertEquals(srv_data_class._md5sum, fields[
'md5sum'])
201 headers = {
'sessionid':
'123456',
'persistent':
'1'}
202 p = TCPROSServiceClient(name, srv_data_class, headers=headers)
203 self.assertEquals(
'123456', p.get_header_fields()[
'sessionid'])
204 self.assertEquals(
'1', p.get_header_fields()[
'persistent'])
206 self.assertEquals(name, fields[
'service'])
207 self.assertEquals(srv_data_class._md5sum, fields[
'md5sum'])
211 import rospy.impl.transport
212 from rospy.impl.tcpros_service
import TCPService
215 callerid =
'test_TCPService' 217 rospy.names._set_caller_id(callerid)
220 name =
'name-%s'%time.time()
221 srv_data_class = test_rospy.srv.EmptySrv
222 p = TCPService(name, srv_data_class)
223 self.assertEquals(name, p.resolved_name)
224 self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
225 self.assertEquals(srv_data_class, p.service_class)
226 self.assert_(p.buff_size > -1)
228 p = TCPService(name, srv_data_class, buff_size=1)
229 self.assert_(1, p.buff_size)
231 fields = p.get_header_fields()
232 self.assertEquals(name, fields[
'service'])
233 self.assertEquals(srv_data_class._md5sum, fields[
'md5sum'])
234 self.assertEquals(srv_data_class._type, fields[
'type'])
def test_TCPROSServiceClient(self)
def test_TCPService(self)
def test_convert_return_to_response(self)
def setsockopt(self, args)
def setblocking(self, args)
def test_service_connection_handler(self)