test_rospy_tcpros_service.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import socket
00037 import struct
00038 import unittest
00039 import time
00040 
00041 class FakeSocket(object):
00042     def __init__(self):
00043         self.data = ''
00044         self.sockopt = None
00045     def fileno(self):
00046         # fool select logic by giving it stdout fileno
00047         return 1
00048     def setblocking(self, *args):
00049         pass
00050     def setsockopt(self, *args):
00051         self.sockopt = args
00052     def send(self, d):
00053         self.data = self.data+d
00054         return len(d)
00055     def sendall(self, d):
00056         self.data = self.data+d
00057     def close(self):
00058         pass
00059 
00060 # test service implementation
00061 class TestRospyTcprosService(unittest.TestCase):
00062 
00063     def test_convert_return_to_response(self):
00064         import rospy
00065         from rospy.impl.tcpros_service import convert_return_to_response
00066         from test_rosmaster.srv import AddTwoIntsResponse
00067 
00068         cls = AddTwoIntsResponse
00069         v = cls(3)
00070         
00071         # test various ways that a user could reasonable return a
00072         # value for a single-arg message. This is actually our hardest
00073         # case.
00074         self.assertEquals(v, convert_return_to_response(v, cls))
00075         self.assertEquals(v, convert_return_to_response(3, cls))
00076         self.assertEquals(v, convert_return_to_response((3), cls))        
00077         self.assertEquals(v, convert_return_to_response([3], cls))        
00078         self.assertEquals(v, convert_return_to_response({'sum':3}, cls))
00079         for bad in [[1, 2, 3], {'fake': 1}]:
00080             try:
00081                 convert_return_to_response(bad, cls)
00082                 self.fail("should have raised: %s"%str(bad))
00083             except rospy.ServiceException:
00084                 pass
00085 
00086         # test multi-arg services
00087         from test_rospy.srv import MultipleAddTwoIntsResponse
00088         cls = MultipleAddTwoIntsResponse
00089         v = cls(1, 2)
00090         self.assertEquals(v, convert_return_to_response(v, cls))
00091         self.assertEquals(v, convert_return_to_response((1, 2), cls))        
00092         self.assertEquals(v, convert_return_to_response([1, 2], cls))        
00093         self.assertEquals(v, convert_return_to_response({'ab':1, 'cd': 2}, cls))
00094         for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
00095             try:
00096                 convert_return_to_response(bad, cls)
00097                 self.fail("should have raised: %s"%str(bad))
00098             except rospy.ServiceException:
00099                 pass
00100 
00101         # test response with single, array field
00102         from test_rospy.srv import ListReturnResponse
00103         cls = ListReturnResponse
00104         v = cls([1, 2, 3])
00105         self.assertEquals(v, convert_return_to_response(v, cls))
00106         self.assertEquals(v, convert_return_to_response(((1, 2, 3),), cls))        
00107         self.assertEquals(v, convert_return_to_response(([1, 2, 3],), cls))
00108         self.assertEquals(v, convert_return_to_response([[1, 2, 3]], cls))        
00109         self.assertEquals(v, convert_return_to_response({'abcd':[1,2,3]}, cls))
00110         for bad in [[1, 2, 3], {'fake': 1}]:
00111             try:
00112                 convert_return_to_response(bad, cls)
00113                 self.fail("should have raised: %s"%str(bad))
00114             except rospy.ServiceException:
00115                 pass
00116 
00117         # test with empty response
00118         from test_rospy.srv import EmptySrvResponse
00119         cls = EmptySrvResponse
00120         v = cls()
00121         # - only valid return values are empty list, empty dict and a response instance
00122         self.assertEquals(v, convert_return_to_response(v, cls))
00123         self.assertEquals(v, convert_return_to_response([], cls))
00124         self.assertEquals(v, convert_return_to_response({}, cls))
00125         
00126         # #2185: currently empty does not do any checking whatsoever,
00127         # disabling this test as it is not convert()s fault
00128         if 0:
00129             for bad in [1, AddTwoIntsResponse(), [1, 2, 3], {'fake': 1}]:
00130                 try:
00131                     convert_return_to_response(bad, cls)
00132                     self.fail("should have raised: %s"%str(bad))
00133                 except rospy.ServiceException:
00134                     pass
00135         
00136     def test_service_connection_handler(self):
00137         import test_rospy.srv
00138         from rospy.impl.registration import get_service_manager        
00139         import rospy.service
00140 
00141         sock = FakeSocket()
00142         from rospy.impl.tcpros_service import service_connection_handler
00143         client_addr = '10.0.0.1'
00144 
00145         # check error conditions on missing headers
00146         self.assert_("Missing" in service_connection_handler(sock, client_addr, {}))
00147         header = { 'service' : '/service', 'md5sum': '*', 'callerid': '/bob' }
00148         for k in header:
00149             c = header.copy()
00150             del c[k]
00151             msg = service_connection_handler(sock, client_addr, c)
00152             self.assert_("Missing" in msg, str(c) + msg)
00153             self.assert_(k in msg, msg)
00154 
00155         # check error condition on invalid service
00156         header['service'] = '/doesnotexist'
00157         msg = service_connection_handler(sock, client_addr, header)
00158         self.assert_('is not a provider' in msg, msg)
00159 
00160         # check invalid md5sums
00161 
00162         name = '/service'
00163         sm = get_service_manager()
00164         fake_service = \
00165             rospy.service._Service(name, test_rospy.srv.EmptySrv)
00166         sm.register(name, fake_service)
00167         
00168         header['service'] = name
00169         header['md5sum'] = 'X'
00170 
00171         msg = service_connection_handler(sock, client_addr, header)
00172         self.assert_('md5sums do not match' in msg, msg)
00173         
00174         
00175     def test_TCPROSServiceClient(self):
00176         import rospy.impl.transport
00177         from rospy.impl.tcpros_service import TCPROSServiceClient        
00178         import test_rospy.srv
00179 
00180         callerid = 'test_TCPROSServiceClient'
00181         import rospy.names
00182         rospy.names._set_caller_id(callerid)
00183 
00184         #name, pub_data_class
00185         name = 'name-%s'%time.time()
00186         srv_data_class = test_rospy.srv.EmptySrv
00187         p = TCPROSServiceClient(name, srv_data_class)
00188         self.assertEquals(name, p.resolved_name)
00189         self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
00190         self.assertEquals(srv_data_class, p.service_class)
00191         self.assert_(p.buff_size > -1)
00192 
00193         p = TCPROSServiceClient(name, srv_data_class, buff_size=1)
00194         self.assert_(1, p.buff_size)
00195 
00196         fields = p.get_header_fields()
00197         self.assertEquals(name, fields['service'])
00198         self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00199 
00200         # test custom headers
00201         headers = {'sessionid': '123456', 'persistent': '1'}
00202         p = TCPROSServiceClient(name, srv_data_class, headers=headers)
00203         self.assertEquals('123456', p.get_header_fields()['sessionid'])
00204         self.assertEquals('1', p.get_header_fields()['persistent'])
00205         # - make sure builtins are still there
00206         self.assertEquals(name, fields['service'])
00207         self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00208         
00209 
00210     def test_TCPService(self):
00211         import rospy.impl.transport
00212         from rospy.impl.tcpros_service import TCPService        
00213         import test_rospy.srv
00214 
00215         callerid = 'test_TCPService'
00216         import rospy.names
00217         rospy.names._set_caller_id(callerid)
00218 
00219         #name, pub_data_class
00220         name = 'name-%s'%time.time()
00221         srv_data_class = test_rospy.srv.EmptySrv
00222         p = TCPService(name, srv_data_class)
00223         self.assertEquals(name, p.resolved_name)
00224         self.assertEquals(rospy.impl.transport.BIDIRECTIONAL, p.direction)
00225         self.assertEquals(srv_data_class, p.service_class)
00226         self.assert_(p.buff_size > -1)
00227 
00228         p = TCPService(name, srv_data_class, buff_size=1)
00229         self.assert_(1, p.buff_size)
00230 
00231         fields = p.get_header_fields()
00232         self.assertEquals(name, fields['service'])
00233         self.assertEquals(srv_data_class._md5sum, fields['md5sum'])
00234         self.assertEquals(srv_data_class._type, fields['type'])        


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57