test_rospy_registration.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import cStringIO
00040         
class TestRospyRegistration(unittest.TestCase):
    """Unit tests for ``rospy.impl.registration``.

    ``rospy`` names are imported inside each test method rather than at
    module level.  Tests that replace module-level state (the topic and
    service managers, ``is_shutdown``) restore it before returning.
    """

    def test_get_set_topic_manager(self):
        """Round-trip values through set_topic_manager()/get_topic_manager()."""
        from rospy.impl.registration import get_topic_manager, set_topic_manager
        # rospy initialization sets this, but it is out of scope of
        # rospy.impl.registration to test its value
        orig = get_topic_manager()
        try:
            self.assertTrue(orig is not None)

            class TopicManager(object):
                pass

            x = TopicManager()
            # currently untyped
            set_topic_manager(x)
            self.assertEqual(x, get_topic_manager())
            set_topic_manager(None)
            self.assertTrue(get_topic_manager() is None)
        finally:
            # put the original manager back regardless of the outcome
            set_topic_manager(orig)

    def test_get_set_service_manager(self):
        """Round-trip values through set_service_manager()/get_service_manager()."""
        from rospy.impl.registration import get_service_manager, set_service_manager
        # rospy initialization sets this, but it is out of scope of
        # rospy.impl.registration to test its value.
        # Fetched before the try block so that the finally clause can never
        # reference an unbound name and mask the real failure.
        orig = get_service_manager()
        try:
            self.assertTrue(orig is not None)

            class ServiceManager(object):
                pass

            x = ServiceManager()
            # currently untyped
            set_service_manager(x)
            self.assertEqual(x, get_service_manager())
            set_service_manager(None)
            self.assertTrue(get_service_manager() is None)
        finally:
            # put the original manager back regardless of the outcome
            set_service_manager(orig)

    def test_Registration(self):
        """Check the Registration type constants on the class and an instance."""
        # nothing to test here really
        from rospy.impl.registration import Registration
        self.assertEqual('pub', Registration.PUB)
        self.assertEqual('sub', Registration.SUB)
        self.assertEqual('srv', Registration.SRV)
        r = Registration()
        self.assertEqual('pub', r.PUB)
        self.assertEqual('sub', r.SUB)
        self.assertEqual('srv', r.SRV)

    def test_RegistrationListener(self):
        """Call both RegistrationListener callbacks on the base class."""
        from rospy.impl.registration import RegistrationListener
        # RegistrationListener is just an API, nothing to test here
        listener = RegistrationListener()
        listener.reg_added('name', 'data_type', 'reg_type')
        listener.reg_removed('name', 'data_type', 'reg_type')

    def test_RegistrationListeners(self):
        """Verify listener bookkeeping and notification fan-out.

        Covers notifying with no listeners, with one and two recording
        listeners, and with an additional listener whose callbacks raise.
        """
        from rospy.impl.registration import RegistrationListeners, RegistrationListener

        class Mock(RegistrationListener):
            """Listener that records the most recent notification."""

            def __init__(self):
                self.args = []

            def reg_added(self, name, data_type_or_uri, reg_type):
                self.args = ['added', name, data_type_or_uri, reg_type]

            def reg_removed(self, name, data_type_or_uri, reg_type):
                self.args = ['removed', name, data_type_or_uri, reg_type]

        class BadMock(RegistrationListener):
            """Listener whose callbacks always raise."""

            def reg_added(self, name, data_type_or_uri, reg_type):
                raise Exception("haha!")

            def reg_removed(self, name, data_type_or_uri, reg_type):
                raise Exception("haha!")

        r = RegistrationListeners()
        self.assertEqual([], r.listeners)

        # Smoke-test the lock attribute.  acquire() is deliberately not
        # wrapped in try/finally: if it fails there is nothing to release.
        r.lock.acquire()
        r.lock.release()

        # notifications with no listeners registered must be harmless
        r.notify_added('n1', 'dtype1', 'rtype1')
        r.notify_removed('n1', 'dtype1', 'rtype1')
        l1 = Mock()
        l2 = Mock()

        r.add_listener(l1)
        self.assertEqual([l1], r.listeners)

        self.assertEqual([], l1.args)
        r.notify_added('n2', 'dtype2', 'rtype2')
        self.assertEqual(['added', 'n2', 'dtype2', 'rtype2'], l1.args)
        r.notify_removed('n2', 'dtype2', 'rtype2')
        self.assertEqual(['removed', 'n2', 'dtype2', 'rtype2'], l1.args)

        r.add_listener(l2)
        self.assertTrue(l2 in r.listeners)
        self.assertTrue(l1 in r.listeners)
        self.assertEqual(2, len(r.listeners))

        self.assertEqual([], l2.args)
        r.notify_added('n3', 'dtype3', 'rtype3')
        self.assertEqual(['added', 'n3', 'dtype3', 'rtype3'], l2.args)
        self.assertEqual(['added', 'n3', 'dtype3', 'rtype3'], l1.args)
        r.notify_removed('n3', 'dtype3', 'rtype3')
        self.assertEqual(['removed', 'n3', 'dtype3', 'rtype3'], l2.args)
        self.assertEqual(['removed', 'n3', 'dtype3', 'rtype3'], l1.args)

        # l3 raises exceptions, make sure they don't break anything
        l3 = BadMock()
        r.add_listener(l3)
        self.assertTrue(l3 in r.listeners)
        self.assertTrue(l2 in r.listeners)
        self.assertTrue(l1 in r.listeners)
        self.assertEqual(3, len(r.listeners))

        r.notify_added('n4', 'dtype4', 'rtype4')
        self.assertEqual(['added', 'n4', 'dtype4', 'rtype4'], l2.args)
        self.assertEqual(['added', 'n4', 'dtype4', 'rtype4'], l1.args)
        r.notify_removed('n4', 'dtype4', 'rtype4')
        self.assertEqual(['removed', 'n4', 'dtype4', 'rtype4'], l2.args)
        self.assertEqual(['removed', 'n4', 'dtype4', 'rtype4'], l1.args)

    def test_get_registration_listeners(self):
        """get_registration_listeners() returns a RegistrationListeners."""
        from rospy.impl.registration import RegistrationListeners, get_registration_listeners
        r = get_registration_listeners()
        self.assertTrue(isinstance(r, RegistrationListeners))

    def test_RegManager(self):
        """Exercise RegManager construction, update queueing and early exits.

        The threaded ``run()`` scenarios are kept but disabled (see the
        ``if 0`` block).  ``is_shutdown`` in ``rospy.impl.registration`` is
        replaced for the final part of the test and restored afterwards.
        """
        from rospy.impl.registration import RegManager

        class MockHandler(object):
            def __init__(self):
                self.done = False
                self.args = []

            def _connect_topic(self, topic, uri):
                self.args.append(['_connect_topic', topic, uri])
                # trip to done on connect topic so we can exit loop
                self.done = True
                return 1, 'msg', 1

        # bad return value for _connect_topic
        class BadHandler(object):
            def __init__(self):
                self.done = False

            def _connect_topic(self, topic, uri):
                self.done = True
                return None

        # bad code for _connect_topic
        class BadHandler2(object):
            def __init__(self):
                self.done = False

            def _connect_topic(self, topic, uri):
                self.done = True
                return -1, "failed", 1

        handler = MockHandler()
        m = RegManager(handler)
        self.assertEqual(handler, m.handler)
        self.assertTrue(m.logger is not None)
        self.assertEqual(m.master_uri, None)
        self.assertEqual(m.uri, None)
        self.assertEqual([], m.updates)

        # Smoke-test the condition attribute.  acquire() is deliberately not
        # wrapped in try/finally: if it fails there is nothing to release.
        m.cond.acquire()
        m.cond.release()

        # call twice with topic 2 to test filtering logic
        m.publisher_update('topic1', ['http://uri:1', 'http://uri:1b'])
        m.publisher_update('topic2', ['http://old:2', 'http://old:2b'])
        m.publisher_update('topic1b', ['http://foo:1', 'http://foo:1b'])
        m.publisher_update('topic2', ['http://uri:2', 'http://uri:2b'])
        self.assertEqual([('topic1', ['http://uri:1', 'http://uri:1b']),
                          ('topic2', ['http://old:2', 'http://old:2b']),
                          ('topic1b', ['http://foo:1', 'http://foo:1b']),
                          ('topic2', ['http://uri:2', 'http://uri:2b'])], m.updates)

        # disabling these tests as they are just too dangerous now that
        # registration multithreads updates
        if 0:
            # this should only go through once as MockHandler trips to done
            m.run()

            # have to give time for threads to spin up and call handler
            timeout_t = 10. + time.time()
            while time.time() < timeout_t and len(m.updates) > 2:
                time.sleep(0.1)

            m.handler = BadHandler()
            # this should only go through once as BadHandler trips to done.
            # this tests the exception branch
            m.run()

            # this will cause an error to be logged in _connect_topic_thread
            # - reset data in m.updates
            m.updates = [('topic1', ['http://uri:1', 'http://uri:1b']),
                         ('topic1b', ['http://foo:1', 'http://foo:1b'])]
            m.handler = BadHandler2()
            m.run()

        # m.start should return immediately as m.master_uri is not set
        self.assertEqual(m.master_uri, None)
        m.start(None, None)
        # test that it returns if URIs are equal
        m.start('http://localhost:1234', 'http://localhost:1234')

        # - test with is_shutdown overridden so we don't enter loop
        def foo():
            return True

        registration_dict = sys.modules['rospy.impl.registration'].__dict__
        # sentinel distinguishes "name was absent" from any real value
        missing = object()
        orig_is_shutdown = registration_dict.get('is_shutdown', missing)
        registration_dict['is_shutdown'] = foo
        try:
            m.start('http://localhost:1234', 'http://localhost:4567')

            handler.done = True
            # handler done is true, so this should not run
            m.run()
            # no master uri, so these should just return
            m.master_uri = None
            m.reg_added('n1', 'type1', 'rtype1')
            m.reg_removed('n1', 'type1', 'rtype1')
            m.cleanup('reason')
        finally:
            # undo the monkeypatch so it does not leak into other tests
            if orig_is_shutdown is missing:
                del registration_dict['is_shutdown']
            else:
                registration_dict['is_shutdown'] = orig_is_shutdown
00256 


test_rospy
Author(s): Ken Conley
autogenerated on Fri Aug 28 2015 12:33:56