$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 
#
# Revision $Id: test_rospy_tcpros.py 4140 2009-04-10 23:14:00Z sfkwc $

import roslib; roslib.load_manifest('test_rospy')

import os
import sys
import struct
import unittest
import time
import cStringIO


class TestRospyRegistration(unittest.TestCase):
    """Unit tests for the public pieces of rospy.impl.registration."""

    def test_get_set_topic_manager(self):
        """set_topic_manager() must be reflected by get_topic_manager()."""
        from rospy.impl.registration import get_topic_manager, set_topic_manager
        # rospy initialization sets this, but it is out of scope of
        # rospy.impl.registrations to test its value
        orig = get_topic_manager()
        try:
            self.assert_(orig is not None)

            class TopicManager(object):
                pass

            x = TopicManager()
            # currently untyped
            set_topic_manager(x)
            self.assertEquals(x, get_topic_manager())
            set_topic_manager(None)
            self.assert_(get_topic_manager() is None)
        finally:
            # always put the original manager back for the other tests
            set_topic_manager(orig)

    def test_get_set_service_manager(self):
        """set_service_manager() must be reflected by get_service_manager()."""
        from rospy.impl.registration import get_service_manager, set_service_manager
        # rospy initialization sets this, but it is out of scope of
        # rospy.impl.registrations to test its value.
        # Capture the original *before* entering the try block: if the
        # getter itself raised inside the try, the finally clause would
        # fail with a NameError on 'orig' and hide the real error.
        orig = get_service_manager()
        try:
            self.assert_(orig is not None)

            class ServiceManager(object):
                pass

            x = ServiceManager()
            # currently untyped
            set_service_manager(x)
            self.assertEquals(x, get_service_manager())
            set_service_manager(None)
            self.assert_(get_service_manager() is None)
        finally:
            # always put the original manager back for the other tests
            set_service_manager(orig)

    def test_Registration(self):
        """Registration constants are readable from the class and an instance."""
        # nothing to test here really
        from rospy.impl.registration import Registration
        self.assertEquals('pub', Registration.PUB)
        self.assertEquals('sub', Registration.SUB)
        self.assertEquals('srv', Registration.SRV)
        r = Registration()
        self.assertEquals('pub', r.PUB)
        self.assertEquals('sub', r.SUB)
        self.assertEquals('srv', r.SRV)

    def test_RegistrationListener(self):
        """The base RegistrationListener callbacks can be called without error."""
        from rospy.impl.registration import RegistrationListener
        # RegistrationListener is just an API, nothing to test here
        l = RegistrationListener()
        l.reg_added('name', 'data_type', 'reg_type')
        l.reg_removed('name', 'data_type', 'reg_type')

    def test_RegistrationListeners(self):
        """Notifications reach every listener, even when one of them raises."""
        from rospy.impl.registration import RegistrationListeners, RegistrationListener

        class Mock(RegistrationListener):
            """Listener that records the most recent callback it received."""
            def __init__(self):
                self.args = []

            def reg_added(self, name, data_type_or_uri, reg_type):
                self.args = ['added', name, data_type_or_uri, reg_type]

            def reg_removed(self, name, data_type_or_uri, reg_type):
                self.args = ['removed', name, data_type_or_uri, reg_type]

        class BadMock(RegistrationListener):
            """Listener whose callbacks always raise."""
            def reg_added(self, name, data_type_or_uri, reg_type):
                raise Exception("haha!")

            def reg_removed(self, name, data_type_or_uri, reg_type):
                raise Exception("haha!")

        r = RegistrationListeners()
        self.assertEquals([], r.listeners)

        # smoke-check that the lock exists and can be taken and released.
        # acquire() stays outside the try so that a failed acquire does not
        # lead to releasing a lock that was never held.
        r.lock.acquire()
        try:
            pass
        finally:
            r.lock.release()

        # notifying with no listeners registered must not fail
        r.notify_added('n1', 'dtype1', 'rtype1')
        r.notify_removed('n1', 'dtype1', 'rtype1')
        l1 = Mock()
        l2 = Mock()

        r.add_listener(l1)
        self.assertEquals([l1], r.listeners)

        self.assertEquals([], l1.args)
        r.notify_added('n2', 'dtype2', 'rtype2')
        self.assertEquals(['added', 'n2', 'dtype2', 'rtype2'], l1.args)
        r.notify_removed('n2', 'dtype2', 'rtype2')
        self.assertEquals(['removed', 'n2', 'dtype2', 'rtype2'], l1.args)

        r.add_listener(l2)
        self.assert_(l2 in r.listeners)
        self.assert_(l1 in r.listeners)
        self.assertEquals(2, len(r.listeners))

        self.assertEquals([], l2.args)
        r.notify_added('n3', 'dtype3', 'rtype3')
        self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l2.args)
        self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l1.args)
        r.notify_removed('n3', 'dtype3', 'rtype3')
        self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l2.args)
        self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l1.args)

        # l3 raises exceptions, make sure they don't break anything
        l3 = BadMock()
        r.add_listener(l3)
        self.assert_(l3 in r.listeners)
        self.assert_(l2 in r.listeners)
        self.assert_(l1 in r.listeners)
        self.assertEquals(3, len(r.listeners))

        r.notify_added('n4', 'dtype4', 'rtype4')
        self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l2.args)
        self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l1.args)
        r.notify_removed('n4', 'dtype4', 'rtype4')
        self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l2.args)
        self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l1.args)

    def test_get_registration_listeners(self):
        """get_registration_listeners() returns a RegistrationListeners instance."""
        from rospy.impl.registration import RegistrationListeners, get_registration_listeners
        r = get_registration_listeners()
        self.assert_(isinstance(r, RegistrationListeners))

    def test_RegManager(self):
        """Exercise RegManager construction, publisher_update() and early-exit paths."""
        from rospy.impl.registration import RegManager

        class MockHandler(object):
            """Handler that records _connect_topic calls and then flags done."""
            def __init__(self):
                self.done = False
                self.args = []

            def _connect_topic(self, topic, uri):
                self.args.append(['_connect_topic', topic, uri])
                # trip to done on connect topic so we can exit loop
                self.done = True
                return 1, 'msg', 1

        # bad return value for _connect_topic
        class BadHandler(object):
            def __init__(self):
                self.done = False

            def _connect_topic(self, topic, uri):
                self.done = True
                return None

        # bad code for _connect_topic
        class BadHandler2(object):
            def __init__(self):
                self.done = False

            def _connect_topic(self, topic, uri):
                self.done = True
                return -1, "failed", 1

        handler = MockHandler()
        m = RegManager(handler)
        self.assertEquals(handler, m.handler)
        self.assert_(m.logger is not None)
        self.assertEquals(m.master_uri, None)
        self.assertEquals(m.uri, None)
        self.assertEquals([], m.updates)

        # smoke-check that the condition exists and can be taken and released.
        # acquire() stays outside the try so that a failed acquire does not
        # lead to releasing a condition that was never held.
        m.cond.acquire()
        try:
            pass
        finally:
            m.cond.release()

        # call twice with topic 2 to test filtering logic

        m.publisher_update('topic1', ['http://uri:1', 'http://uri:1b'])
        m.publisher_update('topic2', ['http://old:2', 'http://old:2b'])
        m.publisher_update('topic1b', ['http://foo:1', 'http://foo:1b'])
        m.publisher_update('topic2', ['http://uri:2', 'http://uri:2b'])
        self.assertEquals([('topic1', ['http://uri:1', 'http://uri:1b']),
                           ('topic2', ['http://old:2', 'http://old:2b']),
                           ('topic1b', ['http://foo:1', 'http://foo:1b']),
                           ('topic2', ['http://uri:2', 'http://uri:2b'])], m.updates)

        # disabling these tests as they are just too dangerous now that registrations multithreads updates
        if 0:
            # this should only go through once as MockHandler trips to done
            m.run()

            # have to give time for threads to spin up and call handler
            timeout_t = 10. + time.time()
            while time.time() < timeout_t and len(m.updates) > 2:
                time.sleep(0.1)

            m.handler = BadHandler()
            # this should only go through once as BadHandler trips to done. this tests the
            # exception branch
            m.run()

            # this will cause an error to be logged in _connect_topic_thread
            # - reset data in m.updates
            m.updates = [('topic1', ['http://uri:1', 'http://uri:1b']),
                         ('topic1b', ['http://foo:1', 'http://foo:1b'])]
            m.handler = BadHandler2()
            m.run()

        # m.start should return immediately as m.master_uri is not set
        self.assertEquals(m.master_uri, None)
        m.start(None, None)
        # test that it returns if URIs are equal
        m.start('http://localhost:1234', 'http://localhost:1234')

        # - test with is_shutdown overriden so we don't enter loop
        def foo():
            return True

        # The override is written into the registration module's globals, so
        # it would otherwise stay in place for every test that runs after
        # this one in the same process. Remember the previous binding and
        # restore it when done.
        reg_globals = sys.modules['rospy.impl.registration'].__dict__
        missing = object()
        orig_is_shutdown = reg_globals.get('is_shutdown', missing)
        reg_globals['is_shutdown'] = foo
        try:
            m.start('http://localhost:1234', 'http://localhost:4567')

            handler.done = True
            # handler done is true, so this should not run
            m.run()
            # no master uri, so these should just return
            m.master_uri = None
            m.reg_added('n1', 'type1', 'rtype1')
            m.reg_removed('n1', 'type1', 'rtype1')
            m.cleanup('reason')
        finally:
            if orig_is_shutdown is missing:
                del reg_globals['is_shutdown']
            else:
                reg_globals['is_shutdown'] = orig_is_shutdown


if __name__ == '__main__':
    import rostest
    rostest.unitrun('test_rospy', sys.argv[0], TestRospyRegistration, coverage_packages=['rospy.impl.registration'])