test_rospy_tcpros_pubsub.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import socket
00037 import struct
00038 import unittest
00039 import time
00040 
class FakeSocket(object):
    """In-memory stand-in for a socket that records what is written to it."""

    def __init__(self):
        # arguments of the most recent setsockopt() call; None until then
        self.sockopt = None
        # everything handed to send()/sendall(), concatenated in call order
        self.data = ''

    def fileno(self):
        """Return 1 (the stdout descriptor) to fool select logic."""
        return 1

    def setblocking(self, *args):
        """Accept and ignore any arguments."""

    def setsockopt(self, *args):
        """Remember the positional arguments of this call in ``sockopt``."""
        self.sockopt = args

    def send(self, d):
        """Append ``d`` to ``data`` and return ``len(d)``."""
        self.sendall(d)
        return len(d)

    def sendall(self, d):
        """Append ``d`` to ``data``; returns None."""
        self.data += d

    def close(self):
        """Do nothing."""

    def getsockname(self):
        """Return the placeholder pair ``(None, None)``."""
        return (None, None)
00061 
00062 # test rospy API verifies that the rospy module exports the required symbols
00063 class TestRospyTcprosPubsub(unittest.TestCase):
00064 
00065     def test_TCPROSSub(self):
00066         import rospy.impl.transport
00067         from rospy.impl.tcpros_pubsub import TCPROSSub        
00068         import test_rospy.msg
00069 
00070         callerid = 'test_TCPROSSub'
00071         import rospy.names
00072         rospy.names._set_caller_id(callerid)
00073 
00074         #name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE
00075         name = 'name-%s'%time.time()
00076         recv_data_class = test_rospy.msg.Val
00077         s = TCPROSSub(name, recv_data_class)
00078         self.assertEquals(name, s.resolved_name)
00079         self.assertEquals(rospy.impl.transport.INBOUND, s.direction)
00080         self.assertEquals(recv_data_class, s.recv_data_class)
00081         self.assert_(s.buff_size > -1)
00082         self.failIf(s.tcp_nodelay)
00083         self.assertEquals(None, s.queue_size)
00084 
00085         fields = s.get_header_fields()
00086         self.assertEquals(name, fields['topic'])
00087         self.assertEquals(recv_data_class._md5sum, fields['md5sum'])
00088         self.assertEquals(recv_data_class._full_text, fields['message_definition'])
00089         self.assertEquals('test_rospy/Val', fields['type'])
00090         self.assert_(callerid, fields['callerid'])
00091         if 'tcp_nodelay' in fields:
00092             self.assertEquals('0', fields['tcp_nodelay'])
00093         
00094         v = int(time.time())
00095         s = TCPROSSub(name, recv_data_class, queue_size=v)
00096         self.assertEquals(v, s.queue_size)        
00097 
00098         s = TCPROSSub(name, recv_data_class, buff_size=v)
00099         self.assertEquals(v, s.buff_size)
00100 
00101         s = TCPROSSub(name, recv_data_class, tcp_nodelay=True)
00102         self.assert_(s.tcp_nodelay)
00103         self.assertEquals('1', s.get_header_fields()['tcp_nodelay'])
00104         
00105     def test_TCPROSPub(self):
00106         import rospy.impl.transport
00107         from rospy.impl.tcpros_pubsub import TCPROSPub        
00108         import test_rospy.msg
00109 
00110         callerid = 'test_TCPROSPub'
00111         import rospy.names
00112         rospy.names._set_caller_id(callerid)
00113 
00114         #name, pub_data_class
00115         name = 'name-%s'%time.time()
00116         pub_data_class = test_rospy.msg.Val
00117         p = TCPROSPub(name, pub_data_class)
00118         self.assertEquals(name, p.resolved_name)
00119         self.assertEquals(rospy.impl.transport.OUTBOUND, p.direction)
00120         self.assertEquals(pub_data_class, p.pub_data_class)
00121         self.assert_(p.buff_size > -1)
00122         self.failIf(p.is_latch)
00123 
00124         fields = p.get_header_fields()
00125         self.assertEquals(name, fields['topic'])
00126         self.assertEquals(pub_data_class._md5sum, fields['md5sum'])
00127         self.assertEquals(pub_data_class._full_text, fields['message_definition'])
00128         self.assertEquals('test_rospy/Val', fields['type'])
00129         self.assert_(callerid, fields['callerid'])
00130         if 'latching' in fields:
00131             self.assertEquals('0', fields['latching'])
00132 
00133         p = TCPROSPub(name, pub_data_class, is_latch=True)
00134         self.assert_(p.is_latch)
00135         self.assertEquals('1', p.get_header_fields()['latching'])
00136 
00137         # test additional header fields
00138         p = TCPROSPub(name, pub_data_class, headers={'foo': 'bar', 'hoge': 'fuga'})
00139         fields = p.get_header_fields()        
00140         self.assertEquals(name, fields['topic'])
00141         self.assertEquals('fuga', fields['hoge'])
00142         self.assertEquals('bar', fields['foo'])        
00143         
00144     def test_configure_pub_socket(self):
00145         # #1241 regression test to make sure that imports don't get messed up again
00146         from rospy.impl.tcpros_pubsub import _configure_pub_socket
00147         import socket
00148         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00149         _configure_pub_socket(sock, True)
00150         sock.close()
00151         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        
00152         _configure_pub_socket(sock, False)
00153         sock.close()
00154 
00155     def test_TCPROSHandler_topic_connection_handler(self):
00156         
00157         import rospy
00158         import rospy.core
00159         # very ugly hack to handle bad design choice in rospy and bad isolation inside of nose
00160         rospy.core._shutdown_flag  = False        
00161         rospy.core._in_shutdown  = False        
00162         from rospy.impl.registration import Registration
00163         from rospy.impl.tcpros_pubsub import TCPROSHandler
00164         import test_rospy.msg
00165 
00166         handler = TCPROSHandler()
00167         tch = handler.topic_connection_handler 
00168         sock = FakeSocket()
00169         client_addr = '127.0.0.1'
00170         data_class = test_rospy.msg.Val
00171         topic_name = '/foo-tch'
00172         
00173         headers = { 'topic': topic_name, 'md5sum': data_class._md5sum, 'callerid': '/node'}
00174         # test required logic
00175         for k in headers.keys():
00176             header_copy = headers.copy()
00177             del header_copy[k]
00178             err = tch(sock, client_addr, header_copy)
00179             self.assertNotEquals('', err)
00180 
00181         # '/foo-tch' is not registered, so this should error
00182         err = tch(sock, client_addr, headers)
00183         self.assert_(err)
00184 
00185         # register '/foo-tch'
00186         tm = rospy.impl.registration.get_topic_manager()
00187         impl = tm.acquire_impl(Registration.PUB, topic_name, data_class)
00188         self.assert_(impl is not None)
00189 
00190         # test with mismatched md5sum
00191         header_copy = headers.copy()
00192         header_copy['md5sum'] = 'bad'
00193         md5_err = tch(sock, client_addr, header_copy)
00194         self.assert_("md5sum" in md5_err, md5_err) 
00195 
00196         # now test with correct params
00197         err = tch(sock, client_addr, headers)        
00198         self.failIf(err)
00199         self.assertEquals(None, sock.sockopt)
00200 
00201         # test with mismatched type
00202         # - if md5sums match, this should not error
00203         header_copy = headers.copy()
00204         header_copy['type'] = 'bad_type/Bad'
00205         err = tch(sock, client_addr, header_copy)        
00206         self.failIf(err)
00207         # - now give mismatched md5sum, this will test different error message
00208         header_copy['md5sum'] = 'bad'
00209         type_md5_err = tch(sock, client_addr, header_copy)
00210         self.assert_("types" in type_md5_err, type_md5_err)
00211         
00212         # - these error messages should be different
00213         self.assertNotEquals(md5_err, type_md5_err)
00214         
00215         # test tcp_nodelay
00216         # - should be equivalent to above
00217         headers['tcp_nodelay'] = '0'
00218         err = tch(sock, client_addr, headers)        
00219         self.failIf(err)
00220         self.assertEquals(None, sock.sockopt)        
00221         
00222         # - now test actual sock opt
00223         headers['tcp_nodelay'] = '1'        
00224         err = tch(sock, client_addr, headers)        
00225         self.failIf(err)
00226         self.assertEquals((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt)
00227         # test connection headers
00228         impl.headers = {'foo': 'baz', 'hoge': 'fuga'}        
00229         headers['tcp_nodelay'] = '0'
00230         err = tch(sock, client_addr, headers)        
00231         self.failIf(err)
00232         connection = impl.connections[-1]
00233         fields = connection.protocol.get_header_fields()
00234         self.assertEquals(impl.resolved_name, fields['topic'])
00235         self.assertEquals('fuga', fields['hoge'])
00236         self.assertEquals('baz', fields['foo'])        
00237             


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57