Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039
00040 class TestRospyTransport(unittest.TestCase):
00041
00042 def test_Transport(self):
00043 from rospy.impl.transport import Transport, INBOUND, OUTBOUND, BIDIRECTIONAL
00044 ids = []
00045 for d in [INBOUND, OUTBOUND, BIDIRECTIONAL]:
00046 t = Transport(d)
00047 self.assertEquals(d, t.direction)
00048 self.assertEquals("UNKNOWN", t.transport_type)
00049 self.failIf(t.done)
00050 self.assertEquals(None, t.cleanup_cb)
00051 self.assertEquals('', t.endpoint_id)
00052 self.assertEquals('unnamed', t.name)
00053 self.assertEquals(0, t.stat_bytes)
00054 self.assertEquals(0, t.stat_num_msg)
00055 self.failIf(t.id in ids)
00056 ids.append(t.id)
00057
00058 t = Transport(d, 'a name')
00059 self.assertEquals(d, t.direction)
00060 self.assertEquals('a name', t.name)
00061
00062
00063 t = Transport(INBOUND)
00064 t.close()
00065 self.assert_(t.done)
00066 t = Transport(INBOUND)
00067
00068 self.cleanup_obj = None
00069 def cleanup(obj):
00070 self.cleanup_obj = obj
00071
00072 t.set_cleanup_callback(cleanup)
00073 self.assertEquals(t.cleanup_cb, cleanup)
00074 t.close()
00075 self.assert_(t.done)
00076 self.assertEquals(self.cleanup_obj, t)
00077
00078 t = Transport(OUTBOUND)
00079 import traceback
00080 try:
00081 t.send_message('msg', 1)
00082 self.fail("send_message() should be abstract")
00083 except: pass
00084 try:
00085 t.write_data('data')
00086 self.fail("write_data() should be abstract")
00087 except: pass
00088 t = Transport(INBOUND)
00089 try:
00090 t.receive_once()
00091 self.fail("receive_once() should be abstract")
00092 except: pass
00093 t = Transport(INBOUND)
00094 try:
00095 t.receive_loop()
00096 self.fail("receive_loop() should be abstract")
00097 except: pass
00098
00099 def test_DeadTransport(self):
00100 from rospy.impl.transport import Transport, DeadTransport, INBOUND, OUTBOUND, BIDIRECTIONAL
00101 t = Transport(INBOUND, 'foo')
00102 t.stat_bytes = 1234
00103 t.stat_num_msg = 5678
00104 dead = DeadTransport(t)
00105 self.assertEquals(INBOUND, dead.direction)
00106 self.assertEquals('foo', dead.name)
00107 self.assertEquals(1234, dead.stat_bytes)
00108 self.assertEquals(5678, dead.stat_num_msg)
00109 self.assertEquals(True, dead.done)
00110 self.assertEquals('', dead.endpoint_id)
00111
00112 t = Transport(OUTBOUND, 'bar')
00113 t.endpoint_id = 'blah blah'
00114 t.close()
00115 dead = DeadTransport(t)
00116 self.assertEquals(OUTBOUND, dead.direction)
00117 self.assertEquals('bar', dead.name)
00118 self.assertEquals(True, dead.done)
00119 self.assertEquals(t.endpoint_id, dead.endpoint_id)
00120
00121 def test_ProtocolHandler(self):
00122
00123 from rospy.impl.transport import ProtocolHandler
00124 h = ProtocolHandler()
00125 self.failIf(h.supports('TCPROS'))
00126 self.assertEquals([], h.get_supported())
00127 try:
00128 h.create_connection("/topic", 'http://localhost:1234', ['TCPROS'])
00129 self.fail("create_connection should raise an exception")
00130 except: pass
00131 try:
00132 h.init_publisher("/topic", 'TCPROS')
00133 self.fail("init_publisher raise an exception")
00134 except: pass
00135 h.shutdown()