00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039 import cStringIO
00040
00041 class TestRospyRegistration(unittest.TestCase):
00042
00043 def test_get_set_topic_manager(self):
00044 from rospy.impl.registration import get_topic_manager, set_topic_manager
00045
00046
00047 orig = get_topic_manager()
00048 try:
00049 self.assert_(orig is not None)
00050 class TopicManager(object): pass
00051 x = TopicManager()
00052
00053 set_topic_manager(x)
00054 self.assertEquals(x, get_topic_manager())
00055 set_topic_manager(None)
00056 self.assert_(get_topic_manager() is None)
00057 finally:
00058 set_topic_manager(orig)
00059
00060 def test_get_set_service_manager(self):
00061 from rospy.impl.registration import get_service_manager, set_service_manager
00062
00063
00064 try:
00065 orig = get_service_manager()
00066 self.assert_(orig is not None)
00067 class ServiceManager(object): pass
00068 x = ServiceManager()
00069
00070 set_service_manager(x)
00071 self.assertEquals(x, get_service_manager())
00072 set_service_manager(None)
00073 self.assert_(get_service_manager() is None)
00074 finally:
00075 set_service_manager(orig)
00076
00077 def test_Registration(self):
00078
00079 from rospy.impl.registration import Registration
00080 self.assertEquals('pub', Registration.PUB)
00081 self.assertEquals('sub', Registration.SUB)
00082 self.assertEquals('srv', Registration.SRV)
00083 r = Registration()
00084 self.assertEquals('pub', r.PUB)
00085 self.assertEquals('sub', r.SUB)
00086 self.assertEquals('srv', r.SRV)
00087 def test_RegistrationListener(self):
00088 from rospy.impl.registration import RegistrationListener
00089
00090 l = RegistrationListener()
00091 l.reg_added('name', 'data_type', 'reg_type')
00092 l.reg_removed('name', 'data_type', 'reg_type')
00093
00094 def test_RegistrationListeners(self):
00095 from rospy.impl.registration import RegistrationListeners, RegistrationListener
00096
00097 class Mock(RegistrationListener):
00098 def __init__(self):
00099 self.args = []
00100 def reg_added(self, name, data_type_or_uri, reg_type):
00101 self.args = ['added', name, data_type_or_uri, reg_type]
00102 def reg_removed(self, name, data_type_or_uri, reg_type):
00103 self.args = ['removed', name, data_type_or_uri, reg_type]
00104 class BadMock(RegistrationListener):
00105 def reg_added(self, name, data_type_or_uri, reg_type):
00106 raise Exception("haha!")
00107 def reg_removed(self, name, data_type_or_uri, reg_type):
00108 raise Exception("haha!")
00109
00110 r = RegistrationListeners()
00111 self.assertEquals([], r.listeners)
00112
00113 try:
00114 r.lock.acquire()
00115 finally:
00116 r.lock.release()
00117
00118 r.notify_added('n1', 'dtype1', 'rtype1')
00119 r.notify_removed('n1', 'dtype1', 'rtype1')
00120 l1 = Mock()
00121 l2 = Mock()
00122
00123 r.add_listener(l1)
00124 self.assertEquals([l1], r.listeners)
00125
00126 self.assertEquals([], l1.args)
00127 r.notify_added('n2', 'dtype2', 'rtype2')
00128 self.assertEquals(['added', 'n2', 'dtype2', 'rtype2'], l1.args)
00129 r.notify_removed('n2', 'dtype2', 'rtype2')
00130 self.assertEquals(['removed', 'n2', 'dtype2', 'rtype2'], l1.args)
00131
00132 r.add_listener(l2)
00133 self.assert_(l2 in r.listeners)
00134 self.assert_(l1 in r.listeners)
00135 self.assertEquals(2, len(r.listeners))
00136
00137 self.assertEquals([], l2.args)
00138 r.notify_added('n3', 'dtype3', 'rtype3')
00139 self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l2.args)
00140 self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l1.args)
00141 r.notify_removed('n3', 'dtype3', 'rtype3')
00142 self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l2.args)
00143 self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l1.args)
00144
00145
00146 l3 = BadMock()
00147 r.add_listener(l3)
00148 self.assert_(l3 in r.listeners)
00149 self.assert_(l2 in r.listeners)
00150 self.assert_(l1 in r.listeners)
00151 self.assertEquals(3, len(r.listeners))
00152
00153 r.notify_added('n4', 'dtype4', 'rtype4')
00154 self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l2.args)
00155 self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l1.args)
00156 r.notify_removed('n4', 'dtype4', 'rtype4')
00157 self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l2.args)
00158 self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l1.args)
00159
00160 def test_get_registration_listeners(self):
00161 from rospy.impl.registration import RegistrationListeners, get_registration_listeners
00162 r = get_registration_listeners()
00163 self.assert_(isinstance(r, RegistrationListeners))
00164
00165 def test_RegManager(self):
00166 from rospy.impl.registration import RegManager
00167 class MockHandler(object):
00168 def __init__(self):
00169 self.done = False
00170 self.args = []
00171 def _connect_topic(self, topic, uri):
00172 self.args.append(['_connect_topic', topic, uri])
00173
00174 self.done = True
00175 return 1, 'msg', 1
00176
00177 class BadHandler(object):
00178 def __init__(self):
00179 self.done = False
00180 def _connect_topic(self, topic, uri):
00181 self.done = True
00182 return None
00183
00184 class BadHandler2(object):
00185 def __init__(self):
00186 self.done = False
00187 def _connect_topic(self, topic, uri):
00188 self.done = True
00189 return -1, "failed", 1
00190
00191 handler = MockHandler()
00192 m = RegManager(handler)
00193 self.assertEquals(handler, m.handler)
00194 self.assert_(m.logger is not None)
00195 self.assertEquals(m.master_uri, None)
00196 self.assertEquals(m.uri, None)
00197 self.assertEquals([], m.updates)
00198 try:
00199 m.cond.acquire()
00200 finally:
00201 m.cond.release()
00202
00203
00204
00205 m.publisher_update('topic1', ['http://uri:1', 'http://uri:1b'])
00206 m.publisher_update('topic2', ['http://old:2', 'http://old:2b'])
00207 m.publisher_update('topic1b', ['http://foo:1', 'http://foo:1b'])
00208 m.publisher_update('topic2', ['http://uri:2', 'http://uri:2b'])
00209 self.assertEquals([('topic1', ['http://uri:1', 'http://uri:1b']),
00210 ('topic2', ['http://old:2', 'http://old:2b']),
00211 ('topic1b', ['http://foo:1', 'http://foo:1b']),
00212 ('topic2', ['http://uri:2', 'http://uri:2b'])], m.updates)
00213
00214
00215 if 0:
00216
00217 m.run()
00218
00219
00220 timeout_t = 10. + time.time()
00221 while time.time() < timeout_t and len(m.updates) > 2:
00222 time.sleep(0.1)
00223
00224 m.handler = BadHandler()
00225
00226
00227 m.run()
00228
00229
00230
00231 m.updates= [('topic1', ['http://uri:1', 'http://uri:1b']),
00232 ('topic1b', ['http://foo:1', 'http://foo:1b'])]
00233 m.handler = BadHandler2()
00234 m.run()
00235
00236
00237 self.assertEquals(m.master_uri, None)
00238 m.start(None, None)
00239
00240 m.start('http://localhost:1234', 'http://localhost:1234')
00241
00242
00243 def foo():
00244 return True
00245 sys.modules['rospy.impl.registration'].__dict__['is_shutdown'] = foo
00246 m.start('http://localhost:1234', 'http://localhost:4567')
00247
00248 handler.done = True
00249
00250 m.run()
00251
00252 m.master_uri = None
00253 m.reg_added('n1', 'type1', 'rtype1')
00254 m.reg_removed('n1', 'type1', 'rtype1')
00255 m.cleanup('reason')
00256