test_rospy_registration.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2009, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 import time
39 
40 class TestRospyRegistration(unittest.TestCase):
41 
43  from rospy.impl.registration import get_topic_manager, set_topic_manager
44  # rospy initialization sets this, but it is out of scope of
45  # rospy.impl.registrations to test its value
46  orig = get_topic_manager()
47  try:
48  self.assert_(orig is not None)
49  class TopicManager(object): pass
50  x = TopicManager()
51  # currently untyped
52  set_topic_manager(x)
53  self.assertEquals(x, get_topic_manager())
54  set_topic_manager(None)
55  self.assert_(get_topic_manager() is None)
56  finally:
57  set_topic_manager(orig)
58 
60  from rospy.impl.registration import get_service_manager, set_service_manager
61  # rospy initialization sets this, but it is out of scope of
62  # rospy.impl.registrations to test its value
63  try:
64  orig = get_service_manager()
65  self.assert_(orig is not None)
66  class ServiceManager(object): pass
67  x = ServiceManager()
68  # currently untyped
69  set_service_manager(x)
70  self.assertEquals(x, get_service_manager())
71  set_service_manager(None)
72  self.assert_(get_service_manager() is None)
73  finally:
74  set_service_manager(orig)
75 
76  def test_Registration(self):
77  # nothing to test here really
78  from rospy.impl.registration import Registration
79  self.assertEquals('pub', Registration.PUB)
80  self.assertEquals('sub', Registration.SUB)
81  self.assertEquals('srv', Registration.SRV)
82  r = Registration()
83  self.assertEquals('pub', r.PUB)
84  self.assertEquals('sub', r.SUB)
85  self.assertEquals('srv', r.SRV)
87  from rospy.impl.registration import RegistrationListener
88  #RegistrationListener is just an API, nothing to test here
89  l = RegistrationListener()
90  l.reg_added('name', 'data_type', 'reg_type')
91  l.reg_removed('name', 'data_type', 'reg_type')
92 
94  from rospy.impl.registration import RegistrationListeners, RegistrationListener
95 
96  class Mock(RegistrationListener):
97  def __init__(self):
98  self.args = []
99  def reg_added(self, name, data_type_or_uri, reg_type):
100  self.args = ['added', name, data_type_or_uri, reg_type]
101  def reg_removed(self, name, data_type_or_uri, reg_type):
102  self.args = ['removed', name, data_type_or_uri, reg_type]
103  class BadMock(RegistrationListener):
104  def reg_added(self, name, data_type_or_uri, reg_type):
105  raise Exception("haha!")
106  def reg_removed(self, name, data_type_or_uri, reg_type):
107  raise Exception("haha!")
108 
109  r = RegistrationListeners()
110  self.assertEquals([], r.listeners)
111 
112  try:
113  r.lock.acquire()
114  finally:
115  r.lock.release()
116 
117  r.notify_added('n1', 'dtype1', 'rtype1')
118  r.notify_removed('n1', 'dtype1', 'rtype1')
119  l1 = Mock()
120  l2 = Mock()
121 
122  r.add_listener(l1)
123  self.assertEquals([l1], r.listeners)
124 
125  self.assertEquals([], l1.args)
126  r.notify_added('n2', 'dtype2', 'rtype2')
127  self.assertEquals(['added', 'n2', 'dtype2', 'rtype2'], l1.args)
128  r.notify_removed('n2', 'dtype2', 'rtype2')
129  self.assertEquals(['removed', 'n2', 'dtype2', 'rtype2'], l1.args)
130 
131  r.add_listener(l2)
132  self.assert_(l2 in r.listeners)
133  self.assert_(l1 in r.listeners)
134  self.assertEquals(2, len(r.listeners))
135 
136  self.assertEquals([], l2.args)
137  r.notify_added('n3', 'dtype3', 'rtype3')
138  self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l2.args)
139  self.assertEquals(['added', 'n3', 'dtype3', 'rtype3'], l1.args)
140  r.notify_removed('n3', 'dtype3', 'rtype3')
141  self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l2.args)
142  self.assertEquals(['removed', 'n3', 'dtype3', 'rtype3'], l1.args)
143 
144  # l3 raises exceptions, make sure they don't break anything
145  l3 = BadMock()
146  r.add_listener(l3)
147  self.assert_(l3 in r.listeners)
148  self.assert_(l2 in r.listeners)
149  self.assert_(l1 in r.listeners)
150  self.assertEquals(3, len(r.listeners))
151 
152  r.notify_added('n4', 'dtype4', 'rtype4')
153  self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l2.args)
154  self.assertEquals(['added', 'n4', 'dtype4', 'rtype4'], l1.args)
155  r.notify_removed('n4', 'dtype4', 'rtype4')
156  self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l2.args)
157  self.assertEquals(['removed', 'n4', 'dtype4', 'rtype4'], l1.args)
158 
160  from rospy.impl.registration import RegistrationListeners, get_registration_listeners
161  r = get_registration_listeners()
162  self.assert_(isinstance(r, RegistrationListeners))
163 
164  def test_RegManager(self):
165  from rospy.impl.registration import RegManager
166  class MockHandler(object):
167  def __init__(self):
168  self.done = False
169  self.args = []
170  def _connect_topic(self, topic, uri):
171  self.args.append(['_connect_topic', topic, uri])
172  # trip to done on connect topic so we can exit loop
173  self.done = True
174  return 1, 'msg', 1
175  # bad return value for _connect_topic
176  class BadHandler(object):
177  def __init__(self):
178  self.done = False
179  def _connect_topic(self, topic, uri):
180  self.done = True
181  return None
182  # bad code for _connect_topic
183  class BadHandler2(object):
184  def __init__(self):
185  self.done = False
186  def _connect_topic(self, topic, uri):
187  self.done = True
188  return -1, "failed", 1
189 
190  handler = MockHandler()
191  m = RegManager(handler)
192  self.assertEquals(handler, m.handler)
193  self.assert_(m.logger is not None)
194  self.assertEquals(m.master_uri, None)
195  self.assertEquals(m.uri, None)
196  self.assertEquals([], m.updates)
197  try:
198  m.cond.acquire()
199  finally:
200  m.cond.release()
201 
202  # call twice with topic 2 to test filtering logic
203 
204  m.publisher_update('topic1', ['http://uri:1', 'http://uri:1b'])
205  m.publisher_update('topic2', ['http://old:2', 'http://old:2b'])
206  m.publisher_update('topic1b', ['http://foo:1', 'http://foo:1b'])
207  m.publisher_update('topic2', ['http://uri:2', 'http://uri:2b'])
208  self.assertEquals([('topic1', ['http://uri:1', 'http://uri:1b']),
209  ('topic2', ['http://old:2', 'http://old:2b']),
210  ('topic1b', ['http://foo:1', 'http://foo:1b']),
211  ('topic2', ['http://uri:2', 'http://uri:2b'])], m.updates)
212 
213  # disabling these tests as they are just too dangerous now that registrations multithreads updates
214  if 0:
215  # this should only go through once as MockHandler trips to done
216  m.run()
217 
218  # have to give time for threads to spin up and call handler
219  timeout_t = 10. + time.time()
220  while time.time() < timeout_t and len(m.updates) > 2:
221  time.sleep(0.1)
222 
223  m.handler = BadHandler()
224  # this should only go through once as BadHandler trips to done. this tests the
225  # exception branch
226  m.run()
227 
228  # this will cause an error to be logged in _connect_topic_thread
229  # - reset data in m.updates
230  m.updates= [('topic1', ['http://uri:1', 'http://uri:1b']),
231  ('topic1b', ['http://foo:1', 'http://foo:1b'])]
232  m.handler = BadHandler2()
233  m.run()
234 
235  # m.start should return immediately as m.master_uri is not set
236  self.assertEquals(m.master_uri, None)
237  m.start(None, None)
238  # test that it returns if URIs are equal
239  m.start('http://localhost:1234', 'http://localhost:1234')
240 
241  # - test with is_shutdown overriden so we don't enter loop
242  def foo():
243  return True
244  sys.modules['rospy.impl.registration'].__dict__['is_shutdown'] = foo
245  m.start('http://localhost:1234', 'http://localhost:4567')
246 
247  handler.done = True
248  # handler done is true, so this should not run
249  m.run()
250  # no master uri, so these should just return
251  m.master_uri = None
252  m.reg_added('n1', 'type1', 'rtype1')
253  m.reg_removed('n1', 'type1', 'rtype1')
254  m.cleanup('reason')
255 


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56