$search
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id: test_rospy_api.py 4898 2009-06-17 01:31:18Z sfkwc $

import roslib; roslib.load_manifest('test_rospy')

import os
import sys
import socket
import struct
import unittest
import time


class FakeSocket(object):
    """Minimal in-memory stand-in for a socket object.

    Everything passed to send()/sendall() is appended to ``self.data``
    and the last setsockopt() arguments are kept in ``self.sockopt`` so
    a test can inspect them afterwards.
    """

    def __init__(self):
        self.data = ''
        self.sockopt = None

    def fileno(self):
        # fool select logic by giving it stdout fileno
        return 1

    def setblocking(self, *args):
        pass

    def setsockopt(self, *args):
        self.sockopt = args

    def send(self, d):
        """Record d and report that all of it was sent."""
        self.data = self.data + d
        return len(d)

    def sendall(self, d):
        """Record d; returns None."""
        self.data = self.data + d

    def close(self):
        pass


# test service implementation
class TestRospyTcprosService(unittest.TestCase):
    """Unit tests for rospy.impl.tcpros_service."""

    def _assert_all_rejected(self, bad_values, cls):
        """Fail unless converting every item of bad_values raises.

        Each value is passed to convert_return_to_response() together
        with the response class cls; the call is expected to raise
        rospy.ServiceException.
        """
        import rospy
        from rospy.impl.tcpros_service import convert_return_to_response
        for bad in bad_values:
            try:
                convert_return_to_response(bad, cls)
            except rospy.ServiceException:
                pass
            else:
                self.fail("should have raised: %s"%str(bad))

    def test_convert_return_to_response(self):
        """Exercise convert_return_to_response() with valid and invalid return values."""
        import rospy
        from rospy.impl.tcpros_service import convert_return_to_response
        from test_ros.srv import AddTwoIntsResponse

        cls = AddTwoIntsResponse
        v = cls(3)

        # test various ways that a user could reasonably return a
        # value for a single-arg message. This is actually our hardest
        # case.
        self.assertEqual(v, convert_return_to_response(v, cls))
        self.assertEqual(v, convert_return_to_response(3, cls))
        # NOTE: (3) is just the int 3 (not a 1-tuple), so this repeats
        # the previous case; kept as in the original test.
        self.assertEqual(v, convert_return_to_response((3), cls))
        self.assertEqual(v, convert_return_to_response([3], cls))
        self.assertEqual(v, convert_return_to_response({'sum':3}, cls))
        self._assert_all_rejected([[1, 2, 3], {'fake': 1}], cls)

        # test multi-arg services
        from test_rospy.srv import MultipleAddTwoIntsResponse
        cls = MultipleAddTwoIntsResponse
        v = cls(1, 2)
        self.assertEqual(v, convert_return_to_response(v, cls))
        self.assertEqual(v, convert_return_to_response((1, 2), cls))
        self.assertEqual(v, convert_return_to_response([1, 2], cls))
        self.assertEqual(v, convert_return_to_response({'ab':1, 'cd': 2}, cls))
        self._assert_all_rejected(
            [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}], cls)

        # test response with single, array field
        from test_rospy.srv import ListReturnResponse
        cls = ListReturnResponse
        v = cls([1, 2, 3])
        self.assertEqual(v, convert_return_to_response(v, cls))
        self.assertEqual(v, convert_return_to_response(((1, 2, 3),), cls))
        self.assertEqual(v, convert_return_to_response(([1, 2, 3],), cls))
        self.assertEqual(v, convert_return_to_response([[1, 2, 3]], cls))
        self.assertEqual(v, convert_return_to_response({'abcd':[1,2,3]}, cls))
        self._assert_all_rejected([[1, 2, 3], {'fake': 1}], cls)

        # test with empty response
        from test_rospy.srv import EmptySrvResponse
        cls = EmptySrvResponse
        v = cls()
        # - only valid return values are empty list, empty dict and a response instance
        self.assertEqual(v, convert_return_to_response(v, cls))
        self.assertEqual(v, convert_return_to_response([], cls))
        self.assertEqual(v, convert_return_to_response({}, cls))

        # #2185: currently empty does not do any checking whatsoever,
        # disabling this test as it is not convert()s fault
        if 0:
            self._assert_all_rejected(
                [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}], cls)

    def test_service_connection_handler(self):
        """Check the error strings service_connection_handler() returns for bad headers."""
        import test_rospy.srv
        from rospy.impl.registration import get_service_manager
        import rospy.service

        sock = FakeSocket()
        from rospy.impl.tcpros_service import service_connection_handler
        client_addr = '10.0.0.1'

        # check error conditions on missing headers
        self.assertTrue("Missing" in service_connection_handler(sock, client_addr, {}))
        header = { 'service' : '/service', 'md5sum': '*', 'callerid': '/bob' }
        # drop each required field in turn; the error must name the missing key
        for k in header:
            c = header.copy()
            del c[k]
            msg = service_connection_handler(sock, client_addr, c)
            self.assertTrue("Missing" in msg, str(c) + msg)
            self.assertTrue(k in msg, msg)

        # check error condition on invalid service
        header['service'] = '/doesnotexist'
        msg = service_connection_handler(sock, client_addr, header)
        self.assertTrue('is not a provider' in msg, msg)

        # check invalid md5sums

        name = '/service'
        sm = get_service_manager()
        fake_service = \
            rospy.service._Service(name, test_rospy.srv.EmptySrv)
        sm.register(name, fake_service)

        header['service'] = name
        header['md5sum'] = 'X'

        msg = service_connection_handler(sock, client_addr, header)
        self.assertTrue('md5sums do not match' in msg, msg)

    def test_TCPROSServiceClient(self):
        """Check constructor attributes and header fields of TCPROSServiceClient."""
        import rospy.impl.transport
        from rospy.impl.tcpros_service import TCPROSServiceClient
        import test_rospy.srv

        callerid = 'test_TCPROSServiceClient'
        import rospy.names
        rospy.names._set_caller_id(callerid)

        # name, srv_data_class
        name = 'name-%s'%time.time()
        srv_data_class = test_rospy.srv.EmptySrv
        p = TCPROSServiceClient(name, srv_data_class)
        self.assertEqual(name, p.resolved_name)
        self.assertEqual(rospy.impl.transport.BIDIRECTIONAL, p.direction)
        self.assertEqual(srv_data_class, p.service_class)
        self.assertTrue(p.buff_size > -1)

        p = TCPROSServiceClient(name, srv_data_class, buff_size=1)
        # was assert_(1, p.buff_size): that only tested the truthiness of 1
        # and used buff_size as the failure message, so it could never fail
        self.assertEqual(1, p.buff_size)

        fields = p.get_header_fields()
        self.assertEqual(name, fields['service'])
        self.assertEqual(srv_data_class._md5sum, fields['md5sum'])

        # test custom headers
        headers = {'sessionid': '123456', 'persistent': '1'}
        p = TCPROSServiceClient(name, srv_data_class, headers=headers)
        # re-fetch from the new client so the builtin checks below do not
        # silently re-test the header dict of the previous instance
        fields = p.get_header_fields()
        self.assertEqual('123456', fields['sessionid'])
        self.assertEqual('1', fields['persistent'])
        # - make sure builtins are still there
        self.assertEqual(name, fields['service'])
        self.assertEqual(srv_data_class._md5sum, fields['md5sum'])

    def test_TCPService(self):
        """Check constructor attributes and header fields of TCPService."""
        import rospy.impl.transport
        from rospy.impl.tcpros_service import TCPService
        import test_rospy.srv

        callerid = 'test_TCPService'
        import rospy.names
        rospy.names._set_caller_id(callerid)

        # name, srv_data_class
        name = 'name-%s'%time.time()
        srv_data_class = test_rospy.srv.EmptySrv
        p = TCPService(name, srv_data_class)
        self.assertEqual(name, p.resolved_name)
        self.assertEqual(rospy.impl.transport.BIDIRECTIONAL, p.direction)
        self.assertEqual(srv_data_class, p.service_class)
        self.assertTrue(p.buff_size > -1)

        p = TCPService(name, srv_data_class, buff_size=1)
        # was assert_(1, p.buff_size), which is always true; compare the values
        self.assertEqual(1, p.buff_size)

        fields = p.get_header_fields()
        self.assertEqual(name, fields['service'])
        self.assertEqual(srv_data_class._md5sum, fields['md5sum'])
        self.assertEqual(srv_data_class._type, fields['type'])


if __name__ == '__main__':
    import rostest
    rostest.unitrun('test_rospy', 'test_rospy_tcpros_service', TestRospyTcprosService, coverage_packages=['rospy.impl.tcpros_service'])