test_rospy_topics.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 
00040 import test_rospy.msg
00041 
00042 def callback1(data): pass
00043 def callback2(data): pass
00044 
00045 # for use with publish() tests
00046 import rospy.impl.transport
00047 class ConnectionOverride(rospy.impl.transport.Transport):
00048     def __init__(self, endpoint_id):
00049         super(ConnectionOverride, self).__init__(rospy.impl.transport.OUTBOUND, endpoint_id)
00050         self.endpoint_id = endpoint_id
00051         self.data = ''
00052 
00053     def set_cleanup_callback(self, cb): pass
00054     def write_data(self, data):
00055         self.data = self.data + data
00056 
00057 # test rospy API verifies that the rospy module exports the required symbols
00058 class TestRospyTopics(unittest.TestCase):
00059 
00060     def test_add_connection(self):
00061         from rospy.topics import _TopicImpl
00062         from std_msgs.msg import String
00063         t = _TopicImpl('/foo', String)
00064         c1 = ConnectionOverride('c1')
00065         c1b = ConnectionOverride('c1')
00066         c2 = ConnectionOverride('c2')
00067         c3 = ConnectionOverride('c3')
00068         assert not t.has_connection('c1')
00069         assert t.get_num_connections() == 0
00070         t.add_connection(c1)
00071         assert t.get_num_connections() == 1
00072         assert t.has_connection('c1')        
00073         t.add_connection(c1b)
00074         assert t.get_num_connections() == 1
00075         assert t.has_connection('c1')
00076         # should not remove
00077         t.remove_connection(c1)
00078         assert t.has_connection('c1')
00079         t.remove_connection(c1b)        
00080         assert not t.has_connection('c1')
00081 
00082         t.close()
00083         assert t.get_num_connections() == 0 
00084 
00085         t = _TopicImpl('/foo', String)
00086         t.add_connection(c1)
00087         t.add_connection(c2)
00088         t.add_connection(c3)
00089         for x in ['c1', 'c2', 'c3']:
00090             assert t.has_connection(x)
00091         assert t.get_num_connections() == 3
00092 
00093         t.close()
00094         assert t.get_num_connections() == 0        
00095         
00096     def test_Publisher(self):
00097         import rospy
00098         from rospy.impl.registration import get_topic_manager, Registration
00099         from rospy.topics import Publisher, DEFAULT_BUFF_SIZE
00100         # Publisher(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None)
00101 
00102         name = 'foo'
00103         rname = rospy.resolve_name('foo')
00104         data_class = test_rospy.msg.Val
00105 
00106         # test invalid params
00107         for n in [None, '', 1]:
00108             try:
00109                 Publisher(n, data_class)
00110                 self.fail("should not allow invalid name")
00111             except ValueError: pass
00112         for d in [None, 1, TestRospyTopics]:
00113             try:
00114                 Publisher(name, d)
00115                 self.fail("should now allow invalid data_class")
00116             except ValueError: pass
00117         try:
00118             Publisher(name, None)
00119             self.fail("None should not be allowed for data_class")
00120         except ValueError: pass
00121 
00122         # round 1: test basic params
00123         pub = Publisher(name, data_class)
00124         self.assertEquals(rname, pub.resolved_name)
00125         # - pub.name is left in for backwards compatiblity, but resolved_name is preferred
00126         self.assertEquals(rname, pub.name)        
00127         self.assertEquals(data_class, pub.data_class)
00128         self.assertEquals('test_rospy/Val', pub.type)
00129         self.assertEquals(data_class._md5sum, pub.md5sum)
00130         self.assertEquals(Registration.PUB, pub.reg_type)
00131         
00132         # verify impl as well
00133         impl = get_topic_manager().get_impl(Registration.PUB, rname)
00134         self.assert_(impl == pub.impl)
00135         self.assertEquals(rname, impl.resolved_name)
00136         self.assertEquals(data_class, impl.data_class)                
00137         self.failIf(impl.is_latch)
00138         self.assertEquals(None, impl.latch)                
00139         self.assertEquals(0, impl.seq)
00140         self.assertEquals(1, impl.ref_count)
00141         self.assertEquals('', impl.buff.getvalue())
00142         self.failIf(impl.closed)
00143         self.failIf(impl.has_connections())
00144         # check publish() fall-through
00145         from test_rospy.msg import Val
00146         impl.publish(Val('hello world-1'))
00147         
00148         # check stats
00149         self.assertEquals(0, impl.message_data_sent)
00150         # check acquire/release don't bomb
00151         impl.acquire()
00152         impl.release()        
00153 
00154         # do a single publish with connection override. The connection
00155         # override is a major cheat as the object isn't even an actual
00156         # connection. I will need to add more integrated tests later
00157         co1 = ConnectionOverride('co1')
00158         self.failIf(impl.has_connection('co1'))
00159         impl.add_connection(co1)
00160         self.assert_(impl.has_connection('co1'))
00161         self.assert_(impl.has_connections())        
00162         impl.publish(Val('hello world-1'), connection_override=co1)
00163 
00164         try:
00165             from cStringIO import StringIO
00166         except ImportError:
00167             from io import StringIO
00168         buff = StringIO()
00169         Val('hello world-1').serialize(buff)
00170         # - check equals, but strip length field first
00171         self.assertEquals(co1.data[4:], buff.getvalue())
00172         self.assertEquals(None, impl.latch)
00173         
00174         # Now enable latch
00175         pub = Publisher(name, data_class, latch=True)
00176         impl = get_topic_manager().get_impl(Registration.PUB, rname)
00177         # have to verify latching in pub impl
00178         self.assert_(impl == pub.impl)
00179         self.assertEquals(True, impl.is_latch)
00180         self.assertEquals(None, impl.latch)
00181         self.assertEquals(2, impl.ref_count)        
00182 
00183         co2 = ConnectionOverride('co2')
00184         self.failIf(impl.has_connection('co2'))
00185         impl.add_connection(co2)
00186         for n in ['co1', 'co2']:
00187             self.assert_(impl.has_connection(n))
00188         self.assert_(impl.has_connections())        
00189         v = Val('hello world-2')
00190         impl.publish(v, connection_override=co2)
00191         self.assert_(v == impl.latch)
00192 
00193         buff = StringIO()
00194         Val('hello world-2').serialize(buff)
00195         # - strip length and check value
00196         self.assertEquals(co2.data[4:], buff.getvalue())
00197 
00198         # test that latched value is sent to connections on connect
00199         co3 = ConnectionOverride('co3')
00200         self.failIf(impl.has_connection('co3'))
00201         impl.add_connection(co3)
00202         for n in ['co1', 'co2', 'co3']:
00203             self.assert_(impl.has_connection(n))
00204         self.assert_(impl.has_connections())
00205         self.assertEquals(co3.data[4:], buff.getvalue())        
00206         
00207         # TODO: tcp_nodelay
00208         # TODO: suscribe listener
00209         self.assert_(impl.has_connection('co1'))
00210         impl.remove_connection(co1)
00211         self.failIf(impl.has_connection('co1'))
00212         self.assert_(impl.has_connections())
00213         
00214         self.assert_(impl.has_connection('co3'))
00215         impl.remove_connection(co3)        
00216         self.failIf(impl.has_connection('co3'))
00217         self.assert_(impl.has_connections())
00218         
00219         self.assert_(impl.has_connection('co2'))
00220         impl.remove_connection(co2)        
00221         self.failIf(impl.has_connection('co2'))
00222         self.failIf(impl.has_connections())
00223 
00224         # test publish() latch on a new Publisher object (this was encountered in testing, so I want a test case for it)
00225         pub = Publisher('bar', data_class, latch=True)
00226         v = Val('no connection test')
00227         pub.impl.publish(v)
00228         self.assert_(v == pub.impl.latch)
00229 
00230         # test connection header
00231         h = {'foo': 'bar', 'fuga': 'hoge'}
00232         pub = Publisher('header_test', data_class, headers=h)
00233         self.assertEquals(h, pub.impl.headers)
00234         
00235     def test_Subscriber_unregister(self):
00236         # regression test for #3029 (unregistering a Subcriber with no
00237         # callback) plus other unregistration tests
00238         import rospy
00239         from rospy.impl.registration import get_topic_manager, Registration
00240         from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00241 
00242         #Subscriber: name, data_class, callback=None, callback_args=None,
00243         #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
00244 
00245         name = 'unregistertest'
00246         rname = rospy.resolve_name(name)
00247         data_class = test_rospy.msg.Val
00248 
00249         sub = Subscriber(name, data_class)
00250         self.assertEquals(None, sub.callback)
00251 
00252         # verify impl (test_Subscriber handles more verification, we
00253         # just care about callbacks and ref_count state here)
00254         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00255         self.assert_(impl == sub.impl)
00256         self.assertEquals(1, impl.ref_count)
00257         self.assertEquals([], impl.callbacks)
00258         
00259         # unregister should release the underlying impl
00260         sub.unregister()
00261         self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00262 
00263         # create two subs
00264         sub2 = Subscriber(name, data_class)
00265         sub3 = Subscriber(name, data_class)        
00266 
00267         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00268         # - test that they share the same impl
00269         self.assert_(impl == sub2.impl)
00270         self.assert_(impl == sub3.impl)
00271         # - test basic impl state
00272         self.assertEquals([], impl.callbacks)
00273         self.assertEquals(2, impl.ref_count)
00274         sub2.unregister()
00275         self.assertEquals(1, impl.ref_count)
00276         # - make sure double unregister is safe
00277         sub2.unregister()
00278         self.assertEquals(1, impl.ref_count)
00279         # - clean it up
00280         sub3.unregister()
00281         self.assertEquals(0, impl.ref_count)
00282         self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00283 
00284         # CALLBACKS
00285         cb_args5 = 5
00286         cb_args6 = 6
00287         cb_args7 = 7
00288         sub4 = Subscriber(name, data_class, callback1)
00289         # - use should be allowed to subcribe using the same callback
00290         # and it shouldn't interfere on unregister
00291         sub5 = Subscriber(name, data_class, callback2, cb_args5)        
00292         sub6 = Subscriber(name, data_class, callback2, cb_args6)        
00293         sub7 = Subscriber(name, data_class, callback2, cb_args7)        
00294         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00295         self.assertEquals(4, impl.ref_count)
00296         
00297         self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks)
00298         # unregister sub6 first to as it is most likely to confuse any callback-finding logic
00299         sub6.unregister()
00300         self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks)
00301         self.assertEquals(3, impl.ref_count)
00302         sub5.unregister()
00303         self.assertEquals([(callback1, None), (callback2, cb_args7)], impl.callbacks)
00304         self.assertEquals(2, impl.ref_count)
00305         sub4.unregister()
00306         self.assertEquals([(callback2, cb_args7)], impl.callbacks)
00307         self.assertEquals(1, impl.ref_count)
00308         sub7.unregister()
00309         self.assertEquals([], impl.callbacks)
00310         self.assertEquals(0, impl.ref_count)
00311         self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00312 
00313         # one final condition: two identical subscribers
00314         sub8 = Subscriber(name, data_class, callback1, 'hello')
00315         sub9 = Subscriber(name, data_class, callback1, 'hello')
00316         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00317         self.assertEquals([(callback1, 'hello'), (callback1, 'hello')], impl.callbacks)
00318         self.assertEquals(2, impl.ref_count)
00319         sub8.unregister()
00320         self.assertEquals([(callback1, 'hello')], impl.callbacks)
00321         self.assertEquals(1, impl.ref_count)
00322         sub9.unregister()
00323         self.assertEquals([], impl.callbacks)
00324         self.assertEquals(0, impl.ref_count)
00325 
00326     def test_Subscriber(self):
00327         #TODO: test callback args
00328         #TODO: negative buff_size
00329         #TODO: negative queue_size        
00330         import rospy
00331         from rospy.impl.registration import get_topic_manager, Registration
00332         from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00333 
00334         #Subscriber: name, data_class, callback=None, callback_args=None,
00335         #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
00336 
00337         name = 'foo'
00338         rname = rospy.resolve_name('foo')
00339         data_class = test_rospy.msg.Val
00340 
00341         # test invalid params
00342         for n in [None, '', 1]:
00343             try:
00344                 Subscriber(n, data_class)
00345                 self.fail("should not allow invalid name")
00346             except ValueError: pass
00347         for d in [None, 1, TestRospyTopics]:
00348             try:
00349                 Subscriber(name, d)
00350                 self.fail("should now allow invalid data_class")
00351             except ValueError: pass
00352         try:
00353             Subscriber(name, None)
00354             self.fail("None should not be allowed for data_class")
00355         except ValueError: pass
00356         
00357         sub = Subscriber(name, data_class)
00358         self.assertEquals(rname, sub.resolved_name)
00359         self.assertEquals(data_class, sub.data_class)
00360         self.assertEquals('test_rospy/Val', sub.type)
00361         self.assertEquals(data_class._md5sum, sub.md5sum)
00362         self.assertEquals(Registration.SUB, sub.reg_type)
00363         
00364         # verify impl as well
00365         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00366         self.assert_(impl == sub.impl)
00367         self.assertEquals([], impl.callbacks)
00368         self.assertEquals(rname, impl.resolved_name)
00369         self.assertEquals(data_class, impl.data_class)                
00370         self.assertEquals(None, impl.queue_size)
00371         self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size)
00372         self.failIf(impl.tcp_nodelay)
00373         self.assertEquals(1, impl.ref_count)
00374         self.failIf(impl.closed)
00375         
00376         # round 2, now start setting options and make sure underlying impl is reconfigured
00377         name = 'foo'
00378         data_class = test_rospy.msg.Val
00379         queue_size = 1
00380         buff_size = 1
00381         sub = Subscriber(name, data_class, callback=callback1,
00382                          queue_size=queue_size, buff_size=buff_size, tcp_nodelay=True)
00383         self.assertEquals(rname, sub.resolved_name)
00384         # - sub.name is a backwards-compat field as it is public API
00385         self.assertEquals(rname, sub.name)        
00386         self.assertEquals(data_class, sub.data_class)
00387 
00388         # verify impl 
00389         impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00390         self.assert_(impl == impl2) # should be same instance
00391         self.assertEquals([(callback1, None)], impl.callbacks)
00392         self.assertEquals(rname, impl.resolved_name)
00393         self.assertEquals(data_class, impl.data_class)                
00394         self.assertEquals(queue_size, impl.queue_size)
00395         self.assertEquals(buff_size, impl.buff_size)
00396         self.assert_(impl.tcp_nodelay)
00397         self.assertEquals(2, impl.ref_count)
00398         self.failIf(impl.closed)
00399 
00400         # round 3, make sure that options continue to reconfigure
00401         # underlying impl also test that tcp_nodelay is sticky. this
00402         # is technically undefined, but this is how rospy chose to
00403         # implement.
00404         name = 'foo'
00405         data_class = test_rospy.msg.Val
00406         queue_size = 2
00407         buff_size = 2
00408         sub = Subscriber(name, data_class, callback=callback2, 
00409                          queue_size=queue_size, buff_size=buff_size, tcp_nodelay=False)
00410 
00411         # verify impl 
00412         impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00413         self.assert_(impl == impl2) # should be same instance
00414         self.assertEquals(set([(callback1, None), (callback2, None)]), set(impl.callbacks))
00415         self.assertEquals(queue_size, impl.queue_size)
00416         self.assertEquals(buff_size, impl.buff_size)
00417         self.assert_(impl.tcp_nodelay)
00418         self.assertEquals(3, impl.ref_count)
00419         self.failIf(impl.closed)
00420 
00421     def test_Poller(self):
00422         # no real test as this goes down to kqueue/select, just make sure that it behaves
00423         from rospy.topics import Poller
00424         p = Poller()
00425         p.add_fd(1)
00426         for x in p.error_iter():
00427             pass
00428         p.remove_fd(1)
00429         for x in p.error_iter():
00430             pass
00431         


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57