00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import socket
00037 import struct
00038 import unittest
00039 import time
00040
00041 class FakeSocket(object):
00042 def __init__(self):
00043 self.data = ''
00044 self.sockopt = None
00045 def fileno(self):
00046
00047 return 1
00048 def setblocking(self, *args):
00049 pass
00050 def setsockopt(self, *args):
00051 self.sockopt = args
00052 def send(self, d):
00053 self.data = self.data+d
00054 return len(d)
00055 def sendall(self, d):
00056 self.data = self.data+d
00057 def close(self):
00058 pass
00059
00060
00061 class TestRospyTcprosPubsub(unittest.TestCase):
00062
00063 def test_TCPROSSub(self):
00064 import rospy.impl.transport
00065 from rospy.impl.tcpros_pubsub import TCPROSSub
00066 import test_rospy.msg
00067
00068 callerid = 'test_TCPROSSub'
00069 import rospy.names
00070 rospy.names._set_caller_id(callerid)
00071
00072
00073 name = 'name-%s'%time.time()
00074 recv_data_class = test_rospy.msg.Val
00075 s = TCPROSSub(name, recv_data_class)
00076 self.assertEquals(name, s.resolved_name)
00077 self.assertEquals(rospy.impl.transport.INBOUND, s.direction)
00078 self.assertEquals(recv_data_class, s.recv_data_class)
00079 self.assert_(s.buff_size > -1)
00080 self.failIf(s.tcp_nodelay)
00081 self.assertEquals(None, s.queue_size)
00082
00083 fields = s.get_header_fields()
00084 self.assertEquals(name, fields['topic'])
00085 self.assertEquals(recv_data_class._md5sum, fields['md5sum'])
00086 self.assertEquals(recv_data_class._full_text, fields['message_definition'])
00087 self.assertEquals('test_rospy/Val', fields['type'])
00088 self.assert_(callerid, fields['callerid'])
00089 if 'tcp_nodelay' in fields:
00090 self.assertEquals('0', fields['tcp_nodelay'])
00091
00092 v = int(time.time())
00093 s = TCPROSSub(name, recv_data_class, queue_size=v)
00094 self.assertEquals(v, s.queue_size)
00095
00096 s = TCPROSSub(name, recv_data_class, buff_size=v)
00097 self.assertEquals(v, s.buff_size)
00098
00099 s = TCPROSSub(name, recv_data_class, tcp_nodelay=True)
00100 self.assert_(s.tcp_nodelay)
00101 self.assertEquals('1', s.get_header_fields()['tcp_nodelay'])
00102
00103 def test_TCPROSPub(self):
00104 import rospy.impl.transport
00105 from rospy.impl.tcpros_pubsub import TCPROSPub
00106 import test_rospy.msg
00107
00108 callerid = 'test_TCPROSPub'
00109 import rospy.names
00110 rospy.names._set_caller_id(callerid)
00111
00112
00113 name = 'name-%s'%time.time()
00114 pub_data_class = test_rospy.msg.Val
00115 p = TCPROSPub(name, pub_data_class)
00116 self.assertEquals(name, p.resolved_name)
00117 self.assertEquals(rospy.impl.transport.OUTBOUND, p.direction)
00118 self.assertEquals(pub_data_class, p.pub_data_class)
00119 self.assert_(p.buff_size > -1)
00120 self.failIf(p.is_latch)
00121
00122 fields = p.get_header_fields()
00123 self.assertEquals(name, fields['topic'])
00124 self.assertEquals(pub_data_class._md5sum, fields['md5sum'])
00125 self.assertEquals(pub_data_class._full_text, fields['message_definition'])
00126 self.assertEquals('test_rospy/Val', fields['type'])
00127 self.assert_(callerid, fields['callerid'])
00128 if 'latching' in fields:
00129 self.assertEquals('0', fields['latching'])
00130
00131 p = TCPROSPub(name, pub_data_class, is_latch=True)
00132 self.assert_(p.is_latch)
00133 self.assertEquals('1', p.get_header_fields()['latching'])
00134
00135
00136 p = TCPROSPub(name, pub_data_class, headers={'foo': 'bar', 'hoge': 'fuga'})
00137 fields = p.get_header_fields()
00138 self.assertEquals(name, fields['topic'])
00139 self.assertEquals('fuga', fields['hoge'])
00140 self.assertEquals('bar', fields['foo'])
00141
00142 def test_configure_pub_socket(self):
00143
00144 from rospy.impl.tcpros_pubsub import _configure_pub_socket
00145 import socket
00146 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00147 _configure_pub_socket(sock, True)
00148 sock.close()
00149 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00150 _configure_pub_socket(sock, False)
00151 sock.close()
00152
00153 def test_TCPROSHandler_topic_connection_handler(self):
00154
00155 import rospy
00156 import rospy.core
00157
00158 rospy.core._shutdown_flag = False
00159 rospy.core._in_shutdown = False
00160 from rospy.impl.registration import Registration
00161 from rospy.impl.tcpros_pubsub import TCPROSHandler
00162 import test_rospy.msg
00163
00164 handler = TCPROSHandler()
00165 tch = handler.topic_connection_handler
00166 sock = FakeSocket()
00167 client_addr = '127.0.0.1'
00168 data_class = test_rospy.msg.Val
00169 topic_name = '/foo-tch'
00170
00171 headers = { 'topic': topic_name, 'md5sum': data_class._md5sum, 'callerid': '/node'}
00172
00173 for k in headers.iterkeys():
00174 header_copy = headers.copy()
00175 del header_copy[k]
00176 err = tch(sock, client_addr, header_copy)
00177 self.assertNotEquals('', err)
00178
00179
00180 err = tch(sock, client_addr, headers)
00181 self.assert_(err)
00182
00183
00184 tm = rospy.impl.registration.get_topic_manager()
00185 impl = tm.acquire_impl(Registration.PUB, topic_name, data_class)
00186 self.assert_(impl is not None)
00187
00188
00189 header_copy = headers.copy()
00190 header_copy['md5sum'] = 'bad'
00191 md5_err = tch(sock, client_addr, header_copy)
00192 self.assert_("md5sum" in md5_err, md5_err)
00193
00194
00195 err = tch(sock, client_addr, headers)
00196 self.failIf(err)
00197 self.assertEquals(None, sock.sockopt)
00198
00199
00200
00201 header_copy = headers.copy()
00202 header_copy['type'] = 'bad_type/Bad'
00203 err = tch(sock, client_addr, header_copy)
00204 self.failIf(err)
00205
00206 header_copy['md5sum'] = 'bad'
00207 type_md5_err = tch(sock, client_addr, header_copy)
00208 self.assert_("types" in type_md5_err, type_md5_err)
00209
00210
00211 self.assertNotEquals(md5_err, type_md5_err)
00212
00213
00214
00215 headers['tcp_nodelay'] = '0'
00216 err = tch(sock, client_addr, headers)
00217 self.failIf(err)
00218 self.assertEquals(None, sock.sockopt)
00219
00220
00221 headers['tcp_nodelay'] = '1'
00222 err = tch(sock, client_addr, headers)
00223 self.failIf(err)
00224 self.assertEquals((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt)
00225
00226 impl.headers = {'foo': 'baz', 'hoge': 'fuga'}
00227 headers['tcp_nodelay'] = '0'
00228 err = tch(sock, client_addr, headers)
00229 self.failIf(err)
00230 connection = impl.connections[-1]
00231 fields = connection.protocol.get_header_fields()
00232 self.assertEquals(impl.resolved_name, fields['topic'])
00233 self.assertEquals('fuga', fields['hoge'])
00234 self.assertEquals('baz', fields['foo'])
00235