00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import socket
00041 import struct
00042 import unittest
00043 import time
00044
class FakeSocket(object):
    """In-memory stand-in for a socket that records what is written to it.

    Attributes:
        data: concatenation of every payload passed to send()/sendall().
            Starts as the empty string ``''``.
        sockopt: argument tuple of the most recent setsockopt() call, or
            None if setsockopt() has not been called.
    """

    def __init__(self):
        self.data = ''
        self.sockopt = None

    def _append(self, d):
        """Append payload ``d`` to the recorded data."""
        if not self.data:
            # Nothing recorded yet: start from an empty value of the
            # payload's own type so that bytes payloads (Python 3) can be
            # recorded as well as str ones.  d[:0] + d also copies mutable
            # payloads instead of aliasing the caller's object.
            self.data = d[:0]
        self.data = self.data + d

    def fileno(self):
        """Return a fixed fake file descriptor number (always 1)."""
        return 1

    def setblocking(self, *args):
        """Accept and ignore any arguments."""
        pass

    def setsockopt(self, *args):
        """Remember the arguments of the call in ``sockopt``."""
        self.sockopt = args

    def send(self, d):
        """Record ``d`` and return its length."""
        self._append(d)
        return len(d)

    def sendall(self, d):
        """Record ``d``; returns None."""
        self._append(d)

    def close(self):
        """Do nothing; there is no underlying resource."""
        pass
00063
00064
00065 class TestRospyTcprosService(unittest.TestCase):
00066
00067 def test_convert_return_to_response(self):
00068 import rospy
00069 from rospy.impl.tcpros_service import convert_return_to_response
00070 from test_ros.srv import AddTwoIntsResponse
00071
00072 cls = AddTwoIntsResponse
00073 v = cls(3)
00074
00075
00076
00077
00078 self.assertEquals(v, convert_return_to_response(v, cls))
00079 self.assertEquals(v, convert_return_to_response(3, cls))
00080 self.assertEquals(v, convert_return_to_response((3), cls))
00081 self.assertEquals(v, convert_return_to_response([3], cls))
00082 self.assertEquals(v, convert_return_to_response({'sum':3}, cls))
00083 for bad in [[1, 2, 3], {'fake': 1}]:
00084 try:
00085 convert_return_to_response(bad, cls)
00086 self.fail("should have raised: %s"%str(bad))
00087 except rospy.ServiceException:
00088 pass
00089
00090
00091 from test_rospy.srv import MultipleAddTwoIntsResponse
00092 cls = MultipleAddTwoIntsResponse
00093 v = cls(1, 2)
00094 self.assertEquals(v, convert_return_to_response(v, cls))
00095 self.assertEquals(v, convert_return_to_response((1, 2), cls))
00096 self.assertEquals(v, convert_return_to_response([1, 2], cls))
00097 self.assertEquals(v, convert_return_to_response({'ab':1, 'cd': 2}, cls))
00098 for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
00099 try:
00100 convert_return_to_response(bad, cls)
00101 self.fail("should have raised: %s"%str(bad))
00102 except rospy.ServiceException:
00103 pass
00104
00105
00106 from test_rospy.srv import ListReturnResponse
00107 cls = ListReturnResponse
00108 v = cls([1, 2, 3])
00109 self.assertEquals(v, convert_return_to_response(v, cls))
00110 self.assertEquals(v, convert_return_to_response(((1, 2, 3),), cls))
00111 self.assertEquals(v, convert_return_to_response(([1, 2, 3],), cls))
00112 self.assertEquals(v, convert_return_to_response([[1, 2, 3]], cls))
00113 self.assertEquals(v, convert_return_to_response({'abcd':[1,2,3]}, cls))
00114 for bad in [[1, 2, 3], {'fake': 1}]:
00115 try:
00116 convert_return_to_response(bad, cls)
00117 self.fail("should have raised: %s"%str(bad))
00118 except rospy.ServiceException:
00119 pass
00120
00121
00122 from test_rospy.srv import EmptySrvResponse
00123 cls = EmptySrvResponse
00124 v = cls()
00125
00126 self.assertEquals(v, convert_return_to_response(v, cls))
00127 self.assertEquals(v, convert_return_to_response([], cls))
00128 self.assertEquals(v, convert_return_to_response({}, cls))
00129
00130
00131
00132 if 0:
00133 for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
00134 try:
00135 convert_return_to_response(bad, cls)
00136 self.fail("should have raised: %s"%str(bad))
00137 except rospy.ServiceException:
00138 pass
00139
00140 def test_service_connection_handler(self):
00141 import test_rospy.srv
00142 from rospy.impl.registration import get_service_manager
00143 import rospy.service
00144
00145 sock = FakeSocket()
00146 from rospy.impl.tcpros_service import service_connection_handler
00147 client_addr = '10.0.0.1'
00148
00149
00150 self.assert_("Missing" in service_connection_handler(sock, client_addr, {}))
00151 header = { 'service' : '/service', 'md5sum': '*', 'callerid': '/bob' }
00152 for k in header:
00153 c = header.copy()
00154 del c[k]
00155 msg = service_connection_handler(sock, client_addr, c)
00156 self.assert_("Missing" in msg, str(c) + msg)
00157 self.assert_(k in msg, msg)
00158
00159
00160 header['service'] = '/doesnotexist'
00161 msg = service_connection_handler(sock, client_addr, header)
00162 self.assert_('is not a provider' in msg, msg)
00163
00164
00165
00166 name = '/service'
00167 sm = get_service_manager()
00168 fake_service = \
00169 rospy.service._Service(name, test_rospy.srv.EmptySrv)
00170 sm.register(name, fake_service)
00171
00172 header['service'] = name
00173 header['md5sum'] = 'X'
00174
00175 msg = service_connection_handler(sock, client_addr, header)
00176 self.assert_('md5sums do not match' in msg, msg)
00177
00178
00179 def test_TCPROSServiceClient(self):
00180 import rospy.impl.transport
00181 from rospy.impl.tcpros_service import TCPROSServiceClient
00182 import test_rospy.srv
00183
00184 callerid = 'test_TCPROSServiceClient'
00185 import rospy.names
00186 rospy.names._set_caller_id(callerid)
00187
00188
00189 name = 'name-%s'%time.time()
00190 srv_data_class = test_rospy.srv.EmptySrv
00191 p = TCPROSServiceClient(name, srv_data_class)
00192 self.assertEquals(name, p.resolved_name)
00193 self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
00194 self.assertEquals(srv_data_class, p.service_class)
00195 self.assert_(p.buff_size > -1)
00196
00197 p = TCPROSServiceClient(name, srv_data_class, buff_size=1)
00198 self.assert_(1, p.buff_size)
00199
00200 fields = p.get_header_fields()
00201 self.assertEquals(name, fields['service'])
00202 self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00203
00204
00205 headers = {'sessionid': '123456', 'persistent': '1'}
00206 p = TCPROSServiceClient(name, srv_data_class, headers=headers)
00207 self.assertEquals('123456', p.get_header_fields()['sessionid'])
00208 self.assertEquals('1', p.get_header_fields()['persistent'])
00209
00210 self.assertEquals(name, fields['service'])
00211 self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00212
00213
00214 def test_TCPService(self):
00215 import rospy.impl.transport
00216 from rospy.impl.tcpros_service import TCPService
00217 import test_rospy.srv
00218
00219 callerid = 'test_TCPService'
00220 import rospy.names
00221 rospy.names._set_caller_id(callerid)
00222
00223
00224 name = 'name-%s'%time.time()
00225 srv_data_class = test_rospy.srv.EmptySrv
00226 p = TCPService(name, srv_data_class)
00227 self.assertEquals(name, p.resolved_name)
00228 self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
00229 self.assertEquals(srv_data_class, p.service_class)
00230 self.assert_(p.buff_size > -1)
00231
00232 p = TCPService(name, srv_data_class, buff_size=1)
00233 self.assert_(1, p.buff_size)
00234
00235 fields = p.get_header_fields()
00236 self.assertEquals(name, fields['service'])
00237 self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00238 self.assertEquals(srv_data_class._type, fields['type'])
00239
00240
if __name__ == '__main__':
    # rostest is only imported when this file is executed as a script.
    import rostest

    # Packages handed to unitrun through its coverage_packages argument.
    covered = ['rospy.impl.tcpros_service']
    rostest.unitrun('test_rospy', 'test_rospy_tcpros_service',
                    TestRospyTcprosService, coverage_packages=covered)