00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 import time
00043 import cStringIO
00044
00045 class TestRospyRegistration(unittest.TestCase):
00046
00047 def test_get_set_topic_manager(self):
00048 from rospy.impl.registration import get_topic_manager, set_topic_manager
00049
00050
00051 orig = get_topic_manager()
00052 try:
00053 self.assert_(orig is not None)
00054 class TopicManager(object): pass
00055 x = TopicManager()
00056
00057 set_topic_manager(x)
00058 self.assertEquals(x, get_topic_manager())
00059 set_topic_manager(None)
00060 self.assert_(get_topic_manager() is None)
00061 finally:
00062 set_topic_manager(orig)
00063
00064 def test_get_set_service_manager(self):
00065 from rospy.impl.registration import get_service_manager, set_service_manager
00066
00067
00068 try:
00069 orig = get_service_manager()
00070 self.assert_(orig is not None)
00071 class ServiceManager(object): pass
00072 x = ServiceManager()
00073
00074 set_service_manager(x)
00075 self.assertEquals(x, get_service_manager())
00076 set_service_manager(None)
00077 self.assert_(get_service_manager() is None)
00078 finally:
00079 set_service_manager(orig)
00080
00081 def test_Registration(self):
00082
00083 from rospy.impl.registration import Registration
00084 self.assertEquals('pub', Registration.PUB)
00085 self.assertEquals('sub', Registration.SUB)
00086 self.assertEquals('srv', Registration.SRV)
00087 r = Registration()
00088 self.assertEquals('pub', r.PUB)
00089 self.assertEquals('sub', r.SUB)
00090 self.assertEquals('srv', r.SRV)
00091 def test_RegistrationListener(self):
00092 from rospy.impl.registration import RegistrationListener
00093
00094 l = RegistrationListener()
00095 l.reg_added('name', 'data_type', 'reg_type')
00096 l.reg_removed('name', 'data_type', 'reg_type')
00097
00098 def test_RegistrationListeners(self):
00099 from rospy.impl.registration import RegistrationListeners, RegistrationListener
00100
00101 class Mock(RegistrationListener):
00102 def __init__(self):
00103 self.args = []
00104 def reg_added(self, name, data_type_or_uri, reg_type):
00105 self.args = ['added', name, data_type_or_uri, reg_type]
00106 def reg_removed(self, name, data_type_or_uri, reg_type):
00107 self.args = ['removed', name, data_type_or_uri, reg_type]
00108 class BadMock(RegistrationListener):
00109 def reg_added(self, name, data_type_or_uri, reg_type):
00110 raise Exception("haha!")
00111 def reg_removed(self, name, data_type_or_uri, reg_type):
00112 raise Exception("haha!")
00113
00114 r = RegistrationListeners()
00115 self.assertEquals([], r.listeners)
00116
00117 try:
00118 r.lock.acquire()
00119 finally:
00120 r.lock.release()
00121
00122 r.notify_added('n1', 'dtype1', 'rtype1')
00123 r.notify_removed('n1', 'dtype1', 'rtype1')
00124 l1 = Mock()
00125 l2 = Mock()
00126
00127 r.add_listener(l1)
00128 self.assertEquals([l1], r.listeners)
00129
00130 self.assertEquals([], l1.args)
00131 r.notify_added('n2', 'dtype2', 'rtype2')
00132 self.assertEquals(['added', 'n2', 'dtype2', 'rtype2'], l1.args)
00133 r.notify_removed('n2', 'dtype2', 'rtype2')
00134 self.assertEquals(['removed', 'n2', 'dtype2', 'rtype2'], l1.args)
00135
00136 r.add_listener(l2)
00137 self.assert_(l2 in r.listeners)
00138 self.assert_(l1 in r.listeners)
00139 self.assertEquals(2, len(r.listeners))
00140
00141 self.assertEquals([], l2.args)
00142 r.notify_added('n3', 'dtype3', 'rtype3')
00143 self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l2.args)
00144 self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l1.args)
00145 r.notify_removed('n3', 'dtype3', 'rtype3')
00146 self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l2.args)
00147 self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l1.args)
00148
00149
00150 l3 = BadMock()
00151 r.add_listener(l3)
00152 self.assert_(l3 in r.listeners)
00153 self.assert_(l2 in r.listeners)
00154 self.assert_(l1 in r.listeners)
00155 self.assertEquals(3, len(r.listeners))
00156
00157 r.notify_added('n4', 'dtype4', 'rtype4')
00158 self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l2.args)
00159 self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l1.args)
00160 r.notify_removed('n4', 'dtype4', 'rtype4')
00161 self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l2.args)
00162 self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l1.args)
00163
00164 def test_get_registration_listeners(self):
00165 from rospy.impl.registration import RegistrationListeners, get_registration_listeners
00166 r = get_registration_listeners()
00167 self.assert_(isinstance(r, RegistrationListeners))
00168
00169 def test_RegManager(self):
00170 from rospy.impl.registration import RegManager
00171 class MockHandler(object):
00172 def __init__(self):
00173 self.done = False
00174 self.args = []
00175 def _connect_topic(self, topic, uri):
00176 self.args.append(['_connect_topic', topic, uri])
00177
00178 self.done = True
00179 return 1, 'msg', 1
00180
00181 class BadHandler(object):
00182 def __init__(self):
00183 self.done = False
00184 def _connect_topic(self, topic, uri):
00185 self.done = True
00186 return None
00187
00188 class BadHandler2(object):
00189 def __init__(self):
00190 self.done = False
00191 def _connect_topic(self, topic, uri):
00192 self.done = True
00193 return -1, "failed", 1
00194
00195 handler = MockHandler()
00196 m = RegManager(handler)
00197 self.assertEquals(handler, m.handler)
00198 self.assert_(m.logger is not None)
00199 self.assertEquals(m.master_uri, None)
00200 self.assertEquals(m.uri, None)
00201 self.assertEquals([], m.updates)
00202 try:
00203 m.cond.acquire()
00204 finally:
00205 m.cond.release()
00206
00207
00208
00209 m.publisher_update('topic1', ['http://uri:1', 'http://uri:1b'])
00210 m.publisher_update('topic2', ['http://old:2', 'http://old:2b'])
00211 m.publisher_update('topic1b', ['http://foo:1', 'http://foo:1b'])
00212 m.publisher_update('topic2', ['http://uri:2', 'http://uri:2b'])
00213 self.assertEquals([('topic1', ['http://uri:1', 'http://uri:1b']),
00214 ('topic2', ['http://old:2', 'http://old:2b']),
00215 ('topic1b', ['http://foo:1', 'http://foo:1b']),
00216 ('topic2', ['http://uri:2', 'http://uri:2b'])], m.updates)
00217
00218
00219 if 0:
00220
00221 m.run()
00222
00223
00224 timeout_t = 10. + time.time()
00225 while time.time() < timeout_t and len(m.updates) > 2:
00226 time.sleep(0.1)
00227
00228 m.handler = BadHandler()
00229
00230
00231 m.run()
00232
00233
00234
00235 m.updates= [('topic1', ['http://uri:1', 'http://uri:1b']),
00236 ('topic1b', ['http://foo:1', 'http://foo:1b'])]
00237 m.handler = BadHandler2()
00238 m.run()
00239
00240
00241 self.assertEquals(m.master_uri, None)
00242 m.start(None, None)
00243
00244 m.start('http://localhost:1234', 'http://localhost:1234')
00245
00246
00247 def foo():
00248 return True
00249 sys.modules['rospy.impl.registration'].__dict__['is_shutdown'] = foo
00250 m.start('http://localhost:1234', 'http://localhost:4567')
00251
00252 handler.done = True
00253
00254 m.run()
00255
00256 m.master_uri = None
00257 m.reg_added('n1', 'type1', 'rtype1')
00258 m.reg_removed('n1', 'type1', 'rtype1')
00259 m.cleanup('reason')
00260
00261
00262 if __name__ == '__main__':
00263 import rostest
00264 rostest.unitrun('test_rospy', sys.argv[0], TestRospyRegistration, coverage_packages=['rospy.impl.registration'])