00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import socket
00041 import struct
00042 import unittest
00043 import time
00044
00045 class FakeSocket(object):
00046 def __init__(self):
00047 self.data = ''
00048 self.sockopt = None
00049 def fileno(self):
00050
00051 return 1
00052 def setblocking(self, *args):
00053 pass
00054 def setsockopt(self, *args):
00055 self.sockopt = args
00056 def send(self, d):
00057 self.data = self.data+d
00058 return len(d)
00059 def sendall(self, d):
00060 self.data = self.data+d
00061 def close(self):
00062 pass
00063
00064
00065 class TestRospyTcprosPubsub(unittest.TestCase):
00066
00067 def test_TCPROSSub(self):
00068 import rospy.impl.transport
00069 from rospy.impl.tcpros_pubsub import TCPROSSub
00070 import test_rospy.msg
00071
00072 callerid = 'test_TCPROSSub'
00073 import rospy.names
00074 rospy.names._set_caller_id(callerid)
00075
00076
00077 name = 'name-%s'%time.time()
00078 recv_data_class = test_rospy.msg.Val
00079 s = TCPROSSub(name, recv_data_class)
00080 self.assertEquals(name, s.resolved_name)
00081 self.assertEquals(rospy.impl.transport.INBOUND, s.direction)
00082 self.assertEquals(recv_data_class, s.recv_data_class)
00083 self.assert_(s.buff_size > -1)
00084 self.failIf(s.tcp_nodelay)
00085 self.assertEquals(None, s.queue_size)
00086
00087 fields = s.get_header_fields()
00088 self.assertEquals(name, fields['topic'])
00089 self.assertEquals(recv_data_class._md5sum, fields['md5sum'])
00090 self.assertEquals(recv_data_class._full_text, fields['message_definition'])
00091 self.assertEquals('test_rospy/Val', fields['type'])
00092 self.assert_(callerid, fields['callerid'])
00093 if 'tcp_nodelay' in fields:
00094 self.assertEquals('0', fields['tcp_nodelay'])
00095
00096 v = int(time.time())
00097 s = TCPROSSub(name, recv_data_class, queue_size=v)
00098 self.assertEquals(v, s.queue_size)
00099
00100 s = TCPROSSub(name, recv_data_class, buff_size=v)
00101 self.assertEquals(v, s.buff_size)
00102
00103 s = TCPROSSub(name, recv_data_class, tcp_nodelay=True)
00104 self.assert_(s.tcp_nodelay)
00105 self.assertEquals('1', s.get_header_fields()['tcp_nodelay'])
00106
00107 def test_TCPROSPub(self):
00108 import rospy.impl.transport
00109 from rospy.impl.tcpros_pubsub import TCPROSPub
00110 import test_rospy.msg
00111
00112 callerid = 'test_TCPROSPub'
00113 import rospy.names
00114 rospy.names._set_caller_id(callerid)
00115
00116
00117 name = 'name-%s'%time.time()
00118 pub_data_class = test_rospy.msg.Val
00119 p = TCPROSPub(name, pub_data_class)
00120 self.assertEquals(name, p.resolved_name)
00121 self.assertEquals(rospy.impl.transport.OUTBOUND, p.direction)
00122 self.assertEquals(pub_data_class, p.pub_data_class)
00123 self.assert_(p.buff_size > -1)
00124 self.failIf(p.is_latch)
00125
00126 fields = p.get_header_fields()
00127 self.assertEquals(name, fields['topic'])
00128 self.assertEquals(pub_data_class._md5sum, fields['md5sum'])
00129 self.assertEquals(pub_data_class._full_text, fields['message_definition'])
00130 self.assertEquals('test_rospy/Val', fields['type'])
00131 self.assert_(callerid, fields['callerid'])
00132 if 'latching' in fields:
00133 self.assertEquals('0', fields['latching'])
00134
00135 p = TCPROSPub(name, pub_data_class, is_latch=True)
00136 self.assert_(p.is_latch)
00137 self.assertEquals('1', p.get_header_fields()['latching'])
00138
00139
00140 p = TCPROSPub(name, pub_data_class, headers={'foo': 'bar', 'hoge': 'fuga'})
00141 fields = p.get_header_fields()
00142 self.assertEquals(name, fields['topic'])
00143 self.assertEquals('fuga', fields['hoge'])
00144 self.assertEquals('bar', fields['foo'])
00145
00146 def test_configure_pub_socket(self):
00147
00148 from rospy.impl.tcpros_pubsub import _configure_pub_socket
00149 import socket
00150 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00151 _configure_pub_socket(sock, True)
00152 sock.close()
00153 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
00154 _configure_pub_socket(sock, False)
00155 sock.close()
00156
00157 def test_TCPROSHandler_topic_connection_handler(self):
00158
00159 import rospy
00160 from rospy.impl.registration import Registration
00161 from rospy.impl.tcpros_pubsub import TCPROSHandler
00162 import test_rospy.msg
00163
00164 handler = TCPROSHandler()
00165 tch = handler.topic_connection_handler
00166 sock = FakeSocket()
00167 client_addr = '127.0.0.1'
00168 data_class = test_rospy.msg.Val
00169 topic_name = '/foo-tch'
00170
00171 headers = { 'topic': topic_name, 'md5sum': data_class._md5sum, 'callerid': '/node'}
00172
00173 for k in headers.iterkeys():
00174 header_copy = headers.copy()
00175 del header_copy[k]
00176 err = tch(sock, client_addr, header_copy)
00177 self.assertNotEquals('', err)
00178
00179
00180 err = tch(sock, client_addr, headers)
00181 self.assert_(err)
00182
00183
00184 tm = rospy.impl.registration.get_topic_manager()
00185 impl = tm.acquire_impl(Registration.PUB, topic_name, data_class)
00186 self.assert_(impl is not None)
00187
00188
00189 header_copy = headers.copy()
00190 header_copy['md5sum'] = 'bad'
00191 md5_err = tch(sock, client_addr, header_copy)
00192 self.assert_("md5sum" in md5_err, md5_err)
00193
00194
00195 err = tch(sock, client_addr, headers)
00196 self.failIf(err)
00197 self.assertEquals(None, sock.sockopt)
00198
00199
00200
00201 header_copy = headers.copy()
00202 header_copy['type'] = 'bad_type/Bad'
00203 err = tch(sock, client_addr, header_copy)
00204 self.failIf(err)
00205
00206 header_copy['md5sum'] = 'bad'
00207 type_md5_err = tch(sock, client_addr, header_copy)
00208 self.assert_("types" in type_md5_err, type_md5_err)
00209
00210
00211 self.assertNotEquals(md5_err, type_md5_err)
00212
00213
00214
00215 headers['tcp_nodelay'] = '0'
00216 err = tch(sock, client_addr, headers)
00217 self.failIf(err)
00218 self.assertEquals(None, sock.sockopt)
00219
00220
00221 headers['tcp_nodelay'] = '1'
00222 err = tch(sock, client_addr, headers)
00223 self.failIf(err)
00224 self.assertEquals((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt)
00225
00226 impl.headers = {'foo': 'baz', 'hoge': 'fuga'}
00227 headers['tcp_nodelay'] = '0'
00228 err = tch(sock, client_addr, headers)
00229 self.failIf(err)
00230 connection = impl.connections[-1]
00231 fields = connection.protocol.get_header_fields()
00232 self.assertEquals(impl.resolved_name, fields['topic'])
00233 self.assertEquals('fuga', fields['hoge'])
00234 self.assertEquals('baz', fields['foo'])
00235
00236 if __name__ == '__main__':
00237 import rostest
00238 rostest.unitrun('test_rospy', 'test_rospy_tcpros_pubsub', TestRospyTcprosPubsub, coverage_packages=['rospy.impl.tcpros_pubsub'])