43 from rospy.impl.transport
import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL
45 for d
in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
47 self.assertEquals(d, t.direction)
48 self.assertEquals(
"UNKNOWN", t.transport_type)
50 self.assertEquals(
None, t.cleanup_cb)
51 self.assertEquals(
'', t.endpoint_id)
52 self.assertEquals(
'unnamed', t.name)
53 self.assertEquals(0, t.stat_bytes)
54 self.assertEquals(0, t.stat_num_msg)
55 self.failIf(t.id
in ids)
58 t = Transport(d,
'a name')
59 self.assertEquals(d, t.direction)
60 self.assertEquals(
'a name', t.name)
63 t = Transport(INBOUND)
66 t = Transport(INBOUND)
72 t.set_cleanup_callback(cleanup)
73 self.assertEquals(t.cleanup_cb, cleanup)
78 t = Transport(OUTBOUND)
81 t.send_message(
'msg', 1)
82 self.fail(
"send_message() should be abstract")
86 self.fail(
"write_data() should be abstract")
88 t = Transport(INBOUND)
91 self.fail(
"receive_once() should be abstract")
93 t = Transport(INBOUND)
96 self.fail(
"receive_loop() should be abstract")
100 from rospy.impl.transport
import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL
101 t = Transport(INBOUND,
'foo')
103 t.stat_num_msg = 5678
104 dead = DeadTransport(t)
105 self.assertEquals(INBOUND, dead.direction)
106 self.assertEquals(
'foo', dead.name)
107 self.assertEquals(1234, dead.stat_bytes)
108 self.assertEquals(5678, dead.stat_num_msg)
109 self.assertEquals(
True, dead.done)
110 self.assertEquals(
'', dead.endpoint_id)
112 t = Transport(OUTBOUND,
'bar')
113 t.endpoint_id =
'blah blah' 115 dead = DeadTransport(t)
116 self.assertEquals(OUTBOUND, dead.direction)
117 self.assertEquals(
'bar', dead.name)
118 self.assertEquals(
True, dead.done)
119 self.assertEquals(t.endpoint_id, dead.endpoint_id)
123 from rospy.impl.transport
import ProtocolHandler
124 h = ProtocolHandler()
125 self.failIf(h.supports(
'TCPROS'))
126 self.assertEquals([], h.get_supported())
128 h.create_connection(
"/topic",
'http://localhost:1234', [
'TCPROS'])
129 self.fail(
"create_connection should raise an exception")
132 h.init_publisher(
"/topic",
'TCPROS')
133 self.fail(
"init_publisher raise an exception")
def test_ProtocolHandler(self)
def test_DeadTransport(self)