00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import socket
00037 import struct
00038 import unittest
00039 import time
00040
class FakeSocket(object):
    """Minimal in-memory stand-in for a socket object.

    Nothing is transmitted: payloads handed to send()/sendall() are
    appended to ``data`` and the arguments of the most recent setsockopt()
    call are kept in ``sockopt`` so a test can inspect them afterwards.
    """

    def __init__(self):
        # Everything "written" to the socket so far.  This is seeded with
        # bytes rather than text: send()/sendall() concatenate the payload
        # onto it, and on Python 3 ``'' + b'...'`` raises TypeError.
        self.data = b''
        # Argument tuple of the latest setsockopt() call; None until called.
        self.sockopt = None

    def fileno(self):
        """Return a fixed descriptor number (always 1)."""
        # NOTE(review): 1 is conventionally stdout; presumably chosen so
        # code that polls the descriptor sees a valid one -- confirm.
        return 1

    def setblocking(self, *args):
        """Accept and ignore any blocking-mode request."""
        pass

    def setsockopt(self, *args):
        """Record the positional arguments so tests can assert on them."""
        self.sockopt = args

    def send(self, d):
        """Append ``d`` to ``data`` and report all of it as sent.

        Returns len(d).
        """
        self.data = self.data + d
        return len(d)

    def sendall(self, d):
        """Append ``d`` to ``data``; returns None."""
        self.data = self.data + d

    def close(self):
        """Do nothing; there is no underlying resource to release."""
        pass

    def getsockname(self):
        """Return a placeholder ``(None, None)`` pair."""
        return (None, None)
00061
00062
class TestRospyTcprosPubsub(unittest.TestCase):
    """Unit tests for the classes and helpers in rospy.impl.tcpros_pubsub.

    rospy modules are imported inside each test rather than at module
    level, so importing this file does not itself require rospy.
    """

    def test_TCPROSSub(self):
        """Check TCPROSSub attributes and header fields for default and
        explicit constructor arguments."""
        import rospy.impl.transport
        from rospy.impl.tcpros_pubsub import TCPROSSub
        import test_rospy.msg

        callerid = 'test_TCPROSSub'
        import rospy.names
        rospy.names._set_caller_id(callerid)

        # Defaults: a time-stamped name keeps the topic distinct per run.
        name = 'name-%s'%time.time()
        recv_data_class = test_rospy.msg.Val
        s = TCPROSSub(name, recv_data_class)
        self.assertEqual(name, s.resolved_name)
        self.assertEqual(rospy.impl.transport.INBOUND, s.direction)
        self.assertEqual(recv_data_class, s.recv_data_class)
        self.assertTrue(s.buff_size > -1)
        self.assertFalse(s.tcp_nodelay)
        self.assertEqual(None, s.queue_size)

        fields = s.get_header_fields()
        self.assertEqual(name, fields['topic'])
        self.assertEqual(recv_data_class._md5sum, fields['md5sum'])
        self.assertEqual(recv_data_class._full_text, fields['message_definition'])
        self.assertEqual('test_rospy/Val', fields['type'])
        # This used to be assert_(callerid, ...), which treats the second
        # argument as the failure message and therefore could never fail.
        self.assertEqual(callerid, fields['callerid'])
        # The field is optional here; when present it must be switched off.
        if 'tcp_nodelay' in fields:
            self.assertEqual('0', fields['tcp_nodelay'])

        # Explicit keyword arguments must be reflected on the instance.
        v = int(time.time())
        s = TCPROSSub(name, recv_data_class, queue_size=v)
        self.assertEqual(v, s.queue_size)

        s = TCPROSSub(name, recv_data_class, buff_size=v)
        self.assertEqual(v, s.buff_size)

        s = TCPROSSub(name, recv_data_class, tcp_nodelay=True)
        self.assertTrue(s.tcp_nodelay)
        self.assertEqual('1', s.get_header_fields()['tcp_nodelay'])

    def test_TCPROSPub(self):
        """Check TCPROSPub attributes and header fields, including the
        latching flag and caller-supplied extra headers."""
        import rospy.impl.transport
        from rospy.impl.tcpros_pubsub import TCPROSPub
        import test_rospy.msg

        callerid = 'test_TCPROSPub'
        import rospy.names
        rospy.names._set_caller_id(callerid)

        # Defaults.
        name = 'name-%s'%time.time()
        pub_data_class = test_rospy.msg.Val
        p = TCPROSPub(name, pub_data_class)
        self.assertEqual(name, p.resolved_name)
        self.assertEqual(rospy.impl.transport.OUTBOUND, p.direction)
        self.assertEqual(pub_data_class, p.pub_data_class)
        self.assertTrue(p.buff_size > -1)
        self.assertFalse(p.is_latch)

        fields = p.get_header_fields()
        self.assertEqual(name, fields['topic'])
        self.assertEqual(pub_data_class._md5sum, fields['md5sum'])
        self.assertEqual(pub_data_class._full_text, fields['message_definition'])
        self.assertEqual('test_rospy/Val', fields['type'])
        # Real equality check; assert_(callerid, ...) was always true.
        self.assertEqual(callerid, fields['callerid'])
        # The field is optional here; when present it must be switched off.
        if 'latching' in fields:
            self.assertEqual('0', fields['latching'])

        # Latching enabled.
        p = TCPROSPub(name, pub_data_class, is_latch=True)
        self.assertTrue(p.is_latch)
        self.assertEqual('1', p.get_header_fields()['latching'])

        # Extra headers must show up next to the standard fields.
        p = TCPROSPub(name, pub_data_class, headers={'foo': 'bar', 'hoge': 'fuga'})
        fields = p.get_header_fields()
        self.assertEqual(name, fields['topic'])
        self.assertEqual('fuga', fields['hoge'])
        self.assertEqual('bar', fields['foo'])

    def test_configure_pub_socket(self):
        """Smoke test: _configure_pub_socket must not raise for either
        value of its boolean argument.  No resulting state is inspected."""
        from rospy.impl.tcpros_pubsub import _configure_pub_socket
        # NOTE(review): the flag is presumably the tcp_nodelay switch --
        # confirm against rospy.impl.tcpros_pubsub.
        for flag in (True, False):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                _configure_pub_socket(sock, flag)
            finally:
                # Release the real socket even when the call under test fails.
                sock.close()

    def test_TCPROSHandler_topic_connection_handler(self):
        """Drive TCPROSHandler.topic_connection_handler with a FakeSocket
        through missing-header, mismatch, tcp_nodelay and custom-header
        cases.  The handler's return value is treated as an error string
        that is falsy on success."""
        import rospy
        import rospy.core
        # Force rospy's shutdown flags off before exercising the handler.
        # NOTE(review): presumably needed because other tests in the same
        # process may already have shut rospy down -- confirm.
        rospy.core._shutdown_flag = False
        rospy.core._in_shutdown = False
        from rospy.impl.registration import Registration
        from rospy.impl.tcpros_pubsub import TCPROSHandler
        import test_rospy.msg

        handler = TCPROSHandler()
        tch = handler.topic_connection_handler
        sock = FakeSocket()
        client_addr = '127.0.0.1'
        data_class = test_rospy.msg.Val
        topic_name = '/foo-tch'

        headers = { 'topic': topic_name, 'md5sum': data_class._md5sum, 'callerid': '/node'}

        # Dropping any single header must not yield an empty error string.
        for k in headers:
            header_copy = headers.copy()
            del header_copy[k]
            err = tch(sock, client_addr, header_copy)
            self.assertNotEqual('', err)

        # Complete headers still fail at this point; the publisher for the
        # topic is only acquired below.
        err = tch(sock, client_addr, headers)
        self.assertTrue(err)

        # Acquire a publisher implementation for the topic.
        tm = rospy.impl.registration.get_topic_manager()
        impl = tm.acquire_impl(Registration.PUB, topic_name, data_class)
        self.assertTrue(impl is not None)

        # A wrong md5sum must be reported as such.
        header_copy = headers.copy()
        header_copy['md5sum'] = 'bad'
        md5_err = tch(sock, client_addr, header_copy)
        self.assertTrue("md5sum" in md5_err, md5_err)

        # Valid headers: success, and setsockopt() was never called.
        err = tch(sock, client_addr, headers)
        self.assertFalse(err)
        self.assertEqual(None, sock.sockopt)

        # A mismatching 'type' alone is accepted while the md5sum matches.
        header_copy = headers.copy()
        header_copy['type'] = 'bad_type/Bad'
        err = tch(sock, client_addr, header_copy)
        self.assertFalse(err)

        # Mismatching type and md5sum together: the error mentions "types".
        header_copy['md5sum'] = 'bad'
        type_md5_err = tch(sock, client_addr, header_copy)
        self.assertTrue("types" in type_md5_err, type_md5_err)

        # It must be a different message from the md5sum-only failure.
        self.assertNotEqual(md5_err, type_md5_err)

        # tcp_nodelay '0' must leave the socket options untouched.  This has
        # to run before the '1' case because the same FakeSocket is reused.
        headers['tcp_nodelay'] = '0'
        err = tch(sock, client_addr, headers)
        self.assertFalse(err)
        self.assertEqual(None, sock.sockopt)

        # tcp_nodelay '1' must set TCP_NODELAY on the socket.
        headers['tcp_nodelay'] = '1'
        err = tch(sock, client_addr, headers)
        self.assertFalse(err)
        self.assertEqual((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt)

        # Headers set on the publisher impl must appear in the header
        # fields of the most recently added connection.
        impl.headers = {'foo': 'baz', 'hoge': 'fuga'}
        headers['tcp_nodelay'] = '0'
        err = tch(sock, client_addr, headers)
        self.assertFalse(err)
        connection = impl.connections[-1]
        fields = connection.protocol.get_header_fields()
        self.assertEqual(impl.resolved_name, fields['topic'])
        self.assertEqual('fuga', fields['hoge'])
        self.assertEqual('baz', fields['foo'])
00237