test_rospy_topics.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 
00040 import test_rospy.msg
00041 
00042 def callback1(data): pass
00043 def callback2(data): pass
00044 
00045 # for use with publish() tests
00046 import rospy.impl.transport
00047 class ConnectionOverride(rospy.impl.transport.Transport):
00048     def __init__(self, endpoint_id):
00049         super(ConnectionOverride, self).__init__(rospy.impl.transport.OUTBOUND, endpoint_id)
00050         self.endpoint_id = endpoint_id
00051         self.data = ''
00052 
00053     def set_cleanup_callback(self, cb): pass
00054     def write_data(self, data):
00055         self.data = self.data + data
00056 
00057 # test rospy API verifies that the rospy module exports the required symbols
00058 class TestRospyTopics(unittest.TestCase):
00059 
00060     def test_add_connection(self):
00061         from rospy.topics import _TopicImpl
00062         from std_msgs.msg import String
00063         t = _TopicImpl('/foo', String)
00064         c1 = ConnectionOverride('c1')
00065         c1b = ConnectionOverride('c1')
00066         c2 = ConnectionOverride('c2')
00067         c3 = ConnectionOverride('c3')
00068         assert not t.has_connection('c1')
00069         assert t.get_num_connections() == 0
00070         t.add_connection(c1)
00071         assert t.get_num_connections() == 1
00072         assert t.has_connection('c1')        
00073         t.add_connection(c1b)
00074         assert t.get_num_connections() == 1
00075         assert t.has_connection('c1')
00076         # should not remove
00077         t.remove_connection(c1)
00078         assert t.has_connection('c1')
00079         t.remove_connection(c1b)        
00080         assert not t.has_connection('c1')
00081 
00082         t.close()
00083         assert t.get_num_connections() == 0 
00084 
00085         t = _TopicImpl('/foo', String)
00086         t.add_connection(c1)
00087         t.add_connection(c2)
00088         t.add_connection(c3)
00089         for x in ['c1', 'c2', 'c3']:
00090             assert t.has_connection(x)
00091         assert t.get_num_connections() == 3
00092 
00093         t.close()
00094         assert t.get_num_connections() == 0        
00095         
00096     def test_Publisher(self):
00097         import rospy
00098         from rospy.impl.registration import get_topic_manager, Registration
00099         from rospy.topics import Publisher, DEFAULT_BUFF_SIZE
00100         # Publisher(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None)
00101 
00102         name = 'foo'
00103         rname = rospy.resolve_name('foo')
00104         data_class = test_rospy.msg.Val
00105 
00106         # test invalid params
00107         for n in [None, '', 1]:
00108             try:
00109                 Publisher(n, data_class)
00110                 self.fail("should not allow invalid name")
00111             except ValueError: pass
00112         for d in [None, 1, TestRospyTopics]:
00113             try:
00114                 Publisher(name, d)
00115                 self.fail("should now allow invalid data_class")
00116             except ValueError: pass
00117         try:
00118             Publisher(name, None)
00119             self.fail("None should not be allowed for data_class")
00120         except ValueError: pass
00121 
00122         # round 1: test basic params
00123         pub = Publisher(name, data_class)
00124         self.assertEquals(rname, pub.resolved_name)
00125         # - pub.name is left in for backwards compatiblity, but resolved_name is preferred
00126         self.assertEquals(rname, pub.name)        
00127         self.assertEquals(data_class, pub.data_class)
00128         self.assertEquals('test_rospy/Val', pub.type)
00129         self.assertEquals(data_class._md5sum, pub.md5sum)
00130         self.assertEquals(Registration.PUB, pub.reg_type)
00131         
00132         # verify impl as well
00133         impl = get_topic_manager().get_impl(Registration.PUB, rname)
00134         self.assert_(impl == pub.impl)
00135         self.assertEquals(rname, impl.resolved_name)
00136         self.assertEquals(data_class, impl.data_class)                
00137         self.failIf(impl.is_latch)
00138         self.assertEquals(None, impl.latch)                
00139         self.assertEquals(0, impl.seq)
00140         self.assertEquals(1, impl.ref_count)
00141         self.assertEquals('', impl.buff.getvalue())
00142         self.failIf(impl.closed)
00143         self.failIf(impl.has_connections())
00144         # check publish() fall-through
00145         from test_rospy.msg import Val
00146         impl.publish(Val('hello world-1'))
00147         
00148         # check stats
00149         self.assertEquals(0, impl.message_data_sent)
00150         # check acquire/release don't bomb
00151         impl.acquire()
00152         impl.release()        
00153 
00154         # do a single publish with connection override. The connection
00155         # override is a major cheat as the object isn't even an actual
00156         # connection. I will need to add more integrated tests later
00157         co1 = ConnectionOverride('co1')
00158         self.failIf(impl.has_connection('co1'))
00159         impl.add_connection(co1)
00160         self.assert_(impl.has_connection('co1'))
00161         self.assert_(impl.has_connections())        
00162         impl.publish(Val('hello world-1'), connection_override=co1)
00163 
00164         import cStringIO
00165         buff = cStringIO.StringIO()
00166         Val('hello world-1').serialize(buff)
00167         # - check equals, but strip length field first
00168         self.assertEquals(co1.data[4:], buff.getvalue())
00169         self.assertEquals(None, impl.latch)
00170         
00171         # Now enable latch
00172         pub = Publisher(name, data_class, latch=True)
00173         impl = get_topic_manager().get_impl(Registration.PUB, rname)
00174         # have to verify latching in pub impl
00175         self.assert_(impl == pub.impl)
00176         self.assertEquals(True, impl.is_latch)
00177         self.assertEquals(None, impl.latch)
00178         self.assertEquals(2, impl.ref_count)        
00179 
00180         co2 = ConnectionOverride('co2')
00181         self.failIf(impl.has_connection('co2'))
00182         impl.add_connection(co2)
00183         for n in ['co1', 'co2']:
00184             self.assert_(impl.has_connection(n))
00185         self.assert_(impl.has_connections())        
00186         v = Val('hello world-2')
00187         impl.publish(v, connection_override=co2)
00188         self.assert_(v == impl.latch)
00189 
00190         buff = cStringIO.StringIO()
00191         Val('hello world-2').serialize(buff)
00192         # - strip length and check value
00193         self.assertEquals(co2.data[4:], buff.getvalue())
00194 
00195         # test that latched value is sent to connections on connect
00196         co3 = ConnectionOverride('co3')
00197         self.failIf(impl.has_connection('co3'))
00198         impl.add_connection(co3)
00199         for n in ['co1', 'co2', 'co3']:
00200             self.assert_(impl.has_connection(n))
00201         self.assert_(impl.has_connections())
00202         self.assertEquals(co3.data[4:], buff.getvalue())        
00203         
00204         # TODO: tcp_nodelay
00205         # TODO: suscribe listener
00206         self.assert_(impl.has_connection('co1'))
00207         impl.remove_connection(co1)
00208         self.failIf(impl.has_connection('co1'))
00209         self.assert_(impl.has_connections())
00210         
00211         self.assert_(impl.has_connection('co3'))
00212         impl.remove_connection(co3)        
00213         self.failIf(impl.has_connection('co3'))
00214         self.assert_(impl.has_connections())
00215         
00216         self.assert_(impl.has_connection('co2'))
00217         impl.remove_connection(co2)        
00218         self.failIf(impl.has_connection('co2'))
00219         self.failIf(impl.has_connections())
00220 
00221         # test publish() latch on a new Publisher object (this was encountered in testing, so I want a test case for it)
00222         pub = Publisher('bar', data_class, latch=True)
00223         v = Val('no connection test')
00224         pub.impl.publish(v)
00225         self.assert_(v == pub.impl.latch)
00226 
00227         # test connection header
00228         h = {'foo': 'bar', 'fuga': 'hoge'}
00229         pub = Publisher('header_test', data_class, headers=h)
00230         self.assertEquals(h, pub.impl.headers)
00231         
00232     def test_Subscriber_unregister(self):
00233         # regression test for #3029 (unregistering a Subcriber with no
00234         # callback) plus other unregistration tests
00235         import rospy
00236         from rospy.impl.registration import get_topic_manager, Registration
00237         from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00238 
00239         #Subscriber: name, data_class, callback=None, callback_args=None,
00240         #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
00241 
00242         name = 'unregistertest'
00243         rname = rospy.resolve_name(name)
00244         data_class = test_rospy.msg.Val
00245 
00246         sub = Subscriber(name, data_class)
00247         self.assertEquals(None, sub.callback)
00248 
00249         # verify impl (test_Subscriber handles more verification, we
00250         # just care about callbacks and ref_count state here)
00251         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00252         self.assert_(impl == sub.impl)
00253         self.assertEquals(1, impl.ref_count)
00254         self.assertEquals([], impl.callbacks)
00255         
00256         # unregister should release the underlying impl
00257         sub.unregister()
00258         self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00259 
00260         # create two subs
00261         sub2 = Subscriber(name, data_class)
00262         sub3 = Subscriber(name, data_class)        
00263 
00264         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00265         # - test that they share the same impl
00266         self.assert_(impl == sub2.impl)
00267         self.assert_(impl == sub3.impl)
00268         # - test basic impl state
00269         self.assertEquals([], impl.callbacks)
00270         self.assertEquals(2, impl.ref_count)
00271         sub2.unregister()
00272         self.assertEquals(1, impl.ref_count)
00273         # - make sure double unregister is safe
00274         sub2.unregister()
00275         self.assertEquals(1, impl.ref_count)
00276         # - clean it up
00277         sub3.unregister()
00278         self.assertEquals(0, impl.ref_count)
00279         self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00280 
00281         # CALLBACKS
00282         cb_args5 = 5
00283         cb_args6 = 6
00284         cb_args7 = 7
00285         sub4 = Subscriber(name, data_class, callback1)
00286         # - use should be allowed to subcribe using the same callback
00287         # and it shouldn't interfere on unregister
00288         sub5 = Subscriber(name, data_class, callback2, cb_args5)        
00289         sub6 = Subscriber(name, data_class, callback2, cb_args6)        
00290         sub7 = Subscriber(name, data_class, callback2, cb_args7)        
00291         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00292         self.assertEquals(4, impl.ref_count)
00293         
00294         self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks)
00295         # unregister sub6 first to as it is most likely to confuse any callback-finding logic
00296         sub6.unregister()
00297         self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks)
00298         self.assertEquals(3, impl.ref_count)
00299         sub5.unregister()
00300         self.assertEquals([(callback1, None), (callback2, cb_args7)], impl.callbacks)
00301         self.assertEquals(2, impl.ref_count)
00302         sub4.unregister()
00303         self.assertEquals([(callback2, cb_args7)], impl.callbacks)
00304         self.assertEquals(1, impl.ref_count)
00305         sub7.unregister()
00306         self.assertEquals([], impl.callbacks)
00307         self.assertEquals(0, impl.ref_count)
00308         self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00309 
00310         # one final condition: two identical subscribers
00311         sub8 = Subscriber(name, data_class, callback1, 'hello')
00312         sub9 = Subscriber(name, data_class, callback1, 'hello')
00313         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00314         self.assertEquals([(callback1, 'hello'), (callback1, 'hello')], impl.callbacks)
00315         self.assertEquals(2, impl.ref_count)
00316         sub8.unregister()
00317         self.assertEquals([(callback1, 'hello')], impl.callbacks)
00318         self.assertEquals(1, impl.ref_count)
00319         sub9.unregister()
00320         self.assertEquals([], impl.callbacks)
00321         self.assertEquals(0, impl.ref_count)
00322 
00323     def test_Subscriber(self):
00324         #TODO: test callback args
00325         #TODO: negative buff_size
00326         #TODO: negative queue_size        
00327         import rospy
00328         from rospy.impl.registration import get_topic_manager, Registration
00329         from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00330 
00331         #Subscriber: name, data_class, callback=None, callback_args=None,
00332         #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
00333 
00334         name = 'foo'
00335         rname = rospy.resolve_name('foo')
00336         data_class = test_rospy.msg.Val
00337 
00338         # test invalid params
00339         for n in [None, '', 1]:
00340             try:
00341                 Subscriber(n, data_class)
00342                 self.fail("should not allow invalid name")
00343             except ValueError: pass
00344         for d in [None, 1, TestRospyTopics]:
00345             try:
00346                 Subscriber(name, d)
00347                 self.fail("should now allow invalid data_class")
00348             except ValueError: pass
00349         try:
00350             Subscriber(name, None)
00351             self.fail("None should not be allowed for data_class")
00352         except ValueError: pass
00353         
00354         sub = Subscriber(name, data_class)
00355         self.assertEquals(rname, sub.resolved_name)
00356         self.assertEquals(data_class, sub.data_class)
00357         self.assertEquals('test_rospy/Val', sub.type)
00358         self.assertEquals(data_class._md5sum, sub.md5sum)
00359         self.assertEquals(Registration.SUB, sub.reg_type)
00360         
00361         # verify impl as well
00362         impl = get_topic_manager().get_impl(Registration.SUB, rname)
00363         self.assert_(impl == sub.impl)
00364         self.assertEquals([], impl.callbacks)
00365         self.assertEquals(rname, impl.resolved_name)
00366         self.assertEquals(data_class, impl.data_class)                
00367         self.assertEquals(None, impl.queue_size)
00368         self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size)
00369         self.failIf(impl.tcp_nodelay)
00370         self.assertEquals(1, impl.ref_count)
00371         self.failIf(impl.closed)
00372         
00373         # round 2, now start setting options and make sure underlying impl is reconfigured
00374         name = 'foo'
00375         data_class = test_rospy.msg.Val
00376         queue_size = 1
00377         buff_size = 1
00378         sub = Subscriber(name, data_class, callback=callback1,
00379                          queue_size=queue_size, buff_size=buff_size, tcp_nodelay=True)
00380         self.assertEquals(rname, sub.resolved_name)
00381         # - sub.name is a backwards-compat field as it is public API
00382         self.assertEquals(rname, sub.name)        
00383         self.assertEquals(data_class, sub.data_class)
00384 
00385         # verify impl 
00386         impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00387         self.assert_(impl == impl2) # should be same instance
00388         self.assertEquals([(callback1, None)], impl.callbacks)
00389         self.assertEquals(rname, impl.resolved_name)
00390         self.assertEquals(data_class, impl.data_class)                
00391         self.assertEquals(queue_size, impl.queue_size)
00392         self.assertEquals(buff_size, impl.buff_size)
00393         self.assert_(impl.tcp_nodelay)
00394         self.assertEquals(2, impl.ref_count)
00395         self.failIf(impl.closed)
00396 
00397         # round 3, make sure that options continue to reconfigure
00398         # underlying impl also test that tcp_nodelay is sticky. this
00399         # is technically undefined, but this is how rospy chose to
00400         # implement.
00401         name = 'foo'
00402         data_class = test_rospy.msg.Val
00403         queue_size = 2
00404         buff_size = 2
00405         sub = Subscriber(name, data_class, callback=callback2, 
00406                          queue_size=queue_size, buff_size=buff_size, tcp_nodelay=False)
00407 
00408         # verify impl 
00409         impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00410         self.assert_(impl == impl2) # should be same instance
00411         self.assertEquals(set([(callback1, None), (callback2, None)]), set(impl.callbacks))
00412         self.assertEquals(queue_size, impl.queue_size)
00413         self.assertEquals(buff_size, impl.buff_size)
00414         self.assert_(impl.tcp_nodelay)
00415         self.assertEquals(3, impl.ref_count)
00416         self.failIf(impl.closed)
00417 
00418     def test_Poller(self):
00419         # no real test as this goes down to kqueue/select, just make sure that it behaves
00420         from rospy.topics import Poller
00421         p = Poller()
00422         p.add_fd(1)
00423         for x in p.error_iter():
00424             pass
00425         p.remove_fd(1)
00426         for x in p.error_iter():
00427             pass
00428         


test_rospy
Author(s): Ken Conley
autogenerated on Mon Oct 6 2014 11:47:19