$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rospy_api.py 4898 2009-06-17 01:31:18Z sfkwc $ 00035 00036 import roslib; roslib.load_manifest('test_rospy') 00037 00038 import os 00039 import sys 00040 import struct 00041 import unittest 00042 import time 00043 00044 import test_rospy.msg 00045 00046 def callback1(data): pass 00047 def callback2(data): pass 00048 00049 # for use with publish() tests 00050 import rospy.impl.transport 00051 class ConnectionOverride(rospy.impl.transport.Transport): 00052 def __init__(self, endpoint_id): 00053 super(ConnectionOverride, self).__init__(rospy.impl.transport.OUTBOUND, endpoint_id) 00054 self.endpoint_id = endpoint_id 00055 self.data = '' 00056 00057 def set_cleanup_callback(self, cb): pass 00058 def write_data(self, data): 00059 self.data = self.data + data 00060 00061 # test rospy API verifies that the rospy module exports the required symbols 00062 class TestRospyTopics(unittest.TestCase): 00063 00064 def test_Publisher(self): 00065 import rospy 00066 from rospy.impl.registration import get_topic_manager, Registration 00067 from rospy.topics import Publisher, DEFAULT_BUFF_SIZE 00068 # Publisher(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None) 00069 00070 name = 'foo' 00071 rname = rospy.resolve_name('foo') 00072 data_class = test_rospy.msg.Val 00073 00074 # test invalid params 00075 for n in [None, '', 1]: 00076 try: 00077 Publisher(n, data_class) 00078 self.fail("should not allow invalid name") 00079 except ValueError: pass 00080 for d in [None, 1, TestRospyTopics]: 00081 try: 00082 Publisher(name, d) 00083 self.fail("should now allow invalid data_class") 00084 except ValueError: pass 00085 try: 00086 Publisher(name, None) 00087 self.fail("None should not be allowed for data_class") 00088 except ValueError: pass 00089 00090 # round 1: test basic params 00091 pub = Publisher(name, data_class) 00092 self.assertEquals(rname, pub.resolved_name) 00093 # - pub.name is left in for backwards compatiblity, but resolved_name is preferred 00094 self.assertEquals(rname, pub.name) 00095 self.assertEquals(data_class, pub.data_class) 00096 self.assertEquals('test_rospy/Val', pub.type) 00097 self.assertEquals(data_class._md5sum, pub.md5sum) 00098 self.assertEquals(Registration.PUB, pub.reg_type) 00099 00100 # verify impl as well 00101 impl = get_topic_manager().get_impl(Registration.PUB, rname) 00102 self.assert_(impl == pub.impl) 00103 self.assertEquals(rname, impl.resolved_name) 00104 self.assertEquals(data_class, impl.data_class) 00105 self.failIf(impl.is_latch) 00106 self.assertEquals(None, impl.latch) 00107 self.assertEquals(0, impl.seq) 00108 self.assertEquals(1, impl.ref_count) 00109 self.assertEquals('', impl.buff.getvalue()) 00110 self.failIf(impl.closed) 00111 self.failIf(impl.has_connections()) 00112 # check publish() fall-through 00113 from test_rospy.msg import Val 00114 impl.publish(Val('hello world-1')) 00115 00116 # check stats 00117 self.assertEquals(0, impl.message_data_sent) 00118 # check acquire/release don't bomb 00119 impl.acquire() 00120 impl.release() 00121 00122 # do a single publish with connection override. The connection 00123 # override is a major cheat as the object isn't even an actual 00124 # connection. I will need to add more integrated tests later 00125 co1 = ConnectionOverride('co1') 00126 self.failIf(impl.has_connection('co1')) 00127 impl.add_connection(co1) 00128 self.assert_(impl.has_connection('co1')) 00129 self.assert_(impl.has_connections()) 00130 impl.publish(Val('hello world-1'), connection_override=co1) 00131 00132 import cStringIO 00133 buff = cStringIO.StringIO() 00134 Val('hello world-1').serialize(buff) 00135 # - check equals, but strip length field first 00136 self.assertEquals(co1.data[4:], buff.getvalue()) 00137 self.assertEquals(None, impl.latch) 00138 00139 # Now enable latch 00140 pub = Publisher(name, data_class, latch=True) 00141 impl = get_topic_manager().get_impl(Registration.PUB, rname) 00142 # have to verify latching in pub impl 00143 self.assert_(impl == pub.impl) 00144 self.assertEquals(True, impl.is_latch) 00145 self.assertEquals(None, impl.latch) 00146 self.assertEquals(2, impl.ref_count) 00147 00148 co2 = ConnectionOverride('co2') 00149 self.failIf(impl.has_connection('co2')) 00150 impl.add_connection(co2) 00151 for n in ['co1', 'co2']: 00152 self.assert_(impl.has_connection(n)) 00153 self.assert_(impl.has_connections()) 00154 v = Val('hello world-2') 00155 impl.publish(v, connection_override=co2) 00156 self.assert_(v == impl.latch) 00157 00158 buff = cStringIO.StringIO() 00159 Val('hello world-2').serialize(buff) 00160 # - strip length and check value 00161 self.assertEquals(co2.data[4:], buff.getvalue()) 00162 00163 # test that latched value is sent to connections on connect 00164 co3 = ConnectionOverride('co3') 00165 self.failIf(impl.has_connection('co3')) 00166 impl.add_connection(co3) 00167 for n in ['co1', 'co2', 'co3']: 00168 self.assert_(impl.has_connection(n)) 00169 self.assert_(impl.has_connections()) 00170 self.assertEquals(co3.data[4:], buff.getvalue()) 00171 00172 # TODO: tcp_nodelay 00173 # TODO: suscribe listener 00174 self.assert_(impl.has_connection('co1')) 00175 impl.remove_connection(co1) 00176 self.failIf(impl.has_connection('co1')) 00177 self.assert_(impl.has_connections()) 00178 # TODO: need to validate DeadTransport better 00179 self.assert_([d for d in impl.dead_connections if d.endpoint_id == 'co1']) 00180 00181 self.assert_(impl.has_connection('co3')) 00182 impl.remove_connection(co3) 00183 self.failIf(impl.has_connection('co3')) 00184 self.assert_(impl.has_connections()) 00185 for id in ['co1', 'co3']: 00186 self.assert_([d for d in impl.dead_connections if d.endpoint_id == id]) 00187 00188 self.assert_(impl.has_connection('co2')) 00189 impl.remove_connection(co2) 00190 self.failIf(impl.has_connection('co2')) 00191 self.failIf(impl.has_connections()) 00192 for id in ['co1', 'co2', 'co3']: 00193 self.assert_([d for d in impl.dead_connections if d.endpoint_id == id]) 00194 00195 00196 # test publish() latch on a new Publisher object (this was encountered in testing, so I want a test case for it) 00197 pub = Publisher('bar', data_class, latch=True) 00198 v = Val('no connection test') 00199 pub.impl.publish(v) 00200 self.assert_(v == pub.impl.latch) 00201 00202 # test connection header 00203 h = {'foo': 'bar', 'fuga': 'hoge'} 00204 pub = Publisher('header_test', data_class, headers=h) 00205 self.assertEquals(h, pub.impl.headers) 00206 00207 def test_Subscriber_unregister(self): 00208 # regression test for #3029 (unregistering a Subcriber with no 00209 # callback) plus other unregistration tests 00210 import rospy 00211 from rospy.impl.registration import get_topic_manager, Registration 00212 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE 00213 00214 #Subscriber: name, data_class, callback=None, callback_args=None, 00215 #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False): 00216 00217 name = 'unregistertest' 00218 rname = rospy.resolve_name(name) 00219 data_class = test_rospy.msg.Val 00220 00221 sub = Subscriber(name, data_class) 00222 self.assertEquals(None, sub.callback) 00223 00224 # verify impl (test_Subscriber handles more verification, we 00225 # just care about callbacks and ref_count state here) 00226 impl = get_topic_manager().get_impl(Registration.SUB, rname) 00227 self.assert_(impl == sub.impl) 00228 self.assertEquals(1, impl.ref_count) 00229 self.assertEquals([], impl.callbacks) 00230 00231 # unregister should release the underlying impl 00232 sub.unregister() 00233 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname)) 00234 00235 # create two subs 00236 sub2 = Subscriber(name, data_class) 00237 sub3 = Subscriber(name, data_class) 00238 00239 impl = get_topic_manager().get_impl(Registration.SUB, rname) 00240 # - test that they share the same impl 00241 self.assert_(impl == sub2.impl) 00242 self.assert_(impl == sub3.impl) 00243 # - test basic impl state 00244 self.assertEquals([], impl.callbacks) 00245 self.assertEquals(2, impl.ref_count) 00246 sub2.unregister() 00247 self.assertEquals(1, impl.ref_count) 00248 # - make sure double unregister is safe 00249 sub2.unregister() 00250 self.assertEquals(1, impl.ref_count) 00251 # - clean it up 00252 sub3.unregister() 00253 self.assertEquals(0, impl.ref_count) 00254 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname)) 00255 00256 # CALLBACKS 00257 cb_args5 = 5 00258 cb_args6 = 6 00259 cb_args7 = 7 00260 sub4 = Subscriber(name, data_class, callback1) 00261 # - use should be allowed to subcribe using the same callback 00262 # and it shouldn't interfere on unregister 00263 sub5 = Subscriber(name, data_class, callback2, cb_args5) 00264 sub6 = Subscriber(name, data_class, callback2, cb_args6) 00265 sub7 = Subscriber(name, data_class, callback2, cb_args7) 00266 impl = get_topic_manager().get_impl(Registration.SUB, rname) 00267 self.assertEquals(4, impl.ref_count) 00268 00269 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks) 00270 # unregister sub6 first to as it is most likely to confuse any callback-finding logic 00271 sub6.unregister() 00272 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks) 00273 self.assertEquals(3, impl.ref_count) 00274 sub5.unregister() 00275 self.assertEquals([(callback1, None), (callback2, cb_args7)], impl.callbacks) 00276 self.assertEquals(2, impl.ref_count) 00277 sub4.unregister() 00278 self.assertEquals([(callback2, cb_args7)], impl.callbacks) 00279 self.assertEquals(1, impl.ref_count) 00280 sub7.unregister() 00281 self.assertEquals([], impl.callbacks) 00282 self.assertEquals(0, impl.ref_count) 00283 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname)) 00284 00285 # one final condition: two identical subscribers 00286 sub8 = Subscriber(name, data_class, callback1, 'hello') 00287 sub9 = Subscriber(name, data_class, callback1, 'hello') 00288 impl = get_topic_manager().get_impl(Registration.SUB, rname) 00289 self.assertEquals([(callback1, 'hello'), (callback1, 'hello')], impl.callbacks) 00290 self.assertEquals(2, impl.ref_count) 00291 sub8.unregister() 00292 self.assertEquals([(callback1, 'hello')], impl.callbacks) 00293 self.assertEquals(1, impl.ref_count) 00294 sub9.unregister() 00295 self.assertEquals([], impl.callbacks) 00296 self.assertEquals(0, impl.ref_count) 00297 00298 def test_Subscriber(self): 00299 #TODO: test callback args 00300 #TODO: negative buff_size 00301 #TODO: negative queue_size 00302 import rospy 00303 from rospy.impl.registration import get_topic_manager, Registration 00304 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE 00305 00306 #Subscriber: name, data_class, callback=None, callback_args=None, 00307 #queue_size=None, buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False): 00308 00309 name = 'foo' 00310 rname = rospy.resolve_name('foo') 00311 data_class = test_rospy.msg.Val 00312 00313 # test invalid params 00314 for n in [None, '', 1]: 00315 try: 00316 Subscriber(n, data_class) 00317 self.fail("should not allow invalid name") 00318 except ValueError: pass 00319 for d in [None, 1, TestRospyTopics]: 00320 try: 00321 Subscriber(name, d) 00322 self.fail("should now allow invalid data_class") 00323 except ValueError: pass 00324 try: 00325 Subscriber(name, None) 00326 self.fail("None should not be allowed for data_class") 00327 except ValueError: pass 00328 00329 sub = Subscriber(name, data_class) 00330 self.assertEquals(rname, sub.resolved_name) 00331 self.assertEquals(data_class, sub.data_class) 00332 self.assertEquals('test_rospy/Val', sub.type) 00333 self.assertEquals(data_class._md5sum, sub.md5sum) 00334 self.assertEquals(Registration.SUB, sub.reg_type) 00335 00336 # verify impl as well 00337 impl = get_topic_manager().get_impl(Registration.SUB, rname) 00338 self.assert_(impl == sub.impl) 00339 self.assertEquals([], impl.callbacks) 00340 self.assertEquals(rname, impl.resolved_name) 00341 self.assertEquals(data_class, impl.data_class) 00342 self.assertEquals(None, impl.queue_size) 00343 self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size) 00344 self.failIf(impl.tcp_nodelay) 00345 self.assertEquals(1, impl.ref_count) 00346 self.failIf(impl.closed) 00347 00348 # round 2, now start setting options and make sure underlying impl is reconfigured 00349 name = 'foo' 00350 data_class = test_rospy.msg.Val 00351 queue_size = 1 00352 buff_size = 1 00353 sub = Subscriber(name, data_class, callback=callback1, 00354 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=True) 00355 self.assertEquals(rname, sub.resolved_name) 00356 # - sub.name is a backwards-compat field as it is public API 00357 self.assertEquals(rname, sub.name) 00358 self.assertEquals(data_class, sub.data_class) 00359 00360 # verify impl 00361 impl2 = get_topic_manager().get_impl(Registration.SUB, rname) 00362 self.assert_(impl == impl2) # should be same instance 00363 self.assertEquals([(callback1, None)], impl.callbacks) 00364 self.assertEquals(rname, impl.resolved_name) 00365 self.assertEquals(data_class, impl.data_class) 00366 self.assertEquals(queue_size, impl.queue_size) 00367 self.assertEquals(buff_size, impl.buff_size) 00368 self.assert_(impl.tcp_nodelay) 00369 self.assertEquals(2, impl.ref_count) 00370 self.failIf(impl.closed) 00371 00372 # round 3, make sure that options continue to reconfigure 00373 # underlying impl also test that tcp_nodelay is sticky. this 00374 # is technically undefined, but this is how rospy chose to 00375 # implement. 00376 name = 'foo' 00377 data_class = test_rospy.msg.Val 00378 queue_size = 2 00379 buff_size = 2 00380 sub = Subscriber(name, data_class, callback=callback2, 00381 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=False) 00382 00383 # verify impl 00384 impl2 = get_topic_manager().get_impl(Registration.SUB, rname) 00385 self.assert_(impl == impl2) # should be same instance 00386 self.assertEquals(set([(callback1, None), (callback2, None)]), set(impl.callbacks)) 00387 self.assertEquals(queue_size, impl.queue_size) 00388 self.assertEquals(buff_size, impl.buff_size) 00389 self.assert_(impl.tcp_nodelay) 00390 self.assertEquals(3, impl.ref_count) 00391 self.failIf(impl.closed) 00392 00393 if __name__ == '__main__': 00394 import rostest 00395 rostest.unitrun('test_rospy', 'test_rospy_topic', TestRospyTopics, coverage_packages=['rospy.topics'])