00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 try:
00040 from cStringIO import StringIO
00041 except ImportError:
00042 from io import StringIO
00043
00044 import rospy
00045
# Last fake file descriptor handed out. Each mock socket defined in this
# module increments this counter when constructed and reports the new value
# from its fileno(), so every mock gets a distinct descriptor.
g_fileno = 1
00047
class MockSock:
    """Socket stand-in that serves data from a fixed in-memory buffer.

    Each instance bumps the module-level ``g_fileno`` counter and keeps the
    resulting value as its fake file descriptor.
    """

    def __init__(self, buff):
        """Remember ``buff`` as the payload for recv() and claim a fileno."""
        global g_fileno
        g_fileno = g_fileno + 1
        self._fileno = g_fileno
        self.buff = buff

    def fileno(self):
        """Return the fake file descriptor assigned at construction."""
        return self._fileno

    def recv(self, buff_size):
        """Return at most ``buff_size`` leading items of the payload.

        The payload is not consumed, so repeated calls yield the same data.
        """
        return self.buff[0:buff_size]

    def close(self):
        """Drop the payload."""
        self.buff = None

    def getsockname(self):
        """Return a placeholder ``(host, port)`` pair of two ``None`` values."""
        return (None, None)
class MockEmptySock:
    """Socket stand-in whose recv() always yields an empty string.

    Each instance bumps the module-level ``g_fileno`` counter and keeps the
    resulting value as its fake file descriptor.
    """

    def __init__(self):
        """Claim the next fake file descriptor."""
        global g_fileno
        g_fileno = g_fileno + 1
        self._fileno = g_fileno

    def fileno(self):
        """Return the fake file descriptor assigned at construction."""
        return self._fileno

    def recv(self, buff_size):
        """Return an empty string regardless of ``buff_size``."""
        return ''

    def close(self):
        """Set the ``buff`` attribute to ``None``."""
        self.buff = None
00073
class TestRospyTcprosBase(unittest.TestCase):
    """Unit tests for the public pieces of ``rospy.impl.tcpros_base``.

    Only the non-deprecated unittest assertion names are used
    (``assertEqual``/``assertTrue``/``assertFalse``); they exist on both
    Python 2 and Python 3, whereas ``assertEquals``/``assert_``/``failIf``
    are no longer available on current Python 3 releases.
    """

    # Module constants: transport name and default buffer size.
    def test_constants(self):
        self.assertEqual("TCPROS", rospy.impl.tcpros_base.TCPROS)
        # Check the type explicitly. Passing ``int`` as a second positional
        # argument to an assertTrue-style call would only set the failure
        # message and the assertion could never fail.
        self.assertTrue(
            isinstance(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, int))

    # recv_buff(): raises on an empty read, otherwise appends what the
    # socket returned to the buffer and reports the number of items read.
    def test_recv_buff(self):
        from rospy.impl.tcpros_base import recv_buff

        buff = StringIO()
        # A socket that returns nothing must trigger TransportTerminated
        # and must leave the destination buffer empty.
        self.assertRaises(rospy.impl.tcpros_base.TransportTerminated,
                          recv_buff, MockEmptySock(), buff, 1)
        self.assertEqual('', buff.getvalue())

        # Requested size smaller than the available data.
        self.assertEqual(5, recv_buff(MockSock('1234567890'), buff, 5))
        self.assertEqual('12345', buff.getvalue())

        # Requested size larger than the available data.
        buff = StringIO()
        self.assertEqual(10, recv_buff(MockSock('1234567890'), buff, 100))
        self.assertEqual('1234567890', buff.getvalue())

    # TCPServer: construction, address reporting and shutdown flag.
    def test_TCPServer(self):
        from rospy.impl.tcpros_base import TCPServer

        def handler(sock, addr):
            pass

        s = None
        try:
            s = TCPServer(handler)
            self.assertTrue(s.port > 0)
            addr, port = s.get_full_addr()
            self.assertTrue(isinstance(addr, str))
            self.assertEqual(handler, s.inbound_handler)
            self.assertFalse(s.is_shutdown)
        finally:
            # Always release the server, even when an assertion above fails.
            # The guard covers the case where the constructor itself raised.
            if s is not None:
                s.shutdown()
                self.assertTrue(s.is_shutdown)

    # TCPROSTransportProtocol: defaults and keyword overrides.
    def test_TCPROSTransportProtocol(self):
        import random

        from rospy.impl.tcpros_base import TCPROSTransportProtocol
        from rospy.impl.transport import BIDIRECTIONAL

        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
        self.assertEqual('Bob', p.resolved_name)
        self.assertEqual(rospy.AnyMsg, p.recv_data_class)
        self.assertEqual(BIDIRECTIONAL, p.direction)
        self.assertEqual({}, p.get_header_fields())
        self.assertEqual(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, p.buff_size)

        # Arbitrary values to show the keyword arguments are stored as given.
        v = random.randint(1, 100)
        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, queue_size=v)
        self.assertEqual(v, p.queue_size)

        v = random.randint(1, 100)
        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, buff_size=v)
        self.assertEqual(v, p.buff_size)

    # TCPROSTransport: name validation, initial state, set_socket(), close().
    def test_TCPROSTransport(self):
        import rospy.impl.tcpros_base
        from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol
        from rospy.impl.transport import OUTBOUND

        p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
        p.direction = OUTBOUND

        # An empty transport name must be rejected.
        self.assertRaises(rospy.impl.tcpros_base.TransportInitError,
                          TCPROSTransport, p, '')

        # Freshly constructed transport: nothing connected yet.
        t = TCPROSTransport(p, 'transport-name')
        self.assertTrue(t.socket is None)
        self.assertTrue(t.md5sum is None)
        self.assertTrue(t.type is None)
        self.assertEqual(p, t.protocol)
        self.assertEqual('TCPROS', t.transport_type)
        self.assertEqual(OUTBOUND, t.direction)
        self.assertEqual('unknown', t.endpoint_id)
        self.assertEqual('', t.read_buff.getvalue())
        self.assertEqual('', t.write_buff.getvalue())

        # Attaching a socket records both the socket and the endpoint id.
        s = MockSock('12345')
        t.set_socket(s, 'new_endpoint_id')
        self.assertEqual('new_endpoint_id', t.endpoint_id)
        self.assertEqual(s, t.socket)

        # close() must clear every resource the transport was holding.
        t.close()
        self.assertTrue(t.socket is None)
        self.assertTrue(t.read_buff is None)
        self.assertTrue(t.write_buff is None)
        self.assertTrue(t.protocol is None)