00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039
00040 class TestRospyRegistration(unittest.TestCase):
00041
00042 def test_get_set_topic_manager(self):
00043 from rospy.impl.registration import get_topic_manager, set_topic_manager
00044
00045
00046 orig = get_topic_manager()
00047 try:
00048 self.assert_(orig is not None)
00049 class TopicManager(object): pass
00050 x = TopicManager()
00051
00052 set_topic_manager(x)
00053 self.assertEquals(x, get_topic_manager())
00054 set_topic_manager(None)
00055 self.assert_(get_topic_manager() is None)
00056 finally:
00057 set_topic_manager(orig)
00058
00059 def test_get_set_service_manager(self):
00060 from rospy.impl.registration import get_service_manager, set_service_manager
00061
00062
00063 try:
00064 orig = get_service_manager()
00065 self.assert_(orig is not None)
00066 class ServiceManager(object): pass
00067 x = ServiceManager()
00068
00069 set_service_manager(x)
00070 self.assertEquals(x, get_service_manager())
00071 set_service_manager(None)
00072 self.assert_(get_service_manager() is None)
00073 finally:
00074 set_service_manager(orig)
00075
00076 def test_Registration(self):
00077
00078 from rospy.impl.registration import Registration
00079 self.assertEquals('pub', Registration.PUB)
00080 self.assertEquals('sub', Registration.SUB)
00081 self.assertEquals('srv', Registration.SRV)
00082 r = Registration()
00083 self.assertEquals('pub', r.PUB)
00084 self.assertEquals('sub', r.SUB)
00085 self.assertEquals('srv', r.SRV)
00086 def test_RegistrationListener(self):
00087 from rospy.impl.registration import RegistrationListener
00088
00089 l = RegistrationListener()
00090 l.reg_added('name', 'data_type', 'reg_type')
00091 l.reg_removed('name', 'data_type', 'reg_type')
00092
00093 def test_RegistrationListeners(self):
00094 from rospy.impl.registration import RegistrationListeners, RegistrationListener
00095
00096 class Mock(RegistrationListener):
00097 def __init__(self):
00098 self.args = []
00099 def reg_added(self, name, data_type_or_uri, reg_type):
00100 self.args = ['added', name, data_type_or_uri, reg_type]
00101 def reg_removed(self, name, data_type_or_uri, reg_type):
00102 self.args = ['removed', name, data_type_or_uri, reg_type]
00103 class BadMock(RegistrationListener):
00104 def reg_added(self, name, data_type_or_uri, reg_type):
00105 raise Exception("haha!")
00106 def reg_removed(self, name, data_type_or_uri, reg_type):
00107 raise Exception("haha!")
00108
00109 r = RegistrationListeners()
00110 self.assertEquals([], r.listeners)
00111
00112 try:
00113 r.lock.acquire()
00114 finally:
00115 r.lock.release()
00116
00117 r.notify_added('n1', 'dtype1', 'rtype1')
00118 r.notify_removed('n1', 'dtype1', 'rtype1')
00119 l1 = Mock()
00120 l2 = Mock()
00121
00122 r.add_listener(l1)
00123 self.assertEquals([l1], r.listeners)
00124
00125 self.assertEquals([], l1.args)
00126 r.notify_added('n2', 'dtype2', 'rtype2')
00127 self.assertEquals(['added', 'n2', 'dtype2', 'rtype2'], l1.args)
00128 r.notify_removed('n2', 'dtype2', 'rtype2')
00129 self.assertEquals(['removed', 'n2', 'dtype2', 'rtype2'], l1.args)
00130
00131 r.add_listener(l2)
00132 self.assert_(l2 in r.listeners)
00133 self.assert_(l1 in r.listeners)
00134 self.assertEquals(2, len(r.listeners))
00135
00136 self.assertEquals([], l2.args)
00137 r.notify_added('n3', 'dtype3', 'rtype3')
00138 self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l2.args)
00139 self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l1.args)
00140 r.notify_removed('n3', 'dtype3', 'rtype3')
00141 self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l2.args)
00142 self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l1.args)
00143
00144
00145 l3 = BadMock()
00146 r.add_listener(l3)
00147 self.assert_(l3 in r.listeners)
00148 self.assert_(l2 in r.listeners)
00149 self.assert_(l1 in r.listeners)
00150 self.assertEquals(3, len(r.listeners))
00151
00152 r.notify_added('n4', 'dtype4', 'rtype4')
00153 self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l2.args)
00154 self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l1.args)
00155 r.notify_removed('n4', 'dtype4', 'rtype4')
00156 self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l2.args)
00157 self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l1.args)
00158
00159 def test_get_registration_listeners(self):
00160 from rospy.impl.registration import RegistrationListeners, get_registration_listeners
00161 r = get_registration_listeners()
00162 self.assert_(isinstance(r, RegistrationListeners))
00163
00164 def test_RegManager(self):
00165 from rospy.impl.registration import RegManager
00166 class MockHandler(object):
00167 def __init__(self):
00168 self.done = False
00169 self.args = []
00170 def _connect_topic(self, topic, uri):
00171 self.args.append(['_connect_topic', topic, uri])
00172
00173 self.done = True
00174 return 1, 'msg', 1
00175
00176 class BadHandler(object):
00177 def __init__(self):
00178 self.done = False
00179 def _connect_topic(self, topic, uri):
00180 self.done = True
00181 return None
00182
00183 class BadHandler2(object):
00184 def __init__(self):
00185 self.done = False
00186 def _connect_topic(self, topic, uri):
00187 self.done = True
00188 return -1, "failed", 1
00189
00190 handler = MockHandler()
00191 m = RegManager(handler)
00192 self.assertEquals(handler, m.handler)
00193 self.assert_(m.logger is not None)
00194 self.assertEquals(m.master_uri, None)
00195 self.assertEquals(m.uri, None)
00196 self.assertEquals([], m.updates)
00197 try:
00198 m.cond.acquire()
00199 finally:
00200 m.cond.release()
00201
00202
00203
00204 m.publisher_update('topic1', ['http://uri:1', 'http://uri:1b'])
00205 m.publisher_update('topic2', ['http://old:2', 'http://old:2b'])
00206 m.publisher_update('topic1b', ['http://foo:1', 'http://foo:1b'])
00207 m.publisher_update('topic2', ['http://uri:2', 'http://uri:2b'])
00208 self.assertEquals([('topic1', ['http://uri:1', 'http://uri:1b']),
00209 ('topic2', ['http://old:2', 'http://old:2b']),
00210 ('topic1b', ['http://foo:1', 'http://foo:1b']),
00211 ('topic2', ['http://uri:2', 'http://uri:2b'])], m.updates)
00212
00213
00214 if 0:
00215
00216 m.run()
00217
00218
00219 timeout_t = 10. + time.time()
00220 while time.time() < timeout_t and len(m.updates) > 2:
00221 time.sleep(0.1)
00222
00223 m.handler = BadHandler()
00224
00225
00226 m.run()
00227
00228
00229
00230 m.updates= [('topic1', ['http://uri:1', 'http://uri:1b']),
00231 ('topic1b', ['http://foo:1', 'http://foo:1b'])]
00232 m.handler = BadHandler2()
00233 m.run()
00234
00235
00236 self.assertEquals(m.master_uri, None)
00237 m.start(None, None)
00238
00239 m.start('http://localhost:1234', 'http://localhost:1234')
00240
00241
00242 def foo():
00243 return True
00244 sys.modules['rospy.impl.registration'].__dict__['is_shutdown'] = foo
00245 m.start('http://localhost:1234', 'http://localhost:4567')
00246
00247 handler.done = True
00248
00249 m.run()
00250
00251 m.master_uri = None
00252 m.reg_added('n1', 'type1', 'rtype1')
00253 m.reg_removed('n1', 'type1', 'rtype1')
00254 m.cleanup('reason')
00255