40 from cStringIO
import StringIO
42 from io
import StringIO
57 return self.
buff[:buff_size]
77 self.assertEquals(
"TCPROS", rospy.impl.tcpros_base.TCPROS)
78 self.assert_(type(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE), int)
81 from rospy.impl.tcpros_base
import recv_buff
87 self.fail(
"recv_buff should have raised TransportTerminated")
88 except rospy.impl.tcpros_base.TransportTerminated:
89 self.assertEquals(
'', buff.getvalue())
91 self.assertEquals(5, recv_buff(
MockSock(
'1234567890'), buff, 5))
92 self.assertEquals(
'12345', buff.getvalue())
95 self.assertEquals(10, recv_buff(
MockSock(
'1234567890'), buff, 100))
96 self.assertEquals(
'1234567890', buff.getvalue())
99 from rospy.impl.tcpros_base
import TCPServer
100 def handler(sock, addr):
104 s = TCPServer(handler)
105 self.assert_(s.port > 0)
106 addr, port = s.get_full_addr()
107 self.assert_(type(addr) == str)
108 self.assertEquals(handler, s.inbound_handler)
109 self.failIf(s.is_shutdown)
113 self.assert_(s.is_shutdown)
119 from rospy.impl.tcpros_base
import TCPROSTransportProtocol
120 from rospy.impl.transport
import BIDIRECTIONAL
122 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg)
123 self.assertEquals(
'Bob', p.resolved_name)
124 self.assertEquals(rospy.AnyMsg, p.recv_data_class)
125 self.assertEquals(BIDIRECTIONAL, p.direction)
126 self.assertEquals({}, p.get_header_fields())
127 self.assertEquals(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, p.buff_size)
129 v = random.randint(1, 100)
130 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg, queue_size=v)
131 self.assertEquals(v, p.queue_size)
133 v = random.randint(1, 100)
134 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg, buff_size=v)
135 self.assertEquals(v, p.buff_size)
138 import rospy.impl.tcpros_base
139 from rospy.impl.tcpros_base
import TCPROSTransport, TCPROSTransportProtocol
140 from rospy.impl.transport
import OUTBOUND
141 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg)
142 p.direction = OUTBOUND
145 TCPROSTransport(p,
'')
146 self.fail(
"TCPROSTransport should not accept bad name")
147 except rospy.impl.tcpros_base.TransportInitError:
pass 149 t = TCPROSTransport(p,
'transport-name')
150 self.assert_(t.socket
is None)
151 self.assert_(t.md5sum
is None)
152 self.assert_(t.type
is None)
153 self.assertEquals(p, t.protocol)
154 self.assertEquals(
'TCPROS', t.transport_type)
155 self.assertEquals(OUTBOUND, t.direction)
156 self.assertEquals(
'unknown', t.endpoint_id)
157 self.assertEquals(b
'', t.read_buff.getvalue())
158 self.assertEquals(b
'', t.write_buff.getvalue())
161 t.set_socket(s,
'new_endpoint_id')
162 self.assertEquals(
'new_endpoint_id', t.endpoint_id)
163 self.assertEquals(s, t.socket)
166 self.assert_(t.socket
is None)
167 self.assert_(t.read_buff
is None)
168 self.assert_(t.write_buff
is None)
169 self.assert_(t.protocol
is None)
def test_TCPROSTransportProtocol(self)
def recv(self, buff_size)
def recv(self, buff_size)
def test_TCPROSTransport(self)