40 from cStringIO
import StringIO
42 from io
import StringIO
57 return self.
buff[:buff_size]
77 self.assertEqual(
"TCPROS", rospy.impl.tcpros_base.TCPROS)
78 self.assertTrue(type(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE), int)
81 from rospy.impl.tcpros_base
import recv_buff
87 self.fail(
"recv_buff should have raised TransportTerminated")
88 except rospy.impl.tcpros_base.TransportTerminated:
89 self.assertEqual(
'', buff.getvalue())
91 self.assertEqual(5, recv_buff(
MockSock(
'1234567890'), buff, 5))
92 self.assertEqual(
'12345', buff.getvalue())
95 self.assertEqual(10, recv_buff(
MockSock(
'1234567890'), buff, 100))
96 self.assertEqual(
'1234567890', buff.getvalue())
99 from rospy.impl.tcpros_base
import TCPServer
100 def handler(sock, addr):
104 s = TCPServer(handler)
105 self.assertTrue(s.port > 0)
106 addr, port = s.get_full_addr()
107 self.assertTrue(type(addr) == str)
108 self.assertEqual(handler, s.inbound_handler)
109 self.assertFalse(s.is_shutdown)
113 self.assertTrue(s.is_shutdown)
119 from rospy.impl.tcpros_base
import TCPROSTransportProtocol
120 from rospy.impl.transport
import BIDIRECTIONAL
122 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg)
123 self.assertEqual(
'Bob', p.resolved_name)
124 self.assertEqual(rospy.AnyMsg, p.recv_data_class)
125 self.assertEqual(BIDIRECTIONAL, p.direction)
126 self.assertEqual({}, p.get_header_fields())
127 self.assertEqual(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, p.buff_size)
129 v = random.randint(1, 100)
130 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg, queue_size=v)
131 self.assertEqual(v, p.queue_size)
133 v = random.randint(1, 100)
134 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg, buff_size=v)
135 self.assertEqual(v, p.buff_size)
138 import rospy.impl.tcpros_base
139 from rospy.impl.tcpros_base
import TCPROSTransport, TCPROSTransportProtocol
140 from rospy.impl.transport
import OUTBOUND
141 p = TCPROSTransportProtocol(
'Bob', rospy.AnyMsg)
142 p.direction = OUTBOUND
145 TCPROSTransport(p,
'')
146 self.fail(
"TCPROSTransport should not accept bad name")
147 except rospy.impl.tcpros_base.TransportInitError:
pass
149 t = TCPROSTransport(p,
'transport-name')
150 self.assertTrue(t.socket
is None)
151 self.assertTrue(t.md5sum
is None)
152 self.assertTrue(t.type
is None)
153 self.assertEqual(p, t.protocol)
154 self.assertEqual(
'TCPROS', t.transport_type)
155 self.assertEqual(OUTBOUND, t.direction)
156 self.assertEqual(
'unknown', t.endpoint_id)
157 self.assertEqual(b
'', t.read_buff.getvalue())
158 self.assertEqual(b
'', t.write_buff.getvalue())
161 t.set_socket(s,
'new_endpoint_id')
162 self.assertEqual(
'new_endpoint_id', t.endpoint_id)
163 self.assertEqual(s, t.socket)
166 self.assertTrue(t.socket
is None)
167 self.assertTrue(t.read_buff
is None)
168 self.assertTrue(t.write_buff
is None)
169 self.assertTrue(t.protocol
is None)