66 import rospy.impl.transport
67 from rospy.impl.tcpros_pubsub
import TCPROSSub
70 callerid =
'test_TCPROSSub' 72 rospy.names._set_caller_id(callerid)
75 name =
'name-%s'%time.time()
76 recv_data_class = test_rospy.msg.Val
77 s = TCPROSSub(name, recv_data_class)
78 self.assertEquals(name, s.resolved_name)
79 self.assertEquals(rospy.impl.transport.INBOUND, s.direction)
80 self.assertEquals(recv_data_class, s.recv_data_class)
81 self.assert_(s.buff_size > -1)
82 self.failIf(s.tcp_nodelay)
83 self.assertEquals(
None, s.queue_size)
85 fields = s.get_header_fields()
86 self.assertEquals(name, fields[
'topic'])
87 self.assertEquals(recv_data_class._md5sum, fields[
'md5sum'])
88 self.assertEquals(recv_data_class._full_text, fields[
'message_definition'])
89 self.assertEquals(
'test_rospy/Val', fields[
'type'])
90 self.assert_(callerid, fields[
'callerid'])
91 if 'tcp_nodelay' in fields:
92 self.assertEquals(
'0', fields[
'tcp_nodelay'])
95 s = TCPROSSub(name, recv_data_class, queue_size=v)
96 self.assertEquals(v, s.queue_size)
98 s = TCPROSSub(name, recv_data_class, buff_size=v)
99 self.assertEquals(v, s.buff_size)
101 s = TCPROSSub(name, recv_data_class, tcp_nodelay=
True)
102 self.assert_(s.tcp_nodelay)
103 self.assertEquals(
'1', s.get_header_fields()[
'tcp_nodelay'])
106 import rospy.impl.transport
107 from rospy.impl.tcpros_pubsub
import TCPROSPub
108 import test_rospy.msg
110 callerid =
'test_TCPROSPub' 112 rospy.names._set_caller_id(callerid)
115 name =
'name-%s'%time.time()
116 pub_data_class = test_rospy.msg.Val
117 p = TCPROSPub(name, pub_data_class)
118 self.assertEquals(name, p.resolved_name)
119 self.assertEquals(rospy.impl.transport.OUTBOUND, p.direction)
120 self.assertEquals(pub_data_class, p.pub_data_class)
121 self.assert_(p.buff_size > -1)
122 self.failIf(p.is_latch)
124 fields = p.get_header_fields()
125 self.assertEquals(name, fields[
'topic'])
126 self.assertEquals(pub_data_class._md5sum, fields[
'md5sum'])
127 self.assertEquals(pub_data_class._full_text, fields[
'message_definition'])
128 self.assertEquals(
'test_rospy/Val', fields[
'type'])
129 self.assert_(callerid, fields[
'callerid'])
130 if 'latching' in fields:
131 self.assertEquals(
'0', fields[
'latching'])
133 p = TCPROSPub(name, pub_data_class, is_latch=
True)
134 self.assert_(p.is_latch)
135 self.assertEquals(
'1', p.get_header_fields()[
'latching'])
138 p = TCPROSPub(name, pub_data_class, headers={
'foo':
'bar',
'hoge':
'fuga'})
139 fields = p.get_header_fields()
140 self.assertEquals(name, fields[
'topic'])
141 self.assertEquals(
'fuga', fields[
'hoge'])
142 self.assertEquals(
'bar', fields[
'foo'])
146 from rospy.impl.tcpros_pubsub
import _configure_pub_socket
148 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149 _configure_pub_socket(sock,
True)
151 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
152 _configure_pub_socket(sock,
False)
160 rospy.core._shutdown_flag =
False 161 rospy.core._in_shutdown =
False 162 from rospy.impl.registration
import Registration
163 from rospy.impl.tcpros_pubsub
import TCPROSHandler
164 import test_rospy.msg
166 handler = TCPROSHandler()
167 tch = handler.topic_connection_handler
169 client_addr =
'127.0.0.1' 170 data_class = test_rospy.msg.Val
171 topic_name =
'/foo-tch' 173 headers = {
'topic': topic_name,
'md5sum': data_class._md5sum,
'callerid':
'/node'}
175 for k
in headers.keys():
176 header_copy = headers.copy()
178 err = tch(sock, client_addr, header_copy)
179 self.assertNotEquals(
'', err)
182 err = tch(sock, client_addr, headers)
186 tm = rospy.impl.registration.get_topic_manager()
187 impl = tm.acquire_impl(Registration.PUB, topic_name, data_class)
188 self.assert_(impl
is not None)
191 header_copy = headers.copy()
192 header_copy[
'md5sum'] =
'bad' 193 md5_err = tch(sock, client_addr, header_copy)
194 self.assert_(
"md5sum" in md5_err, md5_err)
197 err = tch(sock, client_addr, headers)
199 self.assertEquals(
None, sock.sockopt)
203 header_copy = headers.copy()
204 header_copy[
'type'] =
'bad_type/Bad' 205 err = tch(sock, client_addr, header_copy)
208 header_copy[
'md5sum'] =
'bad' 209 type_md5_err = tch(sock, client_addr, header_copy)
210 self.assert_(
"types" in type_md5_err, type_md5_err)
213 self.assertNotEquals(md5_err, type_md5_err)
217 headers[
'tcp_nodelay'] =
'0' 218 err = tch(sock, client_addr, headers)
220 self.assertEquals(
None, sock.sockopt)
223 headers[
'tcp_nodelay'] =
'1' 224 err = tch(sock, client_addr, headers)
226 self.assertEquals((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt)
228 impl.headers = {
'foo':
'baz',
'hoge':
'fuga'}
229 headers[
'tcp_nodelay'] =
'0' 230 err = tch(sock, client_addr, headers)
232 connection = impl.connections[-1]
233 fields = connection.protocol.get_header_fields()
234 self.assertEquals(impl.resolved_name, fields[
'topic'])
235 self.assertEquals(
'fuga', fields[
'hoge'])
236 self.assertEquals(
'baz', fields[
'foo'])
def setsockopt(self, args)
def test_TCPROSHandler_topic_connection_handler(self)
def test_configure_pub_socket(self)
def setblocking(self, args)