$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rospy_api.py 4898 2009-06-17 01:31:18Z sfkwc $ 00035 00036 import roslib; roslib.load_manifest('test_rospy') 00037 00038 import os 00039 import sys 00040 import socket 00041 import struct 00042 import unittest 00043 import time 00044 00045 class FakeSocket(object): 00046 def __init__(self): 00047 self.data = '' 00048 self.sockopt = None 00049 def fileno(self): 00050 # fool select logic by giving it stdout fileno 00051 return 1 00052 def setblocking(self, *args): 00053 pass 00054 def setsockopt(self, *args): 00055 self.sockopt = args 00056 def send(self, d): 00057 self.data = self.data+d 00058 return len(d) 00059 def sendall(self, d): 00060 self.data = self.data+d 00061 def close(self): 00062 pass 00063 00064 # test rospy API verifies that the rospy module exports the required symbols 00065 class TestRospyTcprosPubsub(unittest.TestCase): 00066 00067 def test_TCPROSSub(self): 00068 import rospy.impl.transport 00069 from rospy.impl.tcpros_pubsub import TCPROSSub 00070 import test_rospy.msg 00071 00072 callerid = 'test_TCPROSSub' 00073 import rospy.names 00074 rospy.names._set_caller_id(callerid) 00075 00076 #name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE 00077 name = 'name-%s'%time.time() 00078 recv_data_class = test_rospy.msg.Val 00079 s = TCPROSSub(name, recv_data_class) 00080 self.assertEquals(name, s.resolved_name) 00081 self.assertEquals(rospy.impl.transport.INBOUND, s.direction) 00082 self.assertEquals(recv_data_class, s.recv_data_class) 00083 self.assert_(s.buff_size > -1) 00084 self.failIf(s.tcp_nodelay) 00085 self.assertEquals(None, s.queue_size) 00086 00087 fields = s.get_header_fields() 00088 self.assertEquals(name, fields['topic']) 00089 self.assertEquals(recv_data_class._md5sum, fields['md5sum']) 00090 self.assertEquals(recv_data_class._full_text, fields['message_definition']) 00091 self.assertEquals('test_rospy/Val', fields['type']) 00092 self.assert_(callerid, fields['callerid']) 00093 if 'tcp_nodelay' in fields: 00094 self.assertEquals('0', fields['tcp_nodelay']) 00095 00096 v = int(time.time()) 00097 s = TCPROSSub(name, recv_data_class, queue_size=v) 00098 self.assertEquals(v, s.queue_size) 00099 00100 s = TCPROSSub(name, recv_data_class, buff_size=v) 00101 self.assertEquals(v, s.buff_size) 00102 00103 s = TCPROSSub(name, recv_data_class, tcp_nodelay=True) 00104 self.assert_(s.tcp_nodelay) 00105 self.assertEquals('1', s.get_header_fields()['tcp_nodelay']) 00106 00107 def test_TCPROSPub(self): 00108 import rospy.impl.transport 00109 from rospy.impl.tcpros_pubsub import TCPROSPub 00110 import test_rospy.msg 00111 00112 callerid = 'test_TCPROSPub' 00113 import rospy.names 00114 rospy.names._set_caller_id(callerid) 00115 00116 #name, pub_data_class 00117 name = 'name-%s'%time.time() 00118 pub_data_class = test_rospy.msg.Val 00119 p = TCPROSPub(name, pub_data_class) 00120 self.assertEquals(name, p.resolved_name) 00121 self.assertEquals(rospy.impl.transport.OUTBOUND, p.direction) 00122 self.assertEquals(pub_data_class, p.pub_data_class) 00123 self.assert_(p.buff_size > -1) 00124 self.failIf(p.is_latch) 00125 00126 fields = p.get_header_fields() 00127 self.assertEquals(name, fields['topic']) 00128 self.assertEquals(pub_data_class._md5sum, fields['md5sum']) 00129 self.assertEquals(pub_data_class._full_text, fields['message_definition']) 00130 self.assertEquals('test_rospy/Val', fields['type']) 00131 self.assert_(callerid, fields['callerid']) 00132 if 'latching' in fields: 00133 self.assertEquals('0', fields['latching']) 00134 00135 p = TCPROSPub(name, pub_data_class, is_latch=True) 00136 self.assert_(p.is_latch) 00137 self.assertEquals('1', p.get_header_fields()['latching']) 00138 00139 # test additional header fields 00140 p = TCPROSPub(name, pub_data_class, headers={'foo': 'bar', 'hoge': 'fuga'}) 00141 fields = p.get_header_fields() 00142 self.assertEquals(name, fields['topic']) 00143 self.assertEquals('fuga', fields['hoge']) 00144 self.assertEquals('bar', fields['foo']) 00145 00146 def test_configure_pub_socket(self): 00147 # #1241 regression test to make sure that imports don't get messed up again 00148 from rospy.impl.tcpros_pubsub import _configure_pub_socket 00149 import socket 00150 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 00151 _configure_pub_socket(sock, True) 00152 sock.close() 00153 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 00154 _configure_pub_socket(sock, False) 00155 sock.close() 00156 00157 def test_TCPROSHandler_topic_connection_handler(self): 00158 00159 import rospy 00160 from rospy.impl.registration import Registration 00161 from rospy.impl.tcpros_pubsub import TCPROSHandler 00162 import test_rospy.msg 00163 00164 handler = TCPROSHandler() 00165 tch = handler.topic_connection_handler 00166 sock = FakeSocket() 00167 client_addr = '127.0.0.1' 00168 data_class = test_rospy.msg.Val 00169 topic_name = '/foo-tch' 00170 00171 headers = { 'topic': topic_name, 'md5sum': data_class._md5sum, 'callerid': '/node'} 00172 # test required logic 00173 for k in headers.iterkeys(): 00174 header_copy = headers.copy() 00175 del header_copy[k] 00176 err = tch(sock, client_addr, header_copy) 00177 self.assertNotEquals('', err) 00178 00179 # '/foo-tch' is not registered, so this should error 00180 err = tch(sock, client_addr, headers) 00181 self.assert_(err) 00182 00183 # register '/foo-tch' 00184 tm = rospy.impl.registration.get_topic_manager() 00185 impl = tm.acquire_impl(Registration.PUB, topic_name, data_class) 00186 self.assert_(impl is not None) 00187 00188 # test with mismatched md5sum 00189 header_copy = headers.copy() 00190 header_copy['md5sum'] = 'bad' 00191 md5_err = tch(sock, client_addr, header_copy) 00192 self.assert_("md5sum" in md5_err, md5_err) 00193 00194 # now test with correct params 00195 err = tch(sock, client_addr, headers) 00196 self.failIf(err) 00197 self.assertEquals(None, sock.sockopt) 00198 00199 # test with mismatched type 00200 # - if md5sums match, this should not error 00201 header_copy = headers.copy() 00202 header_copy['type'] = 'bad_type/Bad' 00203 err = tch(sock, client_addr, header_copy) 00204 self.failIf(err) 00205 # - now give mismatched md5sum, this will test different error message 00206 header_copy['md5sum'] = 'bad' 00207 type_md5_err = tch(sock, client_addr, header_copy) 00208 self.assert_("types" in type_md5_err, type_md5_err) 00209 00210 # - these error messages should be different 00211 self.assertNotEquals(md5_err, type_md5_err) 00212 00213 # test tcp_nodelay 00214 # - should be equivalent to above 00215 headers['tcp_nodelay'] = '0' 00216 err = tch(sock, client_addr, headers) 00217 self.failIf(err) 00218 self.assertEquals(None, sock.sockopt) 00219 00220 # - now test actual sock opt 00221 headers['tcp_nodelay'] = '1' 00222 err = tch(sock, client_addr, headers) 00223 self.failIf(err) 00224 self.assertEquals((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt) 00225 # test connection headers 00226 impl.headers = {'foo': 'baz', 'hoge': 'fuga'} 00227 headers['tcp_nodelay'] = '0' 00228 err = tch(sock, client_addr, headers) 00229 self.failIf(err) 00230 connection = impl.connections[-1] 00231 fields = connection.protocol.get_header_fields() 00232 self.assertEquals(impl.resolved_name, fields['topic']) 00233 self.assertEquals('fuga', fields['hoge']) 00234 self.assertEquals('baz', fields['foo']) 00235 00236 if __name__ == '__main__': 00237 import rostest 00238 rostest.unitrun('test_rospy', 'test_rospy_tcpros_pubsub', TestRospyTcprosPubsub, coverage_packages=['rospy.impl.tcpros_pubsub'])