#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$

import roslib
roslib.load_manifest('test_rosmaster')

import os
import sys
import struct
import unittest
import time


class ThreadPoolMock(object):
    """Stand-in for the thread pool object handed to RegistrationManager."""

    def queue_task(*args):
        """Accept any arguments (including the instance) and do nothing."""
        pass


class TestRosmasterRegistrations(unittest.TestCase):
    """Unit tests for NodeRef, Registrations and RegistrationManager."""

    # ------------------------------------------------------------------
    # NodeRef
    # ------------------------------------------------------------------

    def _check_NodeRef_add_remove(self, reg_type, attr, first, second):
        """Add and remove two keys of one registration type on a NodeRef.

        @param reg_type: one of the Registrations type constants
        @param attr: name of the NodeRef list attribute tracking reg_type
        @param first: key that is added first and removed first
        @param second: key that is added second and removed last
        """
        from rosmaster.registrations import NodeRef
        n = NodeRef('n1', 'http://localhost:1234')

        n.add(reg_type, first)
        self.assertFalse(n.is_empty())
        self.assertTrue(first in getattr(n, attr))
        self.assertEqual([first], getattr(n, attr))

        n.add(reg_type, second)
        self.assertFalse(n.is_empty())
        self.assertTrue(second in getattr(n, attr))
        self.assertTrue(first in getattr(n, attr))

        n.remove(reg_type, first)
        self.assertTrue(second in getattr(n, attr))
        self.assertEqual([second], getattr(n, attr))
        self.assertFalse(first in getattr(n, attr))
        self.assertFalse(n.is_empty())

        n.remove(reg_type, second)
        self.assertFalse(second in getattr(n, attr))
        self.assertFalse(first in getattr(n, attr))
        self.assertEqual([], getattr(n, attr))
        self.assertTrue(n.is_empty())

    def test_NodeRef_services(self):
        from rosmaster.registrations import Registrations
        self._check_NodeRef_add_remove(
            Registrations.SERVICE, 'services',
            'add_two_ints', 'add_three_ints')

    def test_NodeRef_subs(self):
        from rosmaster.registrations import Registrations
        self._check_NodeRef_add_remove(
            Registrations.TOPIC_SUBSCRIPTIONS, 'topic_subscriptions',
            'topic1', 'topic2')

    def test_NodeRef_pubs(self):
        from rosmaster.registrations import Registrations
        self._check_NodeRef_add_remove(
            Registrations.TOPIC_PUBLICATIONS, 'topic_publications',
            'topic1', 'topic2')

    def test_NodeRef_param_subs(self):
        from rosmaster.registrations import Registrations
        self._check_NodeRef_add_remove(
            Registrations.PARAM_SUBSCRIPTIONS, 'param_subscriptions',
            'param1', 'param2')

    def test_NodeRef_base(self):
        import rosmaster.exceptions
        from rosmaster.registrations import NodeRef, Registrations
        n = NodeRef('n1', 'http://localhost:1234')
        self.assertEqual('http://localhost:1234', n.api)
        self.assertEqual([], n.param_subscriptions)
        self.assertEqual([], n.topic_subscriptions)
        self.assertEqual([], n.topic_publications)
        self.assertEqual([], n.services)
        self.assertTrue(n.is_empty())

        # an unknown registration type must be rejected by add and remove
        self.assertRaises(rosmaster.exceptions.InternalException,
                          n.add, 12345, 'topic')
        self.assertRaises(rosmaster.exceptions.InternalException,
                          n.remove, 12345, 'topic')

        n.add(Registrations.TOPIC_PUBLICATIONS, 'topic1')
        n.add(Registrations.TOPIC_PUBLICATIONS, 'topic2')
        n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')
        n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic3')
        n.add(Registrations.PARAM_SUBSCRIPTIONS, 'topic4')
        n.add(Registrations.SERVICE, 'serv')
        self.assertFalse(n.is_empty())

        n.clear()
        self.assertTrue(n.is_empty())

    # ------------------------------------------------------------------
    # Registrations
    # ------------------------------------------------------------------

    def _subtest_Registrations_basic(self, r):
        """Subroutine of the registration tests for topic/param type objects.

        Topic and param names are handled identically, so the same checks
        are reused for every non-service registration type.

        @param r: Registrations: initialized, empty registrations object
        """
        r.register('topic1', 'node1', 'http://node1:5678')
        self.assertTrue('topic1' in r)  # test contains
        self.assertTrue(r.has_key('topic1'))  # test has_key
        self.assertEqual(['topic1'], list(r.iterkeys()))
        self.assertEqual(['http://node1:5678'], r.get_apis('topic1'))
        self.assertEqual([('node1', 'http://node1:5678')], r['topic1'])
        self.assertTrue(r)  # test nonzero
        # make sure there is no contamination from the service API map
        self.assertEqual(None, r.get_service_api('topic1'))
        self.assertEqual([['topic1', ['node1']]], r.get_state())

        r.register('topic1', 'node2', 'http://node2:5678')
        self.assertEqual(['topic1'], list(r.iterkeys()))
        self.assertEqual(2, len(r.get_apis('topic1')))
        self.assertTrue('http://node1:5678' in r.get_apis('topic1'))
        self.assertTrue('http://node2:5678' in r.get_apis('topic1'))
        self.assertEqual(2, len(r['topic1']))
        self.assertTrue(('node1', 'http://node1:5678') in r['topic1'], r['topic1'])
        self.assertTrue(('node2', 'http://node2:5678') in r['topic1'])
        self.assertEqual([['topic1', ['node1', 'node2']]], r.get_state())

        # register second topic
        r.register('topic2', 'node3', 'http://node3:5678')
        self.assertTrue('topic2' in r)  # test contains
        self.assertTrue(r.has_key('topic2'))  # test has_key
        self.assertTrue('topic1' in list(r.iterkeys()))
        self.assertTrue('topic2' in list(r.iterkeys()))
        self.assertEqual(['http://node3:5678'], r.get_apis('topic2'))
        self.assertEqual([('node3', 'http://node3:5678')], r['topic2'])
        self.assertTrue(r)  # test nonzero
        self.assertTrue(['topic1', ['node1', 'node2']] in r.get_state(), r.get_state())
        self.assertTrue(['topic2', ['node3']] in r.get_state(), r.get_state())

        # Unregister

        # - fail if node is not registered
        code, _, val = r.unregister('topic1', 'node3', 'http://node3:5678')
        self.assertEqual(0, val)
        # - fail if topic is not registered by that node
        code, _, val = r.unregister('topic2', 'node2', 'http://node2:5678')
        self.assertEqual(0, val)
        # - fail if URI does not match
        code, _, val = r.unregister('topic2', 'node2', 'http://fakenode2:5678')
        self.assertEqual(0, val)

        # - unregister node1
        code, _, val = r.unregister('topic1', 'node1', 'http://node1:5678')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertTrue('topic1' in r)  # test contains
        self.assertTrue(r.has_key('topic1'))
        self.assertTrue('topic1' in list(r.iterkeys()))
        self.assertTrue('topic2' in list(r.iterkeys()))
        self.assertEqual(['http://node2:5678'], r.get_apis('topic1'))
        self.assertEqual([('node2', 'http://node2:5678')], r['topic1'])
        self.assertTrue(r)  # test nonzero
        self.assertTrue(['topic1', ['node2']] in r.get_state())
        self.assertTrue(['topic2', ['node3']] in r.get_state())

        # - unregister node2, which removes topic1 entirely
        code, _, val = r.unregister('topic1', 'node2', 'http://node2:5678')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertFalse('topic1' in r)  # test contains
        self.assertFalse(r.has_key('topic1'))
        self.assertEqual(['topic2'], list(r.iterkeys()))
        self.assertEqual([], r.get_apis('topic1'))
        self.assertEqual([], r['topic1'])
        self.assertTrue(r)  # test nonzero
        self.assertEqual([['topic2', ['node3']]], r.get_state())

        # clear out last reg
        code, _, val = r.unregister('topic2', 'node3', 'http://node3:5678')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertFalse('topic2' in r)  # test contains
        self.assertFalse(r)
        self.assertEqual([], r.get_state())

    def test_Registrations(self):
        import rosmaster.exceptions
        from rosmaster.registrations import Registrations
        types = [Registrations.TOPIC_SUBSCRIPTIONS,
                 Registrations.TOPIC_PUBLICATIONS,
                 Registrations.SERVICE,
                 Registrations.PARAM_SUBSCRIPTIONS]
        # test enums: all four type constants must be distinct
        self.assertEqual(4, len(set(types)))
        # an invalid type must be rejected by the constructor
        self.assertRaises(rosmaster.exceptions.InternalException,
                          Registrations, -1)

        for t in types:
            r = Registrations(t)
            self.assertEqual(t, r.type)
            self.assertFalse(r)  # test nonzero
            self.assertFalse('topic1' in r)  # test contains
            self.assertFalse(r.has_key('topic1'))  # test has_key
            self.assertFalse(list(r.iterkeys()))  # no keys
            self.assertEqual(None, r.get_service_api('non-existent'))

        # topic and param registrations share the same basic behaviour
        for t in (Registrations.TOPIC_SUBSCRIPTIONS,
                  Registrations.TOPIC_PUBLICATIONS,
                  Registrations.PARAM_SUBSCRIPTIONS):
            self._subtest_Registrations_basic(Registrations(t))

        self._subtest_Registrations_services(Registrations(Registrations.SERVICE))

    def _check_unregister_all(self, r, removed_keys, kept_key, register_kept_first):
        """Register keys for node1 and one key for node2, then drop node1.

        Service registrations get a per-node rosrpc URI; other types are
        registered without one.

        @param r: Registrations: empty registrations object under test
        @param removed_keys: keys registered by node1 (expected to vanish)
        @param kept_key: key registered by node2 (expected to survive)
        @param register_kept_first: register kept_key before removed_keys
        """
        from rosmaster.registrations import Registrations
        is_service = r.type == Registrations.SERVICE

        def register(key, node):
            caller_api = 'http://%s:5678' % node
            if is_service:
                r.register(key, node, caller_api, 'rosrpc://%s:1234' % node)
            else:
                r.register(key, node, caller_api)

        if register_kept_first:
            register(kept_key, 'node2')
        for k in removed_keys:
            register(k, 'node1')
        if not register_kept_first:
            register(kept_key, 'node2')

        r.unregister_all('node1')
        self.assertTrue(r)
        # NOTE(review): the original listing lost its indentation; both
        # checks are assumed to belong to the per-key loop.
        for k in removed_keys:
            self.assertFalse(r.has_key(k))
            if is_service:
                self.assertEqual(None, r.get_service_api(k))
        self.assertEqual([kept_key], list(r.iterkeys()))
        if is_service:
            self.assertEqual('rosrpc://node2:1234', r.get_service_api(kept_key))

    def test_Registrations_unregister_all(self):
        from rosmaster.registrations import Registrations

        topics = ['topic1', 'topic1b', 'topic1c', 'topic1d']
        self._check_unregister_all(
            Registrations(Registrations.TOPIC_SUBSCRIPTIONS),
            topics, 'topic2', False)
        self._check_unregister_all(
            Registrations(Registrations.TOPIC_PUBLICATIONS),
            topics, 'topic2', False)
        self._check_unregister_all(
            Registrations(Registrations.PARAM_SUBSCRIPTIONS),
            ['param1', 'param1b', 'param1c', 'param1d'], 'param2', True)
        self._check_unregister_all(
            Registrations(Registrations.SERVICE),
            ['service1', 'service1b', 'service1c', 'service1d'],
            'service2', False)

    def _subtest_Registrations_services(self, r):
        """Subroutine of the registration tests for service type objects.

        @param r: Registrations: initialized, empty SERVICE registrations
        """
        import rosmaster.exceptions

        # call methods that use service_api_map, make sure they are
        # guarded against lazy-init
        self.assertEqual(None, r.get_service_api('s1'))
        r.unregister_all('node1')

        # do an unregister first, before service_api is initialized
        code, msg, val = r.unregister('s1', 'caller1', None, 'rosrpc://one:1234')
        self.assertEqual(1, code)
        self.assertEqual(0, val)

        # registering a service requires the service_api argument
        self.assertRaises(rosmaster.exceptions.InternalException,
                          r.register, 'service1', 'node1', 'http://node1:5678')

        r.register('service1', 'node1', 'http://node1:5678', 'rosrpc://node1:1234')

        self.assertTrue('service1' in r)  # test contains
        self.assertTrue(r.has_key('service1'))  # test has_key
        self.assertEqual(['service1'], list(r.iterkeys()))
        self.assertEqual(['http://node1:5678'], r.get_apis('service1'))
        self.assertEqual('rosrpc://node1:1234', r.get_service_api('service1'))
        self.assertEqual([('node1', 'http://node1:5678')], r['service1'])
        self.assertTrue(r)  # test nonzero
        self.assertEqual([['service1', ['node1']]], r.get_state())

        # a second provider replaces the first one
        r.register('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
        self.assertEqual(['service1'], list(r.iterkeys()))
        self.assertEqual('rosrpc://node2:1234', r.get_service_api('service1'))
        self.assertEqual(['http://node2:5678'], r.get_apis('service1'))
        self.assertEqual([('node2', 'http://node2:5678')], r['service1'])
        self.assertEqual([['service1', ['node2']]], r.get_state())

        # register a second service
        r.register('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')
        self.assertEqual('rosrpc://node3:1234', r.get_service_api('service2'))
        self.assertEqual(2, len(r.get_state()))
        self.assertTrue(['service2', ['node3']] in r.get_state(), r.get_state())
        self.assertTrue(['service1', ['node2']] in r.get_state())

        # register a third service, second service for node2
        r.register('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
        self.assertEqual(3, len(r.get_state()))
        self.assertTrue(['service2', ['node3']] in r.get_state())
        self.assertTrue(['service1b', ['node2']] in r.get_state())
        self.assertTrue(['service1', ['node2']] in r.get_state())

        # Unregister

        # - service_api param must be specified
        self.assertRaises(rosmaster.exceptions.InternalException,
                          r.unregister, 'service1', 'node2', 'http://node2:1234')

        # - fail if service is not known
        code, _, val = r.unregister('unknown', 'node2', 'http://node2:5678', 'rosprc://node2:1234')
        self.assertEqual(0, val)
        # - fail if node is not registered
        code, _, val = r.unregister('service1', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')
        self.assertEqual(0, val)
        # - fail if service API is different
        code, _, val = r.unregister('service1', 'node2', 'http://node2b:5678', 'rosrpc://node3:1234')
        self.assertEqual(0, val)

        # - unregister service2
        code, _, val = r.unregister('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertFalse('service2' in r)  # test contains
        self.assertFalse(r.has_key('service2'))
        self.assertTrue('service1' in list(r.iterkeys()))
        self.assertTrue('service1b' in list(r.iterkeys()))
        self.assertEqual([], r.get_apis('service2'))
        self.assertEqual([], r['service2'])
        self.assertTrue(r)  # test nonzero
        self.assertEqual(2, len(r.get_state()))
        self.assertFalse(['service2', ['node3']] in r.get_state())

        # - unregister service1 of node2
        code, _, val = r.unregister('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertFalse('service1' in r)  # test contains
        self.assertFalse(r.has_key('service1'))
        self.assertEqual(['service1b'], list(r.iterkeys()))
        self.assertEqual([], r.get_apis('service1'))
        self.assertEqual([], r['service1'])
        self.assertTrue(r)  # test nonzero
        self.assertEqual([['service1b', ['node2']]], r.get_state())

        # - unregister service1b, the last remaining registration
        code, _, val = r.unregister('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        # check the key that was just removed as well as the earlier one
        for gone in ('service1', 'service1b'):
            self.assertFalse(gone in r)  # test contains
            self.assertFalse(r.has_key(gone))
            self.assertEqual([], r.get_apis(gone))
            self.assertEqual([], r[gone])
        self.assertEqual([], list(r.iterkeys()))
        self.assertFalse(r)  # test nonzero
        self.assertEqual([], r.get_state())

    # ------------------------------------------------------------------
    # RegistrationManager
    # ------------------------------------------------------------------

    def test_RegistrationManager_services(self):
        from rosmaster.registrations import RegistrationManager
        rm = RegistrationManager(ThreadPoolMock())

        self.assertEqual(None, rm.get_node('caller1'))

        # do an unregister first, before service_api is initialized
        code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://one:1234')
        self.assertEqual(1, code)
        self.assertEqual(0, val)

        rm.register_service('s1', 'caller1', 'http://one:1234', 'rosrpc://one:1234')
        self.assertTrue(rm.services.has_key('s1'))
        self.assertEqual('rosrpc://one:1234', rm.services.get_service_api('s1'))
        self.assertEqual('http://one:1234', rm.get_node('caller1').api)
        self.assertEqual([['s1', ['caller1']]], rm.services.get_state())

        # - verify that changed caller_api updates ref
        rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://one:1234')
        self.assertTrue(rm.services.has_key('s1'))
        self.assertEqual('rosrpc://one:1234', rm.services.get_service_api('s1'))
        self.assertEqual('http://oneB:1234', rm.get_node('caller1').api)
        self.assertEqual([['s1', ['caller1']]], rm.services.get_state())

        # - verify that changed service_api updates ref
        rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://oneB:1234')
        self.assertTrue(rm.services.has_key('s1'))
        self.assertEqual('rosrpc://oneB:1234', rm.services.get_service_api('s1'))
        self.assertEqual('http://oneB:1234', rm.get_node('caller1').api)
        self.assertEqual([['s1', ['caller1']]], rm.services.get_state())

        rm.register_service('s2', 'caller2', 'http://two:1234', 'rosrpc://two:1234')
        self.assertEqual('http://two:1234', rm.get_node('caller2').api)

        # - unregister should be noop if service api does not match
        code, msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://b:1234')
        self.assertEqual(1, code)
        self.assertEqual(0, val)
        self.assertTrue(rm.services.has_key('s2'))
        self.assertEqual('http://two:1234', rm.get_node('caller2').api)
        self.assertEqual('rosrpc://two:1234', rm.services.get_service_api('s2'))

        # - unregister should be noop if service is unknown
        code, msg, val = rm.unregister_service('unknown', 'caller2', 'rosrpc://two:1234')
        self.assertEqual(1, code)
        self.assertEqual(0, val)
        self.assertTrue(rm.services.has_key('s2'))
        self.assertEqual('http://two:1234', rm.get_node('caller2').api)
        self.assertEqual('rosrpc://two:1234', rm.services.get_service_api('s2'))

        # - unregister should clear all knowledge of caller2
        code, msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://two:1234')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertTrue(rm.services.has_key('s1'))
        self.assertFalse(rm.services.has_key('s2'))
        self.assertEqual(None, rm.get_node('caller2'))

        code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://oneB:1234')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertFalse(rm.services)
        self.assertFalse(rm.services.has_key('s1'))
        self.assertEqual(None, rm.get_node('caller1'))

    def test_RegistrationManager_topic_pub(self):
        from rosmaster.registrations import RegistrationManager
        rm = RegistrationManager(ThreadPoolMock())
        self.subtest_RegistrationManager(rm, rm.publishers, rm.register_publisher, rm.unregister_publisher)

    def test_RegistrationManager_topic_sub(self):
        from rosmaster.registrations import RegistrationManager
        rm = RegistrationManager(ThreadPoolMock())
        self.subtest_RegistrationManager(rm, rm.subscribers, rm.register_subscriber, rm.unregister_subscriber)

    def test_RegistrationManager_param_sub(self):
        from rosmaster.registrations import RegistrationManager
        rm = RegistrationManager(ThreadPoolMock())
        self.subtest_RegistrationManager(rm, rm.param_subscribers, rm.register_param_subscriber, rm.unregister_param_subscriber)

    def subtest_RegistrationManager(self, rm, r, register, unregister):
        """Shared checks for the pub/sub/param-sub RegistrationManager APIs.

        @param rm: RegistrationManager: freshly constructed manager
        @param r: Registrations: the manager's registry for the type tested
        @param register: bound register_* method of rm for that type
        @param unregister: bound unregister_* method of rm for that type
        """
        self.assertEqual(None, rm.get_node('caller1'))

        register('key1', 'caller1', 'http://one:1234')
        self.assertTrue(r.has_key('key1'))
        self.assertEqual('http://one:1234', rm.get_node('caller1').api)
        self.assertEqual([['key1', ['caller1']]], r.get_state())

        # - verify that changed caller_api updates ref
        register('key1', 'caller1', 'http://oneB:1234')
        self.assertTrue(r.has_key('key1'))
        self.assertEqual('http://oneB:1234', rm.get_node('caller1').api)
        self.assertEqual([['key1', ['caller1']]], r.get_state())

        register('key2', 'caller2', 'http://two:1234')
        self.assertEqual('http://two:1234', rm.get_node('caller2').api)

        # - unregister should be noop if caller api does not match
        code, msg, val = unregister('key2', 'caller2', 'http://b:1234')
        self.assertEqual(1, code)
        self.assertEqual(0, val)
        self.assertEqual('http://two:1234', rm.get_node('caller2').api)

        # - unregister should be noop if key is unknown
        code, msg, val = unregister('unknown', 'caller2', 'http://two:1234')
        self.assertEqual(1, code)
        self.assertEqual(0, val)
        self.assertTrue(r.has_key('key2'))
        self.assertEqual('http://two:1234', rm.get_node('caller2').api)

        # - unregister should be noop if unknown node; use the unregister
        #   function under test rather than always the publisher one
        code, msg, val = unregister('key2', 'unknown', 'http://unknown:1')
        self.assertEqual(1, code)
        self.assertEqual(0, val)
        self.assertTrue(r.has_key('key2'))

        # - unregister should clear all knowledge of caller2
        code, msg, val = unregister('key2', 'caller2', 'http://two:1234')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertTrue(r.has_key('key1'))
        self.assertFalse(r.has_key('key2'))
        self.assertEqual(None, rm.get_node('caller2'))

        code, msg, val = unregister('key1', 'caller1', 'http://oneB:1234')
        self.assertEqual(1, code)
        self.assertEqual(1, val)
        self.assertFalse(r)
        self.assertFalse(r.has_key('key1'))
        self.assertEqual(None, rm.get_node('caller1'))

    def test_RegistrationManager_base(self):
        from rosmaster.registrations import Registrations, RegistrationManager
        threadpool = ThreadPoolMock()

        rm = RegistrationManager(threadpool)
        self.assertTrue(isinstance(rm.services, Registrations))
        self.assertEqual(Registrations.SERVICE, rm.services.type)
        self.assertTrue(isinstance(rm.param_subscribers, Registrations))
        self.assertEqual(Registrations.PARAM_SUBSCRIPTIONS, rm.param_subscribers.type)
        self.assertTrue(isinstance(rm.subscribers, Registrations))
        self.assertEqual(Registrations.TOPIC_SUBSCRIPTIONS, rm.subscribers.type)
        self.assertTrue(isinstance(rm.publishers, Registrations))
        self.assertEqual(Registrations.TOPIC_PUBLICATIONS, rm.publishers.type)

        # test auto-clearing of registrations if node API changes
        rm.register_publisher('pub1', 'caller1', 'http://one:1')
        rm.register_publisher('pub1', 'caller2', 'http://two:1')
        rm.register_publisher('pub1', 'caller3', 'http://three:1')
        rm.register_subscriber('sub1', 'caller1', 'http://one:1')
        rm.register_subscriber('sub1', 'caller2', 'http://two:1')
        rm.register_subscriber('sub1', 'caller3', 'http://three:1')
        rm.register_param_subscriber('p1', 'caller1', 'http://one:1')
        rm.register_param_subscriber('p1', 'caller2', 'http://two:1')
        rm.register_param_subscriber('p1', 'caller3', 'http://three:1')
        rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://one:1')
        self.assertEqual('http://one:1', rm.get_node('caller1').api)
        self.assertEqual('http://two:1', rm.get_node('caller2').api)
        self.assertEqual('http://three:1', rm.get_node('caller3').api)

        # - first, make sure that changing rosrpc URI does not erase state
        rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://oneB:1')
        n = rm.get_node('caller1')
        self.assertEqual(['pub1'], n.topic_publications)
        self.assertEqual(['sub1'], n.topic_subscriptions)
        self.assertEqual(['p1'], n.param_subscriptions)
        self.assertEqual(['s1'], n.services)
        self.assertTrue('http://one:1' in rm.publishers.get_apis('pub1'))
        self.assertTrue('http://one:1' in rm.subscribers.get_apis('sub1'))
        self.assertTrue('http://one:1' in rm.param_subscribers.get_apis('p1'))
        self.assertTrue('http://one:1' in rm.services.get_apis('s1'))

        # - also, make sure unregister does not erase state if API changed
        rm.unregister_publisher('pub1', 'caller1', 'http://not:1')
        self.assertTrue('http://one:1' in rm.publishers.get_apis('pub1'))
        rm.unregister_subscriber('sub1', 'caller1', 'http://not:1')
        self.assertTrue('http://one:1' in rm.subscribers.get_apis('sub1'))
        rm.unregister_param_subscriber('p1', 'caller1', 'http://not:1')
        self.assertTrue('http://one:1' in rm.param_subscribers.get_apis('p1'))
        # target the registered service 's1' so the mismatched service
        # API (not an unknown service name) is what gets exercised
        rm.unregister_service('s1', 'caller1', 'rosrpc://not:1')
        self.assertTrue('http://one:1' in rm.services.get_apis('s1'))

        # erase caller1 sub/srvs/params via register_publisher
        rm.register_publisher('pub1', 'caller1', 'http://newone:1')
        self.assertEqual('http://newone:1', rm.get_node('caller1').api)
        # - check node ref
        n = rm.get_node('caller1')
        self.assertEqual(['pub1'], n.topic_publications)
        self.assertEqual([], n.services)
        self.assertEqual([], n.topic_subscriptions)
        self.assertEqual([], n.param_subscriptions)
        # - checks publishers
        self.assertTrue('http://newone:1' in rm.publishers.get_apis('pub1'))
        # - checks subscribers
        self.assertTrue(rm.subscribers.has_key('sub1'))
        self.assertFalse('http://one:1' in rm.subscribers.get_apis('sub1'))
        # - checks param subscribers
        self.assertTrue(rm.param_subscribers.has_key('p1'))
        self.assertFalse('http://one:1' in rm.param_subscribers.get_apis('p1'))

        # erase caller2 pub/sub/params via register_service
        # - initial state
        self.assertTrue('http://two:1' in rm.publishers.get_apis('pub1'))
        self.assertTrue('http://two:1' in rm.subscribers.get_apis('sub1'))
        self.assertTrue('http://two:1' in rm.param_subscribers.get_apis('p1'))
        # - change ownership of s1 to caller2
        rm.register_service('s1', 'caller2', 'http://two:1', 'rosrpc://two:1')
        self.assertTrue('http://two:1' in rm.services.get_apis('s1'))
        self.assertTrue('http://two:1' in rm.publishers.get_apis('pub1'))
        self.assertTrue('http://two:1' in rm.subscribers.get_apis('sub1'))
        self.assertTrue('http://two:1' in rm.param_subscribers.get_apis('p1'))

        rm.register_service('s1', 'caller2', 'http://newtwo:1', 'rosrpc://newtwo:1')
        self.assertEqual('http://newone:1', rm.get_node('caller1').api)
        # - check node ref
        n = rm.get_node('caller2')
        self.assertEqual([], n.topic_publications)
        self.assertEqual(['s1'], n.services)
        self.assertEqual([], n.topic_subscriptions)
        self.assertEqual([], n.param_subscriptions)
        # - checks publishers
        self.assertTrue(rm.publishers.has_key('pub1'))
        self.assertFalse('http://two:1' in rm.publishers.get_apis('pub1'))
        # - checks subscribers
        self.assertTrue(rm.subscribers.has_key('sub1'))
        self.assertFalse('http://two:1' in rm.subscribers.get_apis('sub1'))
        self.assertEqual([['sub1', ['caller3']]], rm.subscribers.get_state())
        # - checks param subscribers
        self.assertTrue(rm.param_subscribers.has_key('p1'))
        self.assertFalse('http://two:1' in rm.param_subscribers.get_apis('p1'))
        self.assertEqual([['p1', ['caller3']]], rm.param_subscribers.get_state())


if __name__ == '__main__':
    import rostest
    rostest.unitrun('test_rosmaster', sys.argv[0], TestRosmasterRegistrations, coverage_packages=['rosmaster.registrations'])