00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import cStringIO
00040
00041 import rospy
00042
# Counter used to hand out file descriptor numbers to the mock sockets
# below: each mock increments it in __init__ and keeps the new value, so
# every mock instance reports a distinct fileno().
g_fileno = 1
00044
class MockSock:
    """Minimal socket stand-in that serves data from a fixed buffer.

    Only the ``fileno``, ``recv`` and ``close`` methods are provided.
    """

    def __init__(self, buff):
        """Store ``buff`` as the data to serve and claim a fresh fileno.

        :param buff: sliceable data (e.g. a string) returned by ``recv``
        """
        # Bump the module-level counter so each mock gets its own number.
        global g_fileno
        g_fileno += 1
        self.buff = buff
        self._fileno = g_fileno

    def fileno(self):
        """Return the file descriptor number assigned at construction."""
        return self._fileno

    def recv(self, buff_size):
        """Return at most ``buff_size`` leading items of the buffer.

        The buffer is not consumed: repeated calls return the same data.
        """
        return self.buff[:buff_size]

    def close(self):
        """Drop the buffer; a later ``recv`` would fail on ``None``."""
        self.buff = None
class MockEmptySock:
    """Minimal socket stand-in whose ``recv`` always returns no data.

    Only the ``fileno``, ``recv`` and ``close`` methods are provided.
    """

    def __init__(self):
        """Claim a fresh fileno from the module-level counter."""
        global g_fileno
        g_fileno += 1
        self._fileno = g_fileno

    def fileno(self):
        """Return the file descriptor number assigned at construction."""
        return self._fileno

    def recv(self, buff_size):
        """Return an empty string regardless of ``buff_size``."""
        return ''

    def close(self):
        """Set ``buff`` to ``None``.

        ``__init__`` never defines ``buff``; the attribute first appears
        here.
        """
        self.buff = None
00068
class TestRospyTcprosBase(unittest.TestCase):
    """Unit tests for the public pieces of ``rospy.impl.tcpros_base``."""

    # NOTE: test methods are documented with comments rather than
    # docstrings so that unittest keeps reporting them by method name.

    def test_constants(self):
        # Module constants: protocol name and an integer default buffer size.
        self.assertEqual("TCPROS", rospy.impl.tcpros_base.TCPROS)
        # isinstance() is required here: passing ``int`` as a second
        # argument to assertTrue() would only set the failure message and
        # never check the type.
        self.assertTrue(
            isinstance(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, int))

    def test_recv_buff(self):
        # recv_buff() is expected to copy what the socket yields into the
        # buffer and return the byte count, raising TransportTerminated
        # when the socket yields nothing.
        from rospy.impl.tcpros_base import recv_buff

        buff = cStringIO.StringIO()
        try:
            recv_buff(MockEmptySock(), buff, 1)
            self.fail("recv_buff should have raised TransportTerminated")
        except rospy.impl.tcpros_base.TransportTerminated:
            # Nothing may have been written before the failure.
            self.assertEqual('', buff.getvalue())

        # Requested size smaller than the available data.
        self.assertEqual(5, recv_buff(MockSock('1234567890'), buff, 5))
        self.assertEqual('12345', buff.getvalue())
        buff = cStringIO.StringIO()

        # Requested size larger than the available data.
        self.assertEqual(10, recv_buff(MockSock('1234567890'), buff, 100))
        self.assertEqual('1234567890', buff.getvalue())

    def test_TCPServer(self):
        # A freshly constructed server has a port, an address, keeps the
        # handler it was given, and is only shut down after shutdown().
        from rospy.impl.tcpros_base import TCPServer

        def handler(sock, addr):
            pass

        s = None
        try:
            s = TCPServer(handler)
            self.assertTrue(s.port > 0)
            addr, port = s.get_full_addr()
            self.assertTrue(isinstance(addr, str))
            self.assertEqual(handler, s.inbound_handler)
            self.assertFalse(s.is_shutdown)
        finally:
            # Only touch the server if construction succeeded; otherwise
            # the original exception must propagate unmasked.
            if s is not None:
                s.shutdown()
                self.assertTrue(s.is_shutdown)

    def test_TCPROSTransportProtocol(self):
        # Constructor defaults and the queue_size / buff_size keyword args.
        import rospy
        import random

        from rospy.impl.tcpros_base import TCPROSTransportProtocol
        from rospy.impl.transport import BIDIRECTIONAL

        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
        self.assertEqual('Bob', p.resolved_name)
        self.assertEqual(rospy.AnyMsg, p.recv_data_class)
        self.assertEqual(BIDIRECTIONAL, p.direction)
        self.assertEqual({}, p.get_header_fields())
        self.assertEqual(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, p.buff_size)

        v = random.randint(1, 100)
        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, queue_size=v)
        self.assertEqual(v, p.queue_size)

        v = random.randint(1, 100)
        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, buff_size=v)
        self.assertEqual(v, p.buff_size)

    def test_TCPROSTransport(self):
        # Initial state, set_socket() and close() of a transport.
        import rospy.impl.tcpros_base
        from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol
        from rospy.impl.transport import OUTBOUND

        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
        p.direction = OUTBOUND

        # An empty transport name must be rejected.
        try:
            TCPROSTransport(p, '')
            self.fail("TCPROSTransport should not accept bad name")
        except rospy.impl.tcpros_base.TransportInitError:
            pass

        t = TCPROSTransport(p, 'transport-name')
        self.assertTrue(t.socket is None)
        self.assertTrue(t.md5sum is None)
        self.assertTrue(t.type is None)
        self.assertEqual(p, t.protocol)
        self.assertEqual('TCPROS', t.transport_type)
        self.assertEqual(OUTBOUND, t.direction)
        self.assertEqual('unknown', t.endpoint_id)
        self.assertEqual('', t.read_buff.getvalue())
        self.assertEqual('', t.write_buff.getvalue())

        s = MockSock('12345')
        t.set_socket(s, 'new_endpoint_id')
        self.assertEqual('new_endpoint_id', t.endpoint_id)
        self.assertEqual(s, t.socket)

        # close() is expected to release every resource the transport holds.
        t.close()
        self.assertTrue(t.socket is None)
        self.assertTrue(t.read_buff is None)
        self.assertTrue(t.write_buff is None)
        self.assertTrue(t.protocol is None)