43 from rospy.impl.transport
import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL
45 for d
in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
47 self.assertEqual(d, t.direction)
48 self.assertEqual(
"UNKNOWN", t.transport_type)
49 self.assertFalse(t.done)
50 self.assertEqual(
None, t.cleanup_cb)
51 self.assertEqual(
'', t.endpoint_id)
52 self.assertEqual(
'unnamed', t.name)
53 self.assertEqual(0, t.stat_bytes)
54 self.assertEqual(0, t.stat_num_msg)
55 self.assertFalse(t.id
in ids)
58 t = Transport(d,
'a name')
59 self.assertEqual(d, t.direction)
60 self.assertEqual(
'a name', t.name)
63 t = Transport(INBOUND)
65 self.assertTrue(t.done)
66 t = Transport(INBOUND)
72 t.set_cleanup_callback(cleanup)
73 self.assertEqual(t.cleanup_cb, cleanup)
75 self.assertTrue(t.done)
78 t = Transport(OUTBOUND)
81 t.send_message(
'msg', 1)
82 self.fail(
"send_message() should be abstract")
86 self.fail(
"write_data() should be abstract")
88 t = Transport(INBOUND)
91 self.fail(
"receive_once() should be abstract")
93 t = Transport(INBOUND)
96 self.fail(
"receive_loop() should be abstract")
100 from rospy.impl.transport
import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL
101 t = Transport(INBOUND,
'foo')
103 t.stat_num_msg = 5678
104 dead = DeadTransport(t)
105 self.assertEqual(INBOUND, dead.direction)
106 self.assertEqual(
'foo', dead.name)
107 self.assertEqual(1234, dead.stat_bytes)
108 self.assertEqual(5678, dead.stat_num_msg)
109 self.assertEqual(
True, dead.done)
110 self.assertEqual(
'', dead.endpoint_id)
112 t = Transport(OUTBOUND,
'bar')
113 t.endpoint_id =
'blah blah'
115 dead = DeadTransport(t)
116 self.assertEqual(OUTBOUND, dead.direction)
117 self.assertEqual(
'bar', dead.name)
118 self.assertEqual(
True, dead.done)
119 self.assertEqual(t.endpoint_id, dead.endpoint_id)
123 from rospy.impl.transport
import ProtocolHandler
124 h = ProtocolHandler()
125 self.assertFalse(h.supports(
'TCPROS'))
126 self.assertEqual([], h.get_supported())
128 h.create_connection(
"/topic",
'http://localhost:1234', [
'TCPROS'])
129 self.fail(
"create_connection should raise an exception")
132 h.init_publisher(
"/topic",
'TCPROS')
133 self.fail(
"init_publisher raise an exception")