00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 import time
00043 import cStringIO
00044
00045 import rospy
00046
00047 class MockSock:
00048 def __init__(self, buff):
00049 self.buff = buff
00050 def recv(self, buff_size):
00051 return self.buff[:buff_size]
00052 def close(self):
00053 self.buff = None
00054 class MockEmptySock:
00055 def recv(self, buff_size):
00056 return ''
00057 def close(self):
00058 self.buff = None
00059
00060 class TestRospyTcprosBase(unittest.TestCase):
00061
00062 def test_constants(self):
00063 self.assertEquals("TCPROS", rospy.impl.tcpros_base.TCPROS)
00064 self.assert_(type(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE), int)
00065
00066 def test_recv_buff(self):
00067 from rospy.impl.tcpros_base import recv_buff
00068
00069
00070 buff = cStringIO.StringIO()
00071 try:
00072 recv_buff(MockEmptySock(), buff, 1)
00073 self.fail("recv_buff should have raised TransportTerminated")
00074 except rospy.impl.tcpros_base.TransportTerminated:
00075 self.assertEquals('', buff.getvalue())
00076
00077 self.assertEquals(5, recv_buff(MockSock('1234567890'), buff, 5))
00078 self.assertEquals('12345', buff.getvalue())
00079 buff = cStringIO.StringIO()
00080
00081 self.assertEquals(10, recv_buff(MockSock('1234567890'), buff, 100))
00082 self.assertEquals('1234567890', buff.getvalue())
00083
00084 def test_TCPServer(self):
00085 from rospy.impl.tcpros_base import TCPServer
00086 def handler(sock, addr):
00087 pass
00088 s = None
00089 try:
00090 s = TCPServer(handler)
00091 self.assert_(s.port > 0)
00092 addr, port = s.get_full_addr()
00093 self.assert_(type(addr) == str)
00094 self.assertEquals(handler, s.inbound_handler)
00095 self.failIf(s.is_shutdown)
00096 finally:
00097 if s is not None:
00098 s.shutdown()
00099 self.assert_(s.is_shutdown)
00100
00101 def test_TCPROSTransportProtocol(self):
00102 import rospy
00103 import random
00104
00105 from rospy.impl.tcpros_base import TCPROSTransportProtocol
00106 from rospy.impl.transport import BIDIRECTIONAL
00107
00108 p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
00109 self.assertEquals('Bob', p.resolved_name)
00110 self.assertEquals(rospy.AnyMsg, p.recv_data_class)
00111 self.assertEquals(BIDIRECTIONAL, p.direction)
00112 self.assertEquals({}, p.get_header_fields())
00113 self.assertEquals(rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, p.buff_size)
00114
00115 v = random.randint(1, 100)
00116 p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, queue_size=v)
00117 self.assertEquals(v, p.queue_size)
00118
00119 v = random.randint(1, 100)
00120 p = TCPROSTransportProtocol('Bob', rospy.AnyMsg, buff_size=v)
00121 self.assertEquals(v, p.buff_size)
00122
00123 def test_TCPROSTransport(self):
00124 import rospy.impl.tcpros_base
00125 from rospy.impl.tcpros_base import TCPROSTransport, TCPROSTransportProtocol
00126 from rospy.impl.transport import OUTBOUND
00127 p = TCPROSTransportProtocol('Bob', rospy.AnyMsg)
00128 p.direction = OUTBOUND
00129
00130 try:
00131 TCPROSTransport(p, '')
00132 self.fail("TCPROSTransport should not accept bad name")
00133 except rospy.impl.tcpros_base.TransportInitError: pass
00134
00135 t = TCPROSTransport(p, 'transport-name')
00136 self.assert_(t.socket is None)
00137 self.assert_(t.md5sum is None)
00138 self.assert_(t.type is None)
00139 self.assertEquals(p, t.protocol)
00140 self.assertEquals('TCPROS', t.transport_type)
00141 self.assertEquals(OUTBOUND, t.direction)
00142 self.assertEquals('unknown', t.endpoint_id)
00143 self.assertEquals('', t.read_buff.getvalue())
00144 self.assertEquals('', t.write_buff.getvalue())
00145
00146 s = MockSock('12345')
00147 t.set_socket(s, 'new_endpoint_id')
00148 self.assertEquals('new_endpoint_id', t.endpoint_id)
00149 self.assertEquals(s, t.socket)
00150
00151 t.close()
00152 self.assert_(t.socket is None)
00153 self.assert_(t.read_buff is None)
00154 self.assert_(t.write_buff is None)
00155 self.assert_(t.protocol is None)
00156
00157 if __name__ == '__main__':
00158 import rostest
00159 rostest.unitrun('test_rospy', sys.argv[0], TestRospyTcprosBase, coverage_packages=['rospy.impl.tcpros_base'])