43 from rospy.impl.registration
import get_topic_manager, set_topic_manager
46 orig = get_topic_manager()
48 self.assert_(orig
is not None)
49 class TopicManager(object):
pass 53 self.assertEquals(x, get_topic_manager())
54 set_topic_manager(
None)
55 self.assert_(get_topic_manager()
is None)
57 set_topic_manager(orig)
60 from rospy.impl.registration
import get_service_manager, set_service_manager
64 orig = get_service_manager()
65 self.assert_(orig
is not None)
66 class ServiceManager(object):
pass 69 set_service_manager(x)
70 self.assertEquals(x, get_service_manager())
71 set_service_manager(
None)
72 self.assert_(get_service_manager()
is None)
74 set_service_manager(orig)
78 from rospy.impl.registration
import Registration
79 self.assertEquals(
'pub', Registration.PUB)
80 self.assertEquals(
'sub', Registration.SUB)
81 self.assertEquals(
'srv', Registration.SRV)
83 self.assertEquals(
'pub', r.PUB)
84 self.assertEquals(
'sub', r.SUB)
85 self.assertEquals(
'srv', r.SRV)
87 from rospy.impl.registration
import RegistrationListener
89 l = RegistrationListener()
90 l.reg_added(
'name',
'data_type',
'reg_type')
91 l.reg_removed(
'name',
'data_type',
'reg_type')
94 from rospy.impl.registration
import RegistrationListeners, RegistrationListener
96 class Mock(RegistrationListener):
99 def reg_added(self, name, data_type_or_uri, reg_type):
100 self.
args = [
'added', name, data_type_or_uri, reg_type]
101 def reg_removed(self, name, data_type_or_uri, reg_type):
102 self.
args = [
'removed', name, data_type_or_uri, reg_type]
103 class BadMock(RegistrationListener):
104 def reg_added(self, name, data_type_or_uri, reg_type):
105 raise Exception(
"haha!")
106 def reg_removed(self, name, data_type_or_uri, reg_type):
107 raise Exception(
"haha!")
109 r = RegistrationListeners()
110 self.assertEquals([], r.listeners)
117 r.notify_added(
'n1',
'dtype1',
'rtype1')
118 r.notify_removed(
'n1',
'dtype1',
'rtype1')
123 self.assertEquals([l1], r.listeners)
125 self.assertEquals([], l1.args)
126 r.notify_added(
'n2',
'dtype2',
'rtype2')
127 self.assertEquals([
'added',
'n2',
'dtype2',
'rtype2'], l1.args)
128 r.notify_removed(
'n2',
'dtype2',
'rtype2')
129 self.assertEquals([
'removed',
'n2',
'dtype2',
'rtype2'], l1.args)
132 self.assert_(l2
in r.listeners)
133 self.assert_(l1
in r.listeners)
134 self.assertEquals(2, len(r.listeners))
136 self.assertEquals([], l2.args)
137 r.notify_added(
'n3',
'dtype3',
'rtype3')
138 self.assertEquals([
'added',
'n3',
'dtype3',
'rtype3'], l2.args)
139 self.assertEquals([
'added',
'n3',
'dtype3',
'rtype3'], l1.args)
140 r.notify_removed(
'n3',
'dtype3',
'rtype3')
141 self.assertEquals([
'removed',
'n3',
'dtype3',
'rtype3'], l2.args)
142 self.assertEquals([
'removed',
'n3',
'dtype3',
'rtype3'], l1.args)
147 self.assert_(l3
in r.listeners)
148 self.assert_(l2
in r.listeners)
149 self.assert_(l1
in r.listeners)
150 self.assertEquals(3, len(r.listeners))
152 r.notify_added(
'n4',
'dtype4',
'rtype4')
153 self.assertEquals([
'added',
'n4',
'dtype4',
'rtype4'], l2.args)
154 self.assertEquals([
'added',
'n4',
'dtype4',
'rtype4'], l1.args)
155 r.notify_removed(
'n4',
'dtype4',
'rtype4')
156 self.assertEquals([
'removed',
'n4',
'dtype4',
'rtype4'], l2.args)
157 self.assertEquals([
'removed',
'n4',
'dtype4',
'rtype4'], l1.args)
160 from rospy.impl.registration
import RegistrationListeners, get_registration_listeners
161 r = get_registration_listeners()
162 self.assert_(isinstance(r, RegistrationListeners))
165 from rospy.impl.registration
import RegManager
166 class MockHandler(object):
170 def _connect_topic(self, topic, uri):
171 self.args.append([
'_connect_topic', topic, uri])
176 class BadHandler(object):
179 def _connect_topic(self, topic, uri):
183 class BadHandler2(object):
186 def _connect_topic(self, topic, uri):
188 return -1,
"failed", 1
190 handler = MockHandler()
191 m = RegManager(handler)
192 self.assertEquals(handler, m.handler)
193 self.assert_(m.logger
is not None)
194 self.assertEquals(m.master_uri,
None)
195 self.assertEquals(m.uri,
None)
196 self.assertEquals([], m.updates)
204 m.publisher_update(
'topic1', [
'http://uri:1',
'http://uri:1b'])
205 m.publisher_update(
'topic2', [
'http://old:2',
'http://old:2b'])
206 m.publisher_update(
'topic1b', [
'http://foo:1',
'http://foo:1b'])
207 m.publisher_update(
'topic2', [
'http://uri:2',
'http://uri:2b'])
208 self.assertEquals([(
'topic1', [
'http://uri:1',
'http://uri:1b']),
209 (
'topic2', [
'http://old:2',
'http://old:2b']),
210 (
'topic1b', [
'http://foo:1',
'http://foo:1b']),
211 (
'topic2', [
'http://uri:2',
'http://uri:2b'])], m.updates)
219 timeout_t = 10. + time.time()
220 while time.time() < timeout_t
and len(m.updates) > 2:
223 m.handler = BadHandler()
230 m.updates= [(
'topic1', [
'http://uri:1',
'http://uri:1b']),
231 (
'topic1b', [
'http://foo:1',
'http://foo:1b'])]
232 m.handler = BadHandler2()
236 self.assertEquals(m.master_uri,
None)
239 m.start(
'http://localhost:1234',
'http://localhost:1234')
244 sys.modules[
'rospy.impl.registration'].__dict__[
'is_shutdown'] = foo
245 m.start(
'http://localhost:1234',
'http://localhost:4567')
252 m.reg_added(
'n1',
'type1',
'rtype1')
253 m.reg_removed(
'n1',
'type1',
'rtype1')
def test_get_registration_listeners(self)
def test_RegistrationListeners(self)
def test_get_set_service_manager(self)
def test_Registration(self)
def test_RegManager(self)
def test_RegistrationListener(self)
def test_get_set_topic_manager(self)