00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import socket
00037 import struct
00038 import unittest
00039 import time
00040
00041 class FakeSocket(object):
00042 def __init__(self):
00043 self.data = ''
00044 self.sockopt = None
00045 def fileno(self):
00046
00047 return 1
00048 def setblocking(self, *args):
00049 pass
00050 def setsockopt(self, *args):
00051 self.sockopt = args
00052 def send(self, d):
00053 self.data = self.data+d
00054 return len(d)
00055 def sendall(self, d):
00056 self.data = self.data+d
00057 def close(self):
00058 pass
00059
00060
00061 class TestRospyTcprosService(unittest.TestCase):
00062
00063 def test_convert_return_to_response(self):
00064 import rospy
00065 from rospy.impl.tcpros_service import convert_return_to_response
00066 from test_ros.srv import AddTwoIntsResponse
00067
00068 cls = AddTwoIntsResponse
00069 v = cls(3)
00070
00071
00072
00073
00074 self.assertEquals(v, convert_return_to_response(v, cls))
00075 self.assertEquals(v, convert_return_to_response(3, cls))
00076 self.assertEquals(v, convert_return_to_response((3), cls))
00077 self.assertEquals(v, convert_return_to_response([3], cls))
00078 self.assertEquals(v, convert_return_to_response({'sum':3}, cls))
00079 for bad in [[1, 2, 3], {'fake': 1}]:
00080 try:
00081 convert_return_to_response(bad, cls)
00082 self.fail("should have raised: %s"%str(bad))
00083 except rospy.ServiceException:
00084 pass
00085
00086
00087 from test_rospy.srv import MultipleAddTwoIntsResponse
00088 cls = MultipleAddTwoIntsResponse
00089 v = cls(1, 2)
00090 self.assertEquals(v, convert_return_to_response(v, cls))
00091 self.assertEquals(v, convert_return_to_response((1, 2), cls))
00092 self.assertEquals(v, convert_return_to_response([1, 2], cls))
00093 self.assertEquals(v, convert_return_to_response({'ab':1, 'cd': 2}, cls))
00094 for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
00095 try:
00096 convert_return_to_response(bad, cls)
00097 self.fail("should have raised: %s"%str(bad))
00098 except rospy.ServiceException:
00099 pass
00100
00101
00102 from test_rospy.srv import ListReturnResponse
00103 cls = ListReturnResponse
00104 v = cls([1, 2, 3])
00105 self.assertEquals(v, convert_return_to_response(v, cls))
00106 self.assertEquals(v, convert_return_to_response(((1, 2, 3),), cls))
00107 self.assertEquals(v, convert_return_to_response(([1, 2, 3],), cls))
00108 self.assertEquals(v, convert_return_to_response([[1, 2, 3]], cls))
00109 self.assertEquals(v, convert_return_to_response({'abcd':[1,2,3]}, cls))
00110 for bad in [[1, 2, 3], {'fake': 1}]:
00111 try:
00112 convert_return_to_response(bad, cls)
00113 self.fail("should have raised: %s"%str(bad))
00114 except rospy.ServiceException:
00115 pass
00116
00117
00118 from test_rospy.srv import EmptySrvResponse
00119 cls = EmptySrvResponse
00120 v = cls()
00121
00122 self.assertEquals(v, convert_return_to_response(v, cls))
00123 self.assertEquals(v, convert_return_to_response([], cls))
00124 self.assertEquals(v, convert_return_to_response({}, cls))
00125
00126
00127
00128 if 0:
00129 for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
00130 try:
00131 convert_return_to_response(bad, cls)
00132 self.fail("should have raised: %s"%str(bad))
00133 except rospy.ServiceException:
00134 pass
00135
00136 def test_service_connection_handler(self):
00137 import test_rospy.srv
00138 from rospy.impl.registration import get_service_manager
00139 import rospy.service
00140
00141 sock = FakeSocket()
00142 from rospy.impl.tcpros_service import service_connection_handler
00143 client_addr = '10.0.0.1'
00144
00145
00146 self.assert_("Missing" in service_connection_handler(sock, client_addr, {}))
00147 header = { 'service' : '/service', 'md5sum': '*', 'callerid': '/bob' }
00148 for k in header:
00149 c = header.copy()
00150 del c[k]
00151 msg = service_connection_handler(sock, client_addr, c)
00152 self.assert_("Missing" in msg, str(c) + msg)
00153 self.assert_(k in msg, msg)
00154
00155
00156 header['service'] = '/doesnotexist'
00157 msg = service_connection_handler(sock, client_addr, header)
00158 self.assert_('is not a provider' in msg, msg)
00159
00160
00161
00162 name = '/service'
00163 sm = get_service_manager()
00164 fake_service = \
00165 rospy.service._Service(name, test_rospy.srv.EmptySrv)
00166 sm.register(name, fake_service)
00167
00168 header['service'] = name
00169 header['md5sum'] = 'X'
00170
00171 msg = service_connection_handler(sock, client_addr, header)
00172 self.assert_('md5sums do not match' in msg, msg)
00173
00174
00175 def test_TCPROSServiceClient(self):
00176 import rospy.impl.transport
00177 from rospy.impl.tcpros_service import TCPROSServiceClient
00178 import test_rospy.srv
00179
00180 callerid = 'test_TCPROSServiceClient'
00181 import rospy.names
00182 rospy.names._set_caller_id(callerid)
00183
00184
00185 name = 'name-%s'%time.time()
00186 srv_data_class = test_rospy.srv.EmptySrv
00187 p = TCPROSServiceClient(name, srv_data_class)
00188 self.assertEquals(name, p.resolved_name)
00189 self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
00190 self.assertEquals(srv_data_class, p.service_class)
00191 self.assert_(p.buff_size > -1)
00192
00193 p = TCPROSServiceClient(name, srv_data_class, buff_size=1)
00194 self.assert_(1, p.buff_size)
00195
00196 fields = p.get_header_fields()
00197 self.assertEquals(name, fields['service'])
00198 self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00199
00200
00201 headers = {'sessionid': '123456', 'persistent': '1'}
00202 p = TCPROSServiceClient(name, srv_data_class, headers=headers)
00203 self.assertEquals('123456', p.get_header_fields()['sessionid'])
00204 self.assertEquals('1', p.get_header_fields()['persistent'])
00205
00206 self.assertEquals(name, fields['service'])
00207 self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00208
00209
00210 def test_TCPService(self):
00211 import rospy.impl.transport
00212 from rospy.impl.tcpros_service import TCPService
00213 import test_rospy.srv
00214
00215 callerid = 'test_TCPService'
00216 import rospy.names
00217 rospy.names._set_caller_id(callerid)
00218
00219
00220 name = 'name-%s'%time.time()
00221 srv_data_class = test_rospy.srv.EmptySrv
00222 p = TCPService(name, srv_data_class)
00223 self.assertEquals(name, p.resolved_name)
00224 self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
00225 self.assertEquals(srv_data_class, p.service_class)
00226 self.assert_(p.buff_size > -1)
00227
00228 p = TCPService(name, srv_data_class, buff_size=1)
00229 self.assert_(1, p.buff_size)
00230
00231 fields = p.get_header_fields()
00232 self.assertEquals(name, fields['service'])
00233 self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00234 self.assertEquals(srv_data_class._type, fields['type'])