test_rospy_tcpros_base.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import cStringIO
00040         
00041 import rospy
00042 
00043 g_fileno = 1
00044 
00045 class MockSock:
00046     def __init__(self, buff):
00047         global g_fileno
00048         g_fileno += 1
00049         self.buff = buff
00050         self._fileno = g_fileno
00051     def fileno(self):
00052         return self._fileno
00053     def recv(self, buff_size):
00054         return self.buff[:buff_size]
00055     def close(self):
00056         self.buff = None
00057 class MockEmptySock:
00058     def __init__(self):
00059         global g_fileno
00060         g_fileno += 1
00061         self._fileno = g_fileno
00062     def fileno(self):
00063         return self._fileno
00064     def recv(self, buff_size):
00065         return ''
00066     def close(self):
00067         self.buff = None
00068 
00069 class TestRospyTcprosBase(unittest.TestCase):
00070 
00071     def test_constants(self):
00072         self.assertEquals("TCPROS", rospy.impl.tcpros_base.TCPROS)
00073         self.assert_(type(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE), int)
00074 
00075     def test_recv_buff(self):
00076         from rospy.impl.tcpros_base import recv_buff
00077 
00078 
00079         buff = cStringIO.StringIO()
00080         try:
00081             recv_buff(MockEmptySock(), buff, 1)
00082             self.fail("recv_buff should have raised TransportTerminated")
00083         except rospy.impl.tcpros_base.TransportTerminated:
00084             self.assertEquals('', buff.getvalue())
00085 
00086         self.assertEquals(5, recv_buff(MockSock('1234567890'), buff, 5))
00087         self.assertEquals('12345', buff.getvalue())
00088         buff = cStringIO.StringIO()
00089         
00090         self.assertEquals(10, recv_buff(MockSock('1234567890'), buff, 100))
00091         self.assertEquals('1234567890', buff.getvalue())
00092 
00093     def test_TCPServer(self):
00094         from rospy.impl.tcpros_base import TCPServer
00095         def handler(sock, addr):
00096             pass
00097         s = None
00098         try:
00099             s = TCPServer(handler)
00100             self.assert_(s.port > 0)
00101             addr, port = s.get_full_addr()
00102             self.assert_(type(addr) == str)
00103             self.assertEquals(handler, s.inbound_handler)        
00104             self.failIf(s.is_shutdown)
00105         finally:
00106             if s is not None:
00107                 s.shutdown()
00108                 self.assert_(s.is_shutdown)
00109 
00110     def test_TCPROSTransportProtocol(self):
00111         import rospy
00112         import random
00113 
00114         from rospy.impl.tcpros_base import TCPROSTransportProtocol
00115         from rospy.impl.transport import BIDIRECTIONAL
00116         
00117         p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
00118         self.assertEquals('Bob', p.resolved_name)
00119         self.assertEquals(rospy.AnyMsg, p.recv_data_class)
00120         self.assertEquals(BIDIRECTIONAL, p.direction)
00121         self.assertEquals({}, p.get_header_fields())
00122         self.assertEquals(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, p.buff_size)
00123 
00124         v = random.randint(1, 100)
00125         p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, queue_size=v)
00126         self.assertEquals(v, p.queue_size)
00127 
00128         v = random.randint(1, 100)        
00129         p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, buff_size=v)
00130         self.assertEquals(v, p.buff_size)
00131 
00132     def test_TCPROSTransport(self):
00133         import rospy.impl.tcpros_base
00134         from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol
00135         from rospy.impl.transport import OUTBOUND
00136         p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
00137         p.direction = OUTBOUND
00138 
00139         try:
00140             TCPROSTransport(p, '')
00141             self.fail("TCPROSTransport should not accept bad name")
00142         except rospy.impl.tcpros_base.TransportInitError: pass
00143         
00144         t = TCPROSTransport(p, 'transport-name')
00145         self.assert_(t.socket is None)
00146         self.assert_(t.md5sum is None)
00147         self.assert_(t.type is None)         
00148         self.assertEquals(p, t.protocol)
00149         self.assertEquals('TCPROS', t.transport_type)        
00150         self.assertEquals(OUTBOUND, t.direction)        
00151         self.assertEquals('unknown', t.endpoint_id)        
00152         self.assertEquals('', t.read_buff.getvalue())
00153         self.assertEquals('', t.write_buff.getvalue())
00154 
00155         s = MockSock('12345')
00156         t.set_socket(s, 'new_endpoint_id')
00157         self.assertEquals('new_endpoint_id', t.endpoint_id)
00158         self.assertEquals(s, t.socket)
00159 
00160         t.close()
00161         self.assert_(t.socket is None)
00162         self.assert_(t.read_buff is None)
00163         self.assert_(t.write_buff is None)
00164         self.assert_(t.protocol is None)


test_rospy
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:47:19