test_rospy_tcpros_pubsub.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import socket
37 import struct
38 import unittest
39 import time
40 
41 class FakeSocket(object):
42  def __init__(self):
43  self.data = b''
44  self.sockopt = None
45  def fileno(self):
46  # fool select logic by giving it stdout fileno
47  return 1
48  def setblocking(self, *args):
49  pass
50  def setsockopt(self, *args):
51  self.sockopt = args
52  def send(self, d):
53  self.data = self.data+d
54  return len(d)
55  def sendall(self, d):
56  self.data = self.data+d
57  def close(self):
58  pass
59  def getsockname(self):
60  return (None, None)
61 
62 # test rospy API verifies that the rospy module exports the required symbols
63 class TestRospyTcprosPubsub(unittest.TestCase):
64 
65  def test_TCPROSSub(self):
66  import rospy.impl.transport
67  from rospy.impl.tcpros_pubsub import TCPROSSub
68  import test_rospy.msg
69 
70  callerid = 'test_TCPROSSub'
71  import rospy.names
72  rospy.names._set_caller_id(callerid)
73 
74  #name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE
75  name = 'name-%s'%time.time()
76  recv_data_class = test_rospy.msg.Val
77  s = TCPROSSub(name, recv_data_class)
78  self.assertEquals(name, s.resolved_name)
79  self.assertEquals(rospy.impl.transport.INBOUND, s.direction)
80  self.assertEquals(recv_data_class, s.recv_data_class)
81  self.assert_(s.buff_size > -1)
82  self.failIf(s.tcp_nodelay)
83  self.assertEquals(None, s.queue_size)
84 
85  fields = s.get_header_fields()
86  self.assertEquals(name, fields['topic'])
87  self.assertEquals(recv_data_class._md5sum, fields['md5sum'])
88  self.assertEquals(recv_data_class._full_text, fields['message_definition'])
89  self.assertEquals('test_rospy/Val', fields['type'])
90  self.assert_(callerid, fields['callerid'])
91  if 'tcp_nodelay' in fields:
92  self.assertEquals('0', fields['tcp_nodelay'])
93 
94  v = int(time.time())
95  s = TCPROSSub(name, recv_data_class, queue_size=v)
96  self.assertEquals(v, s.queue_size)
97 
98  s = TCPROSSub(name, recv_data_class, buff_size=v)
99  self.assertEquals(v, s.buff_size)
100 
101  s = TCPROSSub(name, recv_data_class, tcp_nodelay=True)
102  self.assert_(s.tcp_nodelay)
103  self.assertEquals('1', s.get_header_fields()['tcp_nodelay'])
104 
105  def test_TCPROSPub(self):
106  import rospy.impl.transport
107  from rospy.impl.tcpros_pubsub import TCPROSPub
108  import test_rospy.msg
109 
110  callerid = 'test_TCPROSPub'
111  import rospy.names
112  rospy.names._set_caller_id(callerid)
113 
114  #name, pub_data_class
115  name = 'name-%s'%time.time()
116  pub_data_class = test_rospy.msg.Val
117  p = TCPROSPub(name, pub_data_class)
118  self.assertEquals(name, p.resolved_name)
119  self.assertEquals(rospy.impl.transport.OUTBOUND, p.direction)
120  self.assertEquals(pub_data_class, p.pub_data_class)
121  self.assert_(p.buff_size > -1)
122  self.failIf(p.is_latch)
123 
124  fields = p.get_header_fields()
125  self.assertEquals(name, fields['topic'])
126  self.assertEquals(pub_data_class._md5sum, fields['md5sum'])
127  self.assertEquals(pub_data_class._full_text, fields['message_definition'])
128  self.assertEquals('test_rospy/Val', fields['type'])
129  self.assert_(callerid, fields['callerid'])
130  if 'latching' in fields:
131  self.assertEquals('0', fields['latching'])
132 
133  p = TCPROSPub(name, pub_data_class, is_latch=True)
134  self.assert_(p.is_latch)
135  self.assertEquals('1', p.get_header_fields()['latching'])
136 
137  # test additional header fields
138  p = TCPROSPub(name, pub_data_class, headers={'foo': 'bar', 'hoge': 'fuga'})
139  fields = p.get_header_fields()
140  self.assertEquals(name, fields['topic'])
141  self.assertEquals('fuga', fields['hoge'])
142  self.assertEquals('bar', fields['foo'])
143 
145  # #1241 regression test to make sure that imports don't get messed up again
146  from rospy.impl.tcpros_pubsub import _configure_pub_socket
147  import socket
148  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149  _configure_pub_socket(sock, True)
150  sock.close()
151  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
152  _configure_pub_socket(sock, False)
153  sock.close()
154 
156 
157  import rospy
158  import rospy.core
159  # very ugly hack to handle bad design choice in rospy and bad isolation inside of nose
160  rospy.core._shutdown_flag = False
161  rospy.core._in_shutdown = False
162  from rospy.impl.registration import Registration
163  from rospy.impl.tcpros_pubsub import TCPROSHandler
164  import test_rospy.msg
165 
166  handler = TCPROSHandler()
167  tch = handler.topic_connection_handler
168  sock = FakeSocket()
169  client_addr = '127.0.0.1'
170  data_class = test_rospy.msg.Val
171  topic_name = '/foo-tch'
172 
173  headers = { 'topic': topic_name, 'md5sum': data_class._md5sum, 'callerid': '/node'}
174  # test required logic
175  for k in headers.keys():
176  header_copy = headers.copy()
177  del header_copy[k]
178  err = tch(sock, client_addr, header_copy)
179  self.assertNotEquals('', err)
180 
181  # '/foo-tch' is not registered, so this should error
182  err = tch(sock, client_addr, headers)
183  self.assert_(err)
184 
185  # register '/foo-tch'
186  tm = rospy.impl.registration.get_topic_manager()
187  impl = tm.acquire_impl(Registration.PUB, topic_name, data_class)
188  self.assert_(impl is not None)
189 
190  # test with mismatched md5sum
191  header_copy = headers.copy()
192  header_copy['md5sum'] = 'bad'
193  md5_err = tch(sock, client_addr, header_copy)
194  self.assert_("md5sum" in md5_err, md5_err)
195 
196  # now test with correct params
197  err = tch(sock, client_addr, headers)
198  self.failIf(err)
199  self.assertEquals(None, sock.sockopt)
200 
201  # test with mismatched type
202  # - if md5sums match, this should not error
203  header_copy = headers.copy()
204  header_copy['type'] = 'bad_type/Bad'
205  err = tch(sock, client_addr, header_copy)
206  self.failIf(err)
207  # - now give mismatched md5sum, this will test different error message
208  header_copy['md5sum'] = 'bad'
209  type_md5_err = tch(sock, client_addr, header_copy)
210  self.assert_("types" in type_md5_err, type_md5_err)
211 
212  # - these error messages should be different
213  self.assertNotEquals(md5_err, type_md5_err)
214 
215  # test tcp_nodelay
216  # - should be equivalent to above
217  headers['tcp_nodelay'] = '0'
218  err = tch(sock, client_addr, headers)
219  self.failIf(err)
220  self.assertEquals(None, sock.sockopt)
221 
222  # - now test actual sock opt
223  headers['tcp_nodelay'] = '1'
224  err = tch(sock, client_addr, headers)
225  self.failIf(err)
226  self.assertEquals((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), sock.sockopt)
227  # test connection headers
228  impl.headers = {'foo': 'baz', 'hoge': 'fuga'}
229  headers['tcp_nodelay'] = '0'
230  err = tch(sock, client_addr, headers)
231  self.failIf(err)
232  connection = impl.connections[-1]
233  fields = connection.protocol.get_header_fields()
234  self.assertEquals(impl.resolved_name, fields['topic'])
235  self.assertEquals('fuga', fields['hoge'])
236  self.assertEquals('baz', fields['foo'])
237 


test_rospy
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:22