test_rospy_topics.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 
40 import test_rospy.msg
41 
42 def callback1(data): pass
43 def callback2(data): pass
44 
45 # for use with publish() tests
46 import rospy.impl.transport
47 class ConnectionOverride(rospy.impl.transport.Transport):
48  def __init__(self, endpoint_id):
49  super(ConnectionOverride, self).__init__(rospy.impl.transport.OUTBOUND, endpoint_id)
50  self.endpoint_id = endpoint_id
51  self.data = ''
52 
53  def set_cleanup_callback(self, cb): pass
54  def write_data(self, data):
55  self.data = self.data + data
56 
57 # test rospy API verifies that the rospy module exports the required symbols
58 class TestRospyTopics(unittest.TestCase):
59 
61  from rospy.topics import _TopicImpl
62  from std_msgs.msg import String
63  t = _TopicImpl('/foo', String)
64  c1 = ConnectionOverride('c1')
65  c1b = ConnectionOverride('c1')
66  c2 = ConnectionOverride('c2')
67  c3 = ConnectionOverride('c3')
68  assert not t.has_connection('c1')
69  assert t.get_num_connections() == 0
70  t.add_connection(c1)
71  assert t.get_num_connections() == 1
72  assert t.has_connection('c1')
73  t.add_connection(c1b)
74  assert t.get_num_connections() == 1
75  assert t.has_connection('c1')
76  # should not remove
77  t.remove_connection(c1)
78  assert t.has_connection('c1')
79  t.remove_connection(c1b)
80  assert not t.has_connection('c1')
81 
82  t.close()
83  assert t.get_num_connections() == 0
84 
85  t = _TopicImpl('/foo', String)
86  t.add_connection(c1)
87  t.add_connection(c2)
88  t.add_connection(c3)
89  for x in ['c1', 'c2', 'c3']:
90  assert t.has_connection(x)
91  assert t.get_num_connections() == 3
92 
93  t.close()
94  assert t.get_num_connections() == 0
95 
96  def test_Publisher(self):
97  import rospy
98  from rospy.impl.registration import get_topic_manager, Registration
99  from rospy.topics import Publisher, DEFAULT_BUFF_SIZE
100  # Publisher(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None)
101 
102  name = 'foo'
103  rname = rospy.resolve_name('foo')
104  data_class = test_rospy.msg.Val
105 
106  # test invalid params
107  for n in [None, '', 1]:
108  try:
109  Publisher(n, data_class)
110  self.fail("should not allow invalid name")
111  except ValueError: pass
112  for d in [None, 1, TestRospyTopics]:
113  try:
114  Publisher(name, d)
115  self.fail("should now allow invalid data_class")
116  except ValueError: pass
117  try:
118  Publisher(name, None)
119  self.fail("None should not be allowed for data_class")
120  except ValueError: pass
121 
122  # round 1: test basic params
123  pub = Publisher(name, data_class)
124  self.assertEquals(rname, pub.resolved_name)
125  # - pub.name is left in for backwards compatiblity, but resolved_name is preferred
126  self.assertEquals(rname, pub.name)
127  self.assertEquals(data_class, pub.data_class)
128  self.assertEquals('test_rospy/Val', pub.type)
129  self.assertEquals(data_class._md5sum, pub.md5sum)
130  self.assertEquals(Registration.PUB, pub.reg_type)
131 
132  # verify impl as well
133  impl = get_topic_manager().get_impl(Registration.PUB, rname)
134  self.assert_(impl == pub.impl)
135  self.assertEquals(rname, impl.resolved_name)
136  self.assertEquals(data_class, impl.data_class)
137  self.failIf(impl.is_latch)
138  self.assertEquals(None, impl.latch)
139  self.assertEquals(0, impl.seq)
140  self.assertEquals(1, impl.ref_count)
141  self.assertEquals(b'', impl.buff.getvalue())
142  self.failIf(impl.closed)
143  self.failIf(impl.has_connections())
144  # check publish() fall-through
145  from test_rospy.msg import Val
146  impl.publish(Val('hello world-1'))
147 
148  # check stats
149  self.assertEquals(0, impl.message_data_sent)
150  # check acquire/release don't bomb
151  impl.acquire()
152  impl.release()
153 
154  # do a single publish with connection override. The connection
155  # override is a major cheat as the object isn't even an actual
156  # connection. I will need to add more integrated tests later
157  co1 = ConnectionOverride('co1')
158  self.failIf(impl.has_connection('co1'))
159  impl.add_connection(co1)
160  self.assert_(impl.has_connection('co1'))
161  self.assert_(impl.has_connections())
162  impl.publish(Val('hello world-1'), connection_override=co1)
163 
164  try:
165  from cStringIO import StringIO
166  except ImportError:
167  from io import StringIO
168  buff = StringIO()
169  Val('hello world-1').serialize(buff)
170  # - check equals, but strip length field first
171  self.assertEquals(co1.data[4:], buff.getvalue())
172  self.assertEquals(None, impl.latch)
173 
174  # Now enable latch
175  pub = Publisher(name, data_class, latch=True)
176  impl = get_topic_manager().get_impl(Registration.PUB, rname)
177  # have to verify latching in pub impl
178  self.assert_(impl == pub.impl)
179  self.assertEquals(True, impl.is_latch)
180  self.assertEquals(None, impl.latch)
181  self.assertEquals(2, impl.ref_count)
182 
183  co2 = ConnectionOverride('co2')
184  self.failIf(impl.has_connection('co2'))
185  impl.add_connection(co2)
186  for n in ['co1', 'co2']:
187  self.assert_(impl.has_connection(n))
188  self.assert_(impl.has_connections())
189  v = Val('hello world-2')
190  impl.publish(v, connection_override=co2)
191  self.assert_(v == impl.latch)
192 
193  buff = StringIO()
194  Val('hello world-2').serialize(buff)
195  # - strip length and check value
196  self.assertEquals(co2.data[4:], buff.getvalue())
197 
198  # test that latched value is sent to connections on connect
199  co3 = ConnectionOverride('co3')
200  self.failIf(impl.has_connection('co3'))
201  impl.add_connection(co3)
202  for n in ['co1', 'co2', 'co3']:
203  self.assert_(impl.has_connection(n))
204  self.assert_(impl.has_connections())
205  self.assertEquals(co3.data[4:], buff.getvalue())
206 
207  # TODO: tcp_nodelay
208  # TODO: suscribe listener
209  self.assert_(impl.has_connection('co1'))
210  impl.remove_connection(co1)
211  self.failIf(impl.has_connection('co1'))
212  self.assert_(impl.has_connections())
213 
214  self.assert_(impl.has_connection('co3'))
215  impl.remove_connection(co3)
216  self.failIf(impl.has_connection('co3'))
217  self.assert_(impl.has_connections())
218 
219  self.assert_(impl.has_connection('co2'))
220  impl.remove_connection(co2)
221  self.failIf(impl.has_connection('co2'))
222  self.failIf(impl.has_connections())
223 
224  # test publish() latch on a new Publisher object (this was encountered in testing, so I want a test case for it)
225  pub = Publisher('bar', data_class, latch=True)
226  v = Val('no connection test')
227  pub.impl.publish(v)
228  self.assert_(v == pub.impl.latch)
229 
230  # test connection header
231  h = {'foo': 'bar', 'fuga': 'hoge'}
232  pub = Publisher('header_test', data_class, headers=h)
233  self.assertEquals(h, pub.impl.headers)
234 
236  # regression test for #3029 (unregistering a Subcriber with no
237  # callback) plus other unregistration tests
238  import rospy
239  from rospy.impl.registration import get_topic_manager, Registration
240  from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
241 
242  #Subscriber: name, data_class, callback=None, callback_args=None,
243  #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
244 
245  name = 'unregistertest'
246  rname = rospy.resolve_name(name)
247  data_class = test_rospy.msg.Val
248 
249  sub = Subscriber(name, data_class)
250  self.assertEquals(None, sub.callback)
251 
252  # verify impl (test_Subscriber handles more verification, we
253  # just care about callbacks and ref_count state here)
254  impl = get_topic_manager().get_impl(Registration.SUB, rname)
255  self.assert_(impl == sub.impl)
256  self.assertEquals(1, impl.ref_count)
257  self.assertEquals([], impl.callbacks)
258 
259  # unregister should release the underlying impl
260  sub.unregister()
261  self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
262 
263  # create two subs
264  sub2 = Subscriber(name, data_class)
265  sub3 = Subscriber(name, data_class)
266 
267  impl = get_topic_manager().get_impl(Registration.SUB, rname)
268  # - test that they share the same impl
269  self.assert_(impl == sub2.impl)
270  self.assert_(impl == sub3.impl)
271  # - test basic impl state
272  self.assertEquals([], impl.callbacks)
273  self.assertEquals(2, impl.ref_count)
274  sub2.unregister()
275  self.assertEquals(1, impl.ref_count)
276  # - make sure double unregister is safe
277  sub2.unregister()
278  self.assertEquals(1, impl.ref_count)
279  # - clean it up
280  sub3.unregister()
281  self.assertEquals(0, impl.ref_count)
282  self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
283 
284  # CALLBACKS
285  cb_args5 = 5
286  cb_args6 = 6
287  cb_args7 = 7
288  sub4 = Subscriber(name, data_class, callback1)
289  # - use should be allowed to subcribe using the same callback
290  # and it shouldn't interfere on unregister
291  sub5 = Subscriber(name, data_class, callback2, cb_args5)
292  sub6 = Subscriber(name, data_class, callback2, cb_args6)
293  sub7 = Subscriber(name, data_class, callback2, cb_args7)
294  impl = get_topic_manager().get_impl(Registration.SUB, rname)
295  self.assertEquals(4, impl.ref_count)
296 
297  self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks)
298  # unregister sub6 first to as it is most likely to confuse any callback-finding logic
299  sub6.unregister()
300  self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks)
301  self.assertEquals(3, impl.ref_count)
302  sub5.unregister()
303  self.assertEquals([(callback1, None), (callback2, cb_args7)], impl.callbacks)
304  self.assertEquals(2, impl.ref_count)
305  sub4.unregister()
306  self.assertEquals([(callback2, cb_args7)], impl.callbacks)
307  self.assertEquals(1, impl.ref_count)
308  sub7.unregister()
309  self.assertEquals([], impl.callbacks)
310  self.assertEquals(0, impl.ref_count)
311  self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
312 
313  # one final condition: two identical subscribers
314  sub8 = Subscriber(name, data_class, callback1, 'hello')
315  sub9 = Subscriber(name, data_class, callback1, 'hello')
316  impl = get_topic_manager().get_impl(Registration.SUB, rname)
317  self.assertEquals([(callback1, 'hello'), (callback1, 'hello')], impl.callbacks)
318  self.assertEquals(2, impl.ref_count)
319  sub8.unregister()
320  self.assertEquals([(callback1, 'hello')], impl.callbacks)
321  self.assertEquals(1, impl.ref_count)
322  sub9.unregister()
323  self.assertEquals([], impl.callbacks)
324  self.assertEquals(0, impl.ref_count)
325 
326  def test_Subscriber(self):
327  #TODO: test callback args
328  #TODO: negative buff_size
329  #TODO: negative queue_size
330  import rospy
331  from rospy.impl.registration import get_topic_manager, Registration
332  from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
333 
334  #Subscriber: name, data_class, callback=None, callback_args=None,
335  #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
336 
337  name = 'foo'
338  rname = rospy.resolve_name('foo')
339  data_class = test_rospy.msg.Val
340 
341  # test invalid params
342  for n in [None, '', 1]:
343  try:
344  Subscriber(n, data_class)
345  self.fail("should not allow invalid name")
346  except ValueError: pass
347  for d in [None, 1, TestRospyTopics]:
348  try:
349  Subscriber(name, d)
350  self.fail("should now allow invalid data_class")
351  except ValueError: pass
352  try:
353  Subscriber(name, None)
354  self.fail("None should not be allowed for data_class")
355  except ValueError: pass
356 
357  sub = Subscriber(name, data_class)
358  self.assertEquals(rname, sub.resolved_name)
359  self.assertEquals(data_class, sub.data_class)
360  self.assertEquals('test_rospy/Val', sub.type)
361  self.assertEquals(data_class._md5sum, sub.md5sum)
362  self.assertEquals(Registration.SUB, sub.reg_type)
363 
364  # verify impl as well
365  impl = get_topic_manager().get_impl(Registration.SUB, rname)
366  self.assert_(impl == sub.impl)
367  self.assertEquals([], impl.callbacks)
368  self.assertEquals(rname, impl.resolved_name)
369  self.assertEquals(data_class, impl.data_class)
370  self.assertEquals(None, impl.queue_size)
371  self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size)
372  self.failIf(impl.tcp_nodelay)
373  self.assertEquals(1, impl.ref_count)
374  self.failIf(impl.closed)
375 
376  # round 2, now start setting options and make sure underlying impl is reconfigured
377  name = 'foo'
378  data_class = test_rospy.msg.Val
379  queue_size = 1
380  buff_size = 1
381  sub = Subscriber(name, data_class, callback=callback1,
382  queue_size=queue_size, buff_size=buff_size, tcp_nodelay=True)
383  self.assertEquals(rname, sub.resolved_name)
384  # - sub.name is a backwards-compat field as it is public API
385  self.assertEquals(rname, sub.name)
386  self.assertEquals(data_class, sub.data_class)
387 
388  # verify impl
389  impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
390  self.assert_(impl == impl2) # should be same instance
391  self.assertEquals([(callback1, None)], impl.callbacks)
392  self.assertEquals(rname, impl.resolved_name)
393  self.assertEquals(data_class, impl.data_class)
394  self.assertEquals(queue_size, impl.queue_size)
395  self.assertEquals(buff_size, impl.buff_size)
396  self.assert_(impl.tcp_nodelay)
397  self.assertEquals(2, impl.ref_count)
398  self.failIf(impl.closed)
399 
400  # round 3, make sure that options continue to reconfigure
401  # underlying impl also test that tcp_nodelay is sticky. this
402  # is technically undefined, but this is how rospy chose to
403  # implement.
404  name = 'foo'
405  data_class = test_rospy.msg.Val
406  queue_size = 2
407  buff_size = 2
408  sub = Subscriber(name, data_class, callback=callback2,
409  queue_size=queue_size, buff_size=buff_size, tcp_nodelay=False)
410 
411  # verify impl
412  impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
413  self.assert_(impl == impl2) # should be same instance
414  self.assertEquals(set([(callback1, None), (callback2, None)]), set(impl.callbacks))
415  self.assertEquals(queue_size, impl.queue_size)
416  self.assertEquals(buff_size, impl.buff_size)
417  self.assert_(impl.tcp_nodelay)
418  self.assertEquals(3, impl.ref_count)
419  self.failIf(impl.closed)
420 
421  def test_Poller(self):
422  # no real test as this goes down to kqueue/select, just make sure that it behaves
423  from rospy.topics import Poller
424  p = Poller()
425  p.add_fd(1)
426  for x in p.error_iter():
427  pass
428  p.remove_fd(1)
429  for x in p.error_iter():
430  pass
431 


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56