test_rospy_registration.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039         
00040 class TestRospyRegistration(unittest.TestCase):
00041 
00042     def test_get_set_topic_manager(self):
00043         from rospy.impl.registration import get_topic_manager, set_topic_manager
00044         # rospy initialization sets this, but it is out of scope of
00045         # rospy.impl.registrations to test its value
00046         orig = get_topic_manager()
00047         try:
00048             self.assert_(orig is not None)
00049             class TopicManager(object): pass
00050             x = TopicManager()
00051         # currently untyped
00052             set_topic_manager(x)
00053             self.assertEquals(x, get_topic_manager())
00054             set_topic_manager(None)
00055             self.assert_(get_topic_manager() is None)
00056         finally:
00057             set_topic_manager(orig)            
00058 
00059     def test_get_set_service_manager(self):
00060         from rospy.impl.registration import get_service_manager, set_service_manager
00061         # rospy initialization sets this, but it is out of scope of
00062         # rospy.impl.registrations to test its value
00063         try:
00064             orig = get_service_manager()
00065             self.assert_(orig is not None)
00066             class ServiceManager(object): pass
00067             x = ServiceManager()
00068             # currently untyped
00069             set_service_manager(x)
00070             self.assertEquals(x, get_service_manager())
00071             set_service_manager(None)
00072             self.assert_(get_service_manager() is None)
00073         finally:
00074             set_service_manager(orig)            
00075         
00076     def test_Registration(self):
00077         # nothing to test here really
00078         from rospy.impl.registration import Registration
00079         self.assertEquals('pub', Registration.PUB)
00080         self.assertEquals('sub', Registration.SUB)
00081         self.assertEquals('srv', Registration.SRV)        
00082         r = Registration()
00083         self.assertEquals('pub', r.PUB)
00084         self.assertEquals('sub', r.SUB)
00085         self.assertEquals('srv', r.SRV)        
00086     def test_RegistrationListener(self):
00087         from rospy.impl.registration import RegistrationListener
00088         #RegistrationListener is just an API, nothing to test here
00089         l = RegistrationListener()
00090         l.reg_added('name', 'data_type', 'reg_type')
00091         l.reg_removed('name', 'data_type', 'reg_type')
00092         
00093     def test_RegistrationListeners(self):
00094         from rospy.impl.registration import RegistrationListeners, RegistrationListener
00095 
00096         class Mock(RegistrationListener):
00097             def __init__(self):
00098                 self.args = []
00099             def reg_added(self, name, data_type_or_uri, reg_type): 
00100                 self.args = ['added', name, data_type_or_uri, reg_type]
00101             def reg_removed(self, name, data_type_or_uri, reg_type):
00102                 self.args = ['removed', name, data_type_or_uri, reg_type]
00103         class BadMock(RegistrationListener):
00104             def reg_added(self, name, data_type_or_uri, reg_type):
00105                 raise Exception("haha!")
00106             def reg_removed(self, name, data_type_or_uri, reg_type):
00107                 raise Exception("haha!")                
00108 
00109         r = RegistrationListeners()
00110         self.assertEquals([], r.listeners)
00111 
00112         try:
00113             r.lock.acquire()
00114         finally:
00115             r.lock.release()
00116 
00117         r.notify_added('n1', 'dtype1', 'rtype1')
00118         r.notify_removed('n1', 'dtype1', 'rtype1')
00119         l1 = Mock()
00120         l2 = Mock()
00121         
00122         r.add_listener(l1)
00123         self.assertEquals([l1], r.listeners)
00124 
00125         self.assertEquals([], l1.args)
00126         r.notify_added('n2', 'dtype2', 'rtype2')
00127         self.assertEquals(['added', 'n2', 'dtype2', 'rtype2'], l1.args)
00128         r.notify_removed('n2', 'dtype2', 'rtype2')
00129         self.assertEquals(['removed', 'n2', 'dtype2', 'rtype2'], l1.args)
00130 
00131         r.add_listener(l2)
00132         self.assert_(l2 in r.listeners)
00133         self.assert_(l1 in r.listeners)
00134         self.assertEquals(2, len(r.listeners))
00135 
00136         self.assertEquals([], l2.args)
00137         r.notify_added('n3', 'dtype3', 'rtype3')
00138         self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l2.args)        
00139         self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l1.args)
00140         r.notify_removed('n3', 'dtype3', 'rtype3')
00141         self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l2.args)
00142         self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l1.args)
00143 
00144         # l3 raises exceptions, make sure they don't break anything
00145         l3 = BadMock()
00146         r.add_listener(l3)
00147         self.assert_(l3 in r.listeners)
00148         self.assert_(l2 in r.listeners)
00149         self.assert_(l1 in r.listeners)
00150         self.assertEquals(3, len(r.listeners))
00151 
00152         r.notify_added('n4', 'dtype4', 'rtype4')
00153         self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l2.args)        
00154         self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l1.args)
00155         r.notify_removed('n4', 'dtype4', 'rtype4')
00156         self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l2.args)
00157         self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l1.args)
00158         
00159     def test_get_registration_listeners(self):
00160         from rospy.impl.registration import RegistrationListeners, get_registration_listeners
00161         r = get_registration_listeners()
00162         self.assert_(isinstance(r, RegistrationListeners))
00163 
00164     def test_RegManager(self):
00165         from rospy.impl.registration import RegManager
00166         class MockHandler(object):
00167             def __init__(self):
00168                 self.done = False
00169                 self.args = []
00170             def _connect_topic(self, topic, uri):
00171                 self.args.append(['_connect_topic', topic, uri])
00172                 # trip to done on connect topic so we can exit loop
00173                 self.done = True
00174                 return 1, 'msg', 1
00175         # bad return value for _connect_topic
00176         class BadHandler(object):
00177             def __init__(self):
00178                 self.done = False
00179             def _connect_topic(self, topic, uri):
00180                 self.done = True
00181                 return None
00182         # bad code for _connect_topic
00183         class BadHandler2(object):
00184             def __init__(self):
00185                 self.done = False
00186             def _connect_topic(self, topic, uri):
00187                 self.done = True
00188                 return -1, "failed", 1
00189 
00190         handler = MockHandler()
00191         m = RegManager(handler)
00192         self.assertEquals(handler, m.handler)
00193         self.assert_(m.logger is not None)
00194         self.assertEquals(m.master_uri, None)
00195         self.assertEquals(m.uri, None)
00196         self.assertEquals([], m.updates)
00197         try:
00198             m.cond.acquire()
00199         finally:
00200             m.cond.release()
00201 
00202         # call twice with topic 2 to test filtering logic
00203 
00204         m.publisher_update('topic1', ['http://uri:1', 'http://uri:1b'])
00205         m.publisher_update('topic2', ['http://old:2', 'http://old:2b'])
00206         m.publisher_update('topic1b', ['http://foo:1', 'http://foo:1b'])
00207         m.publisher_update('topic2', ['http://uri:2', 'http://uri:2b'])
00208         self.assertEquals([('topic1', ['http://uri:1', 'http://uri:1b']),
00209                            ('topic2', ['http://old:2', 'http://old:2b']),
00210                            ('topic1b', ['http://foo:1', 'http://foo:1b']),
00211                            ('topic2', ['http://uri:2', 'http://uri:2b'])], m.updates)
00212 
00213         # disabling these tests as they are just too dangerous now that registrations multithreads updates
00214         if 0:
00215             # this should only go through once as MockHandler trips to done
00216             m.run()
00217 
00218             # have to give time for threads to spin up and call handler
00219             timeout_t = 10. + time.time()
00220             while time.time() < timeout_t and len(m.updates) > 2:
00221                 time.sleep(0.1)
00222 
00223             m.handler = BadHandler()
00224             # this should only go through once as BadHandler trips to done. this tests the
00225             # exception branch
00226             m.run()
00227 
00228             # this will cause an error to be logged in _connect_topic_thread
00229             # - reset data in m.updates
00230             m.updates= [('topic1', ['http://uri:1', 'http://uri:1b']),
00231                         ('topic1b', ['http://foo:1', 'http://foo:1b'])]
00232             m.handler = BadHandler2()
00233             m.run()
00234         
00235         # m.start should return immediately as m.master_uri is not set
00236         self.assertEquals(m.master_uri, None)
00237         m.start(None, None)
00238         # test that it returns if URIs are equal
00239         m.start('http://localhost:1234', 'http://localhost:1234')
00240 
00241         # - test with is_shutdown overriden so we don't enter loop
00242         def foo():
00243             return True
00244         sys.modules['rospy.impl.registration'].__dict__['is_shutdown'] = foo
00245         m.start('http://localhost:1234', 'http://localhost:4567')
00246         
00247         handler.done = True
00248         # handler done is true, so this should not run
00249         m.run()
00250         # no master uri, so these should just return
00251         m.master_uri = None
00252         m.reg_added('n1', 'type1', 'rtype1')
00253         m.reg_removed('n1', 'type1', 'rtype1')
00254         m.cleanup('reason')
00255 


test_rospy
Author(s): Ken Conley
autogenerated on Tue Mar 7 2017 03:45:43