Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039
00040 class TestRospyTransport(unittest.TestCase):
00041
00042 def test_Transport(self):
00043 from rospy.impl.transport import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL
00044 ids = []
00045 for d in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
00046 t = Transport(d)
00047 self.assertEquals(d, t.direction)
00048 self.assertEquals("UNKNOWN", t.transport_type)
00049 self.failIf(t.done)
00050 self.assertEquals(None, t.cleanup_cb)
00051 self.assertEquals('', t.endpoint_id)
00052 self.assertEquals('unnamed', t.name)
00053 self.assertEquals(0, t.stat_bytes)
00054 self.assertEquals(0, t.stat_num_msg)
00055 self.failIf(t.id in ids)
00056 ids.append(t.id)
00057
00058 t = Transport(d, 'a name')
00059 self.assertEquals(d, t.direction)
00060 self.assertEquals('a name', t.name)
00061
00062
00063 t = Transport(INBOUND)
00064 t.close()
00065 self.assert_(t.done)
00066 t = Transport(INBOUND)
00067
00068 self.cleanup_obj = None
00069 def cleanup(obj):
00070 self.cleanup_obj = obj
00071
00072 t.set_cleanup_callback(cleanup)
00073 self.assertEquals(t.cleanup_cb, cleanup)
00074 t.close()
00075 self.assert_(t.done)
00076 self.assertEquals(self.cleanup_obj, t)
00077
00078 t = Transport(OUTBOUND)
00079 import traceback
00080 try:
00081 t.send_message('msg', 1)
00082 self.fail("send_message() should be abstract")
00083 except:
00084 traceback.print_exc()
00085 try:
00086 t.write_data('data')
00087 self.fail("write_data() should be abstract")
00088 except:
00089 traceback.print_exc()
00090 t = Transport(INBOUND)
00091 try:
00092 t.receive_once()
00093 self.fail("receive_once() should be abstract")
00094 except: pass
00095 t = Transport(INBOUND)
00096 try:
00097 t.receive_loop()
00098 self.fail("receive_loop() should be abstract")
00099 except: pass
00100
00101 def test_DeadTransport(self):
00102 from rospy.impl.transport import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL
00103 t = Transport(INBOUND, 'foo')
00104 t.stat_bytes = 1234
00105 t.stat_num_msg = 5678
00106 dead = DeadTransport(t)
00107 self.assertEquals(INBOUND, dead.direction)
00108 self.assertEquals('foo', dead.name)
00109 self.assertEquals(1234, dead.stat_bytes)
00110 self.assertEquals(5678, dead.stat_num_msg)
00111 self.assertEquals(True, dead.done)
00112 self.assertEquals('', dead.endpoint_id)
00113
00114 t = Transport(OUTBOUND, 'bar')
00115 t.endpoint_id = 'blah blah'
00116 t.close()
00117 dead = DeadTransport(t)
00118 self.assertEquals(OUTBOUND, dead.direction)
00119 self.assertEquals('bar', dead.name)
00120 self.assertEquals(True, dead.done)
00121 self.assertEquals(t.endpoint_id, dead.endpoint_id)
00122
00123 def test_ProtocolHandler(self):
00124
00125 from rospy.impl.transport import ProtocolHandler
00126 h = ProtocolHandler()
00127 self.failIf(h.supports('TCPROS'))
00128 self.assertEquals([], h.get_supported())
00129 try:
00130 h.create_connection("/topic", 'http://localhost:1234', ['TCPROS'])
00131 self.fail("create_connection should raise an exception")
00132 except: pass
00133 try:
00134 h.init_publisher("/topic", 'TCPROS')
00135 self.fail("init_publisher raise an exception")
00136 except: pass
00137 h.shutdown()