test_rospy_tcpros_pubsub.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import socket
00037 import struct
00038 import unittest
00039 import time
00040 
00041 class FakeSocket(object):
00042     def __init__(self):
00043         self.data = ''
00044         self.sockopt = None
00045     def fileno(self):
00046         # fool select logic by giving it stdout fileno
00047         return 1
00048     def setblocking(self, *args):
00049         pass
00050     def setsockopt(self, *args):
00051         self.sockopt = args
00052     def send(self, d):
00053         self.data = self.data+d
00054         return len(d)
00055     def sendall(self, d):
00056         self.data = self.data+d
00057     def close(self):
00058         pass
00059 
00060 # test rospy API verifies that the rospy module exports the required symbols
00061 class TestRospyTcprosPubsub(unittest.TestCase):
00062 
00063     def test_TCPROSSub(self):
00064         import rospy.impl.transport
00065         from rospy.impl.tcpros_pubsub import TCPROSSub        
00066         import test_rospy.msg
00067 
00068         callerid = 'test_TCPROSSub'
00069         import rospy.names
00070         rospy.names._set_caller_id(callerid)
00071 
00072         #name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE
00073         name = 'name-%s'%time.time()
00074         recv_data_class = test_rospy.msg.Val
00075         s = TCPROSSub(name, recv_data_class)
00076         self.assertEquals(name, s.resolved_name)
00077         self.assertEquals(rospy.impl.transport.INBOUND, s.direction)
00078         self.assertEquals(recv_data_class, s.recv_data_class)
00079         self.assert_(s.buff_size > -1)
00080         self.failIf(s.tcp_nodelay)
00081         self.assertEquals(None, s.queue_size)
00082 
00083         fields = s.get_header_fields()
00084         self.assertEquals(name, fields['topic'])
00085         self.assertEquals(recv_data_class._md5sum, fields['md5sum'])
00086         self.assertEquals(recv_data_class._full_text, fields['message_definition'])
00087         self.assertEquals('test_rospy/Val', fields['type'])
00088         self.assert_(callerid, fields['callerid'])
00089         if 'tcp_nodelay' in fields:
00090             self.assertEquals('0', fields['tcp_nodelay'])
00091         
00092         v = int(time.time())
00093         s = TCPROSSub(name, recv_data_class, queue_size=v)
00094         self.assertEquals(v, s.queue_size)        
00095 
00096         s = TCPROSSub(name, recv_data_class, buff_size=v)
00097         self.assertEquals(v, s.buff_size)
00098 
00099         s = TCPROSSub(name, recv_data_class, tcp_nodelay=True)
00100         self.assert_(s.tcp_nodelay)
00101         self.assertEquals('1', s.get_header_fields()['tcp_nodelay'])
00102         
00103     def test_TCPROSPub(self):
00104         import rospy.impl.transport
00105         from rospy.impl.tcpros_pubsub import TCPROSPub        
00106         import test_rospy.msg
00107 
00108         callerid = 'test_TCPROSPub'
00109         import rospy.names
00110         rospy.names._set_caller_id(callerid)
00111 
00112         #name, pub_data_class
00113         name = 'name-%s'%time.time()
00114         pub_data_class = test_rospy.msg.Val
00115         p = TCPROSPub(name, pub_data_class)
00116         self.assertEquals(name, p.resolved_name)
00117         self.assertEquals(rospy.impl.transport.OUTBOUND, p.direction)
00118         self.assertEquals(pub_data_class, p.pub_data_class)
00119         self.assert_(p.buff_size > -1)
00120         self.failIf(p.is_latch)
00121 
00122         fields = p.get_header_fields()
00123         self.assertEquals(name, fields['topic'])
00124         self.assertEquals(pub_data_class._md5sum, fields['md5sum'])
00125         self.assertEquals(pub_data_class._full_text, fields['message_definition'])
00126         self.assertEquals('test_rospy/Val', fields['type'])
00127         self.assert_(callerid, fields['callerid'])
00128         if 'latching' in fields:
00129             self.assertEquals('0', fields['latching'])
00130 
00131         p = TCPROSPub(name, pub_data_class, is_latch=True)
00132         self.assert_(p.is_latch)
00133         self.assertEquals('1', p.get_header_fields()['latching'])
00134 
00135         # test additional header fields
00136         p = TCPROSPub(name, pub_data_class, headers={'foo': 'bar', 'hoge': 'fuga'})
00137         fields = p.get_header_fields()        
00138         self.assertEquals(name, fields['topic'])
00139         self.assertEquals('fuga', fields['hoge'])
00140         self.assertEquals('bar', fields['foo'])        
00141         
00142     def test_configure_pub_socket(self):
00143         # #1241 regression test to make sure that imports don't get messed up again
00144         from rospy.impl.tcpros_pubsub import _configure_pub_socket
00145         import socket
00146         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00147         _configure_pub_socket(sock, True)
00148         sock.close()
00149         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        
00150         _configure_pub_socket(sock, False)
00151         sock.close()
00152 
00153     def test_TCPROSHandler_topic_connection_handler(self):
00154         
00155         import rospy
00156         import rospy.core
00157         # very ugly hack to handle bad design choice in rospy and bad isolation inside of nose
00158         rospy.core._shutdown_flag  = False        
00159         rospy.core._in_shutdown  = False        
00160         from rospy.impl.registration import Registration
00161         from rospy.impl.tcpros_pubsub import TCPROSHandler
00162         import test_rospy.msg
00163 
00164         handler = TCPROSHandler()
00165         tch = handler.topic_connection_handler 
00166         sock = FakeSocket()
00167         client_addr = '127.0.0.1'
00168         data_class = test_rospy.msg.Val
00169         topic_name = '/foo-tch'
00170         
00171         headers = { 'topic': topic_name, 'md5sum': data_class._md5sum, 'callerid': '/node'}
00172         # test required logic
00173         for k in headers.iterkeys():
00174             header_copy = headers.copy()
00175             del header_copy[k]
00176             err = tch(sock, client_addr, header_copy)
00177             self.assertNotEquals('', err)
00178 
00179         # '/foo-tch' is not registered, so this should error
00180         err = tch(sock, client_addr, headers)
00181         self.assert_(err)
00182 
00183         # register '/foo-tch'
00184         tm = rospy.impl.registration.get_topic_manager()
00185         impl = tm.acquire_impl(Registration.PUB, topic_name, data_class)
00186         self.assert_(impl is not None)
00187 
00188         # test with mismatched md5sum
00189         header_copy = headers.copy()
00190         header_copy['md5sum'] = 'bad'
00191         md5_err = tch(sock, client_addr, header_copy)
00192         self.assert_("md5sum" in md5_err, md5_err) 
00193 
00194         # now test with correct params
00195         err = tch(sock, client_addr, headers)        
00196         self.failIf(err)
00197         self.assertEquals(None, sock.sockopt)
00198 
00199         # test with mismatched type
00200         # - if md5sums match, this should not error
00201         header_copy = headers.copy()
00202         header_copy['type'] = 'bad_type/Bad'
00203         err = tch(sock, client_addr, header_copy)        
00204         self.failIf(err)
00205         # - now give mismatched md5sum, this will test different error message
00206         header_copy['md5sum'] = 'bad'
00207         type_md5_err = tch(sock, client_addr, header_copy)
00208         self.assert_("types" in type_md5_err, type_md5_err)
00209         
00210         # - these error messages should be different
00211         self.assertNotEquals(md5_err, type_md5_err)
00212         
00213         # test tcp_nodelay
00214         # - should be equivalent to above
00215         headers['tcp_nodelay'] = '0'
00216         err = tch(sock, client_addr, headers)        
00217         self.failIf(err)
00218         self.assertEquals(None, sock.sockopt)        
00219         
00220         # - now test actual sock opt
00221         headers['tcp_nodelay'] = '1'        
00222         err = tch(sock, client_addr, headers)        
00223         self.failIf(err)
00224         self.assertEquals((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt)
00225         # test connection headers
00226         impl.headers = {'foo': 'baz', 'hoge': 'fuga'}        
00227         headers['tcp_nodelay'] = '0'
00228         err = tch(sock, client_addr, headers)        
00229         self.failIf(err)
00230         connection = impl.connections[-1]
00231         fields = connection.protocol.get_header_fields()
00232         self.assertEquals(impl.resolved_name, fields['topic'])
00233         self.assertEquals('fuga', fields['hoge'])
00234         self.assertEquals('baz', fields['foo'])        
00235             


test_rospy
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:47:19