00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rosmaster')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 import time
00043
00044
00045 class ThreadPoolMock(object):
00046 def queue_task(*args): pass
00047
00048 class TestRosmasterRegistrations(unittest.TestCase):
00049
00050 def test_NodeRef_services(self):
00051 from rosmaster.registrations import NodeRef, Registrations
00052 n = NodeRef('n1', 'http://localhost:1234')
00053
00054 n.add(Registrations.SERVICE, 'add_two_ints')
00055 self.failIf(n.is_empty())
00056 self.assert_('add_two_ints' in n.services)
00057 self.assertEquals(['add_two_ints'], n.services)
00058
00059 n.add(Registrations.SERVICE, 'add_three_ints')
00060 self.failIf(n.is_empty())
00061 self.assert_('add_three_ints' in n.services)
00062 self.assert_('add_two_ints' in n.services)
00063
00064 n.remove(Registrations.SERVICE, 'add_two_ints')
00065 self.assert_('add_three_ints' in n.services)
00066 self.assertEquals(['add_three_ints'], n.services)
00067 self.failIf('add_two_ints' in n.services)
00068 self.failIf(n.is_empty())
00069
00070 n.remove(Registrations.SERVICE, 'add_three_ints')
00071 self.failIf('add_three_ints' in n.services)
00072 self.failIf('add_two_ints' in n.services)
00073 self.assertEquals([], n.services)
00074 self.assert_(n.is_empty())
00075
00076 def test_NodeRef_subs(self):
00077 from rosmaster.registrations import NodeRef, Registrations
00078 n = NodeRef('n1', 'http://localhost:1234')
00079
00080 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic1')
00081 self.failIf(n.is_empty())
00082 self.assert_('topic1' in n.topic_subscriptions)
00083 self.assertEquals(['topic1'], n.topic_subscriptions)
00084
00085 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')
00086 self.failIf(n.is_empty())
00087 self.assert_('topic2' in n.topic_subscriptions)
00088 self.assert_('topic1' in n.topic_subscriptions)
00089
00090 n.remove(Registrations.TOPIC_SUBSCRIPTIONS, 'topic1')
00091 self.assert_('topic2' in n.topic_subscriptions)
00092 self.assertEquals(['topic2'], n.topic_subscriptions)
00093 self.failIf('topic1' in n.topic_subscriptions)
00094 self.failIf(n.is_empty())
00095
00096 n.remove(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')
00097 self.failIf('topic2' in n.topic_subscriptions)
00098 self.failIf('topic1' in n.topic_subscriptions)
00099 self.assertEquals([], n.topic_subscriptions)
00100 self.assert_(n.is_empty())
00101
00102 def test_NodeRef_pubs(self):
00103 from rosmaster.registrations import NodeRef, Registrations
00104 n = NodeRef('n1', 'http://localhost:1234')
00105
00106 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic1')
00107 self.failIf(n.is_empty())
00108 self.assert_('topic1' in n.topic_publications)
00109 self.assertEquals(['topic1'], n.topic_publications)
00110
00111 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic2')
00112 self.failIf(n.is_empty())
00113 self.assert_('topic2' in n.topic_publications)
00114 self.assert_('topic1' in n.topic_publications)
00115
00116 n.remove(Registrations.TOPIC_PUBLICATIONS, 'topic1')
00117 self.assert_('topic2' in n.topic_publications)
00118 self.assertEquals(['topic2'], n.topic_publications)
00119 self.failIf('topic1' in n.topic_publications)
00120 self.failIf(n.is_empty())
00121
00122 n.remove(Registrations.TOPIC_PUBLICATIONS, 'topic2')
00123 self.failIf('topic2' in n.topic_publications)
00124 self.failIf('topic1' in n.topic_publications)
00125 self.assertEquals([], n.topic_publications)
00126 self.assert_(n.is_empty())
00127
00128 def test_NodeRef_base(self):
00129 import rosmaster.exceptions
00130 from rosmaster.registrations import NodeRef, Registrations
00131 n = NodeRef('n1', 'http://localhost:1234')
00132 self.assertEquals('http://localhost:1234', n.api)
00133 self.assertEquals([], n.param_subscriptions)
00134 self.assertEquals([], n.topic_subscriptions)
00135 self.assertEquals([], n.topic_publications)
00136 self.assertEquals([], n.services)
00137 self.assert_(n.is_empty())
00138
00139 try:
00140 n.add(12345, 'topic')
00141 self.fail("should have failed with invalid type")
00142 except rosmaster.exceptions.InternalException: pass
00143 try:
00144 n.remove(12345, 'topic')
00145 self.fail("should have failed with invalid type")
00146 except rosmaster.exceptions.InternalException: pass
00147
00148 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic1')
00149 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic2')
00150 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')
00151 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic3')
00152 n.add(Registrations.PARAM_SUBSCRIPTIONS, 'topic4')
00153 n.add(Registrations.SERVICE, 'serv')
00154 self.failIf(n.is_empty())
00155
00156 n.clear()
00157 self.assert_(n.is_empty())
00158
00159 def test_NodeRef_param_subs(self):
00160 from rosmaster.registrations import NodeRef, Registrations
00161 n = NodeRef('n1', 'http://localhost:1234')
00162
00163 n.add(Registrations.PARAM_SUBSCRIPTIONS, 'param1')
00164 self.failIf(n.is_empty())
00165 self.assert_('param1' in n.param_subscriptions)
00166 self.assertEquals(['param1'], n.param_subscriptions)
00167
00168 n.add(Registrations.PARAM_SUBSCRIPTIONS, 'param2')
00169 self.failIf(n.is_empty())
00170 self.assert_('param2' in n.param_subscriptions)
00171 self.assert_('param1' in n.param_subscriptions)
00172
00173 n.remove(Registrations.PARAM_SUBSCRIPTIONS, 'param1')
00174 self.assert_('param2' in n.param_subscriptions)
00175 self.assertEquals(['param2'], n.param_subscriptions)
00176 self.failIf('param1' in n.param_subscriptions)
00177 self.failIf(n.is_empty())
00178
00179 n.remove(Registrations.PARAM_SUBSCRIPTIONS, 'param2')
00180 self.failIf('param2' in n.param_subscriptions)
00181 self.failIf('param1' in n.param_subscriptions)
00182 self.assertEquals([], n.param_subscriptions)
00183 self.assert_(n.is_empty())
00184
00185
00186
00187 def _subtest_Registrations_basic(self, r):
00188
00189
00190
00191 r.register('topic1', 'node1', 'http://node1:5678')
00192 self.assert_('topic1' in r)
00193 self.assert_(r.has_key('topic1'))
00194 self.assertEquals(['topic1'], [k for k in r.iterkeys()])
00195 self.assertEquals(['http://node1:5678'], r.get_apis('topic1'))
00196 self.assertEquals([('node1', 'http://node1:5678')], r['topic1'])
00197 self.failIf(not r)
00198 self.assertEquals(None, r.get_service_api('topic1'))
00199 self.assertEquals([['topic1', ['node1']]], r.get_state())
00200
00201 r.register('topic1', 'node2', 'http://node2:5678')
00202 self.assertEquals(['topic1'], [k for k in r.iterkeys()])
00203 self.assertEquals(['topic1'], [k for k in r.iterkeys()])
00204 self.assertEquals(2, len(r.get_apis('topic1')))
00205 self.assert_('http://node1:5678' in r.get_apis('topic1'))
00206 self.assert_('http://node2:5678' in r.get_apis('topic1'))
00207 self.assertEquals(2, len(r['topic1']))
00208 self.assert_(('node1', 'http://node1:5678') in r['topic1'], r['topic1'])
00209 self.assert_(('node2', 'http://node2:5678') in r['topic1'])
00210 self.assertEquals([['topic1', ['node1', 'node2']]], r.get_state())
00211
00212
00213 r.register('topic2', 'node3', 'http://node3:5678')
00214 self.assert_('topic2' in r)
00215 self.assert_(r.has_key('topic2'))
00216 self.assert_('topic1' in [k for k in r.iterkeys()])
00217 self.assert_('topic2' in [k for k in r.iterkeys()])
00218 self.assertEquals(['http://node3:5678'], r.get_apis('topic2'))
00219 self.assertEquals([('node3', 'http://node3:5678')], r['topic2'])
00220 self.failIf(not r)
00221 self.assert_(['topic1', ['node1', 'node2']] in r.get_state(), r.get_state())
00222 self.assert_(['topic2', ['node3']] in r.get_state(), r.get_state())
00223
00224
00225
00226
00227 code, _, val = r.unregister('topic1', 'node3', 'http://node3:5678')
00228 self.assertEquals(0, val)
00229
00230 code, _, val = r.unregister('topic2', 'node2', 'http://node2:5678')
00231 self.assertEquals(0, val)
00232
00233 code, _, val = r.unregister('topic2', 'node2', 'http://fakenode2:5678')
00234 self.assertEquals(0, val)
00235
00236
00237 code, _, val = r.unregister('topic1', 'node1', 'http://node1:5678')
00238 self.assertEquals(1, code)
00239 self.assertEquals(1, val)
00240 self.assert_('topic1' in r)
00241 self.assert_(r.has_key('topic1'))
00242 self.assert_('topic1' in [k for k in r.iterkeys()])
00243 self.assert_('topic2' in [k for k in r.iterkeys()])
00244 self.assertEquals(['http://node2:5678'], r.get_apis('topic1'))
00245 self.assertEquals([('node2', 'http://node2:5678')], r['topic1'])
00246 self.failIf(not r)
00247 self.assert_(['topic1', ['node2']] in r.get_state())
00248 self.assert_(['topic2', ['node3']] in r.get_state())
00249
00250 code, _, val = r.unregister('topic1', 'node2', 'http://node2:5678')
00251 self.assertEquals(1, code)
00252 self.assertEquals(1, val)
00253 self.failIf('topic1' in r)
00254 self.failIf(r.has_key('topic1'))
00255 self.assertEquals(['topic2'], [k for k in r.iterkeys()])
00256 self.assertEquals([], r.get_apis('topic1'))
00257 self.assertEquals([], r['topic1'])
00258 self.failIf(not r)
00259 self.assertEquals([['topic2', ['node3']]], r.get_state())
00260
00261
00262 code, _, val = r.unregister('topic2', 'node3', 'http://node3:5678')
00263 self.assertEquals(1, code)
00264 self.assertEquals(1, val)
00265 self.failIf('topic2' in r)
00266 self.assert_(not r)
00267 self.assertEquals([], r.get_state())
00268
00269 def test_Registrations(self):
00270 import rosmaster.exceptions
00271 from rosmaster.registrations import Registrations
00272 types = [Registrations.TOPIC_SUBSCRIPTIONS,
00273 Registrations.TOPIC_PUBLICATIONS,
00274 Registrations.SERVICE,
00275 Registrations.PARAM_SUBSCRIPTIONS]
00276
00277 self.assertEquals(4, len(set(types)))
00278 try:
00279 r = Registrations(-1)
00280 self.fail("Registrations accepted invalid type")
00281 except rosmaster.exceptions.InternalException, e: pass
00282
00283 for t in types:
00284 r = Registrations(t)
00285 self.assertEquals(t, r.type)
00286 self.assert_(not r)
00287 self.failIf('topic1' in r)
00288 self.failIf(r.has_key('topic1'))
00289 self.failIf([k for k in r.iterkeys()])
00290 self.assertEquals(None, r.get_service_api('non-existent'))
00291
00292
00293 r = Registrations(Registrations.TOPIC_SUBSCRIPTIONS)
00294 self._subtest_Registrations_basic(r)
00295 r = Registrations(Registrations.TOPIC_PUBLICATIONS)
00296 self._subtest_Registrations_basic(r)
00297 r = Registrations(Registrations.PARAM_SUBSCRIPTIONS)
00298 self._subtest_Registrations_basic(r)
00299
00300 r = Registrations(Registrations.SERVICE)
00301 self._subtest_Registrations_services(r)
00302
00303 def test_RegistrationManager_services(self):
00304 from rosmaster.registrations import Registrations, RegistrationManager
00305 rm = RegistrationManager(ThreadPoolMock())
00306
00307 self.assertEquals(None, rm.get_node('caller1'))
00308
00309
00310 code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://one:1234')
00311 self.assertEquals(1, code)
00312 self.assertEquals(0, val)
00313
00314 rm.register_service('s1', 'caller1', 'http://one:1234', 'rosrpc://one:1234')
00315 self.assert_(rm.services.has_key('s1'))
00316 self.assertEquals('rosrpc://one:1234', rm.services.get_service_api('s1'))
00317 self.assertEquals('http://one:1234', rm.get_node('caller1').api)
00318 self.assertEquals([['s1', ['caller1']]], rm.services.get_state())
00319
00320
00321 rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://one:1234')
00322 self.assert_(rm.services.has_key('s1'))
00323 self.assertEquals('rosrpc://one:1234', rm.services.get_service_api('s1'))
00324 self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)
00325 self.assertEquals([['s1', ['caller1']]], rm.services.get_state())
00326
00327
00328 rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://oneB:1234')
00329 self.assert_(rm.services.has_key('s1'))
00330 self.assertEquals('rosrpc://oneB:1234', rm.services.get_service_api('s1'))
00331 self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)
00332 self.assertEquals([['s1', ['caller1']]], rm.services.get_state())
00333
00334 rm.register_service('s2', 'caller2', 'http://two:1234', 'rosrpc://two:1234')
00335 self.assertEquals('http://two:1234', rm.get_node('caller2').api)
00336
00337
00338 code, msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://b:1234')
00339 self.assertEquals(1, code)
00340 self.assertEquals(0, val)
00341 self.assert_(rm.services.has_key('s2'))
00342 self.assertEquals('http://two:1234', rm.get_node('caller2').api)
00343 self.assertEquals('rosrpc://two:1234', rm.services.get_service_api('s2'))
00344
00345
00346 code, msg, val = rm.unregister_service('unknown', 'caller2', 'rosrpc://two:1234')
00347 self.assertEquals(1, code)
00348 self.assertEquals(0, val)
00349 self.assert_(rm.services.has_key('s2'))
00350 self.assertEquals('http://two:1234', rm.get_node('caller2').api)
00351 self.assertEquals('rosrpc://two:1234', rm.services.get_service_api('s2'))
00352
00353
00354 code,msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://two:1234')
00355 self.assertEquals(1, code)
00356 self.assertEquals(1, val)
00357 self.assert_(rm.services.has_key('s1'))
00358 self.failIf(rm.services.has_key('s2'))
00359 self.assertEquals(None, rm.get_node('caller2'))
00360
00361 code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://oneB:1234')
00362 self.assertEquals(1, code)
00363 self.assertEquals(1, val)
00364 self.assert_(not rm.services.__nonzero__())
00365 self.failIf(rm.services.has_key('s1'))
00366 self.assertEquals(None, rm.get_node('caller1'))
00367
00368 def test_RegistrationManager_topic_pub(self):
00369 from rosmaster.registrations import Registrations, RegistrationManager
00370 rm = RegistrationManager(ThreadPoolMock())
00371 self.subtest_RegistrationManager(rm, rm.publishers, rm.register_publisher, rm.unregister_publisher)
00372
00373 def test_RegistrationManager_topic_sub(self):
00374 from rosmaster.registrations import Registrations, RegistrationManager
00375 rm = RegistrationManager(ThreadPoolMock())
00376 self.subtest_RegistrationManager(rm, rm.subscribers, rm.register_subscriber, rm.unregister_subscriber)
00377 def test_RegistrationManager_param_sub(self):
00378 from rosmaster.registrations import Registrations, RegistrationManager
00379 rm = RegistrationManager(ThreadPoolMock())
00380 self.subtest_RegistrationManager(rm, rm.param_subscribers, rm.register_param_subscriber, rm.unregister_param_subscriber)
00381
00382 def subtest_RegistrationManager(self, rm, r, register, unregister):
00383 self.assertEquals(None, rm.get_node('caller1'))
00384
00385 register('key1', 'caller1', 'http://one:1234')
00386 self.assert_(r.has_key('key1'))
00387 self.assertEquals('http://one:1234', rm.get_node('caller1').api)
00388 self.assertEquals([['key1', ['caller1']]], r.get_state())
00389
00390
00391 register('key1', 'caller1', 'http://oneB:1234')
00392 self.assert_(r.has_key('key1'))
00393 self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)
00394 self.assertEquals([['key1', ['caller1']]], r.get_state())
00395
00396 register('key2', 'caller2', 'http://two:1234')
00397 self.assertEquals('http://two:1234', rm.get_node('caller2').api)
00398
00399
00400 code, msg, val = unregister('key2', 'caller2', 'http://b:1234')
00401 self.assertEquals(1, code)
00402 self.assertEquals(0, val)
00403 self.assertEquals('http://two:1234', rm.get_node('caller2').api)
00404
00405
00406 code, msg, val = unregister('unknown', 'caller2', 'http://two:1234')
00407 self.assertEquals(1, code)
00408 self.assertEquals(0, val)
00409 self.assert_(r.has_key('key2'))
00410 self.assertEquals('http://two:1234', rm.get_node('caller2').api)
00411
00412
00413 code, msg, val = rm.unregister_publisher('key2', 'unknown', 'http://unknown:1')
00414 self.assertEquals(1, code)
00415 self.assertEquals(0, val)
00416 self.assert_(r.has_key('key2'))
00417
00418
00419 code,msg, val = unregister('key2', 'caller2', 'http://two:1234')
00420 self.assertEquals(1, code)
00421 self.assertEquals(1, val)
00422 self.assert_(r.has_key('key1'))
00423 self.failIf(r.has_key('key2'))
00424 self.assertEquals(None, rm.get_node('caller2'))
00425
00426 code, msg, val = unregister('key1', 'caller1', 'http://oneB:1234')
00427 self.assertEquals(1, code)
00428 self.assertEquals(1, val)
00429 self.assert_(not r.__nonzero__())
00430 self.failIf(r.has_key('key1'))
00431 self.assertEquals(None, rm.get_node('caller1'))
00432
00433 def test_RegistrationManager_base(self):
00434 import rosmaster.exceptions
00435 from rosmaster.registrations import Registrations, RegistrationManager
00436 threadpool = ThreadPoolMock()
00437
00438 rm = RegistrationManager(threadpool)
00439 self.assert_(isinstance(rm.services, Registrations))
00440 self.assertEquals(Registrations.SERVICE, rm.services.type)
00441 self.assert_(isinstance(rm.param_subscribers, Registrations))
00442 self.assertEquals(Registrations.PARAM_SUBSCRIPTIONS, rm.param_subscribers.type)
00443 self.assert_(isinstance(rm.subscribers, Registrations))
00444 self.assertEquals(Registrations.TOPIC_SUBSCRIPTIONS, rm.subscribers.type)
00445 self.assert_(isinstance(rm.subscribers, Registrations))
00446 self.assertEquals(Registrations.TOPIC_PUBLICATIONS, rm.publishers.type)
00447 self.assert_(isinstance(rm.publishers, Registrations))
00448
00449
00450 rm.register_publisher('pub1', 'caller1', 'http://one:1')
00451 rm.register_publisher('pub1', 'caller2', 'http://two:1')
00452 rm.register_publisher('pub1', 'caller3', 'http://three:1')
00453 rm.register_subscriber('sub1', 'caller1', 'http://one:1')
00454 rm.register_subscriber('sub1', 'caller2', 'http://two:1')
00455 rm.register_subscriber('sub1', 'caller3', 'http://three:1')
00456 rm.register_param_subscriber('p1', 'caller1', 'http://one:1')
00457 rm.register_param_subscriber('p1', 'caller2', 'http://two:1')
00458 rm.register_param_subscriber('p1', 'caller3', 'http://three:1')
00459 rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://one:1')
00460 self.assertEquals('http://one:1', rm.get_node('caller1').api)
00461 self.assertEquals('http://two:1', rm.get_node('caller2').api)
00462 self.assertEquals('http://three:1', rm.get_node('caller3').api)
00463
00464
00465 rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://oneB:1')
00466 n = rm.get_node('caller1')
00467 self.assertEquals(['pub1'], n.topic_publications)
00468 self.assertEquals(['sub1'], n.topic_subscriptions)
00469 self.assertEquals(['p1'], n.param_subscriptions)
00470 self.assertEquals(['s1'], n.services)
00471 self.assert_('http://one:1' in rm.publishers.get_apis('pub1'))
00472 self.assert_('http://one:1' in rm.subscribers.get_apis('sub1'))
00473 self.assert_('http://one:1' in rm.param_subscribers.get_apis('p1'))
00474 self.assert_('http://one:1' in rm.services.get_apis('s1'))
00475
00476
00477 rm.unregister_publisher('pub1', 'caller1', 'http://not:1')
00478 self.assert_('http://one:1' in rm.publishers.get_apis('pub1'))
00479 rm.unregister_subscriber('sub1', 'caller1', 'http://not:1')
00480 self.assert_('http://one:1' in rm.subscribers.get_apis('sub1'))
00481 rm.unregister_param_subscriber('p1', 'caller1', 'http://not:1')
00482 self.assert_('http://one:1' in rm.param_subscribers.get_apis('p1'))
00483 rm.unregister_service('sub1', 'caller1', 'rosrpc://not:1')
00484 self.assert_('http://one:1' in rm.services.get_apis('s1'))
00485
00486
00487
00488 rm.register_publisher('pub1', 'caller1', 'http://newone:1')
00489 self.assertEquals('http://newone:1', rm.get_node('caller1').api)
00490
00491 n = rm.get_node('caller1')
00492 self.assertEquals(['pub1'], n.topic_publications)
00493 self.assertEquals([], n.services)
00494 self.assertEquals([], n.topic_subscriptions)
00495 self.assertEquals([], n.param_subscriptions)
00496
00497 self.assert_('http://newone:1' in rm.publishers.get_apis('pub1'))
00498
00499 self.assert_(rm.subscribers.has_key('sub1'))
00500 self.failIf('http://one:1' in rm.subscribers.get_apis('sub1'))
00501
00502 self.assert_(rm.param_subscribers.has_key('p1'))
00503 self.failIf('http://one:1' in rm.param_subscribers.get_apis('p1'))
00504
00505
00506
00507 self.assert_('http://two:1' in rm.publishers.get_apis('pub1'))
00508 self.assert_('http://two:1' in rm.subscribers.get_apis('sub1'))
00509 self.assert_('http://two:1' in rm.param_subscribers.get_apis('p1'))
00510
00511 rm.register_service('s1', 'caller2', 'http://two:1', 'rosrpc://two:1')
00512 self.assert_('http://two:1' in rm.services.get_apis('s1'))
00513 self.assert_('http://two:1' in rm.publishers.get_apis('pub1'))
00514 self.assert_('http://two:1' in rm.subscribers.get_apis('sub1'))
00515 self.assert_('http://two:1' in rm.param_subscribers.get_apis('p1'))
00516
00517 rm.register_service('s1', 'caller2', 'http://newtwo:1', 'rosrpc://newtwo:1')
00518 self.assertEquals('http://newone:1', rm.get_node('caller1').api)
00519
00520 n = rm.get_node('caller2')
00521 self.assertEquals([], n.topic_publications)
00522 self.assertEquals(['s1'], n.services)
00523 self.assertEquals([], n.topic_subscriptions)
00524 self.assertEquals([], n.param_subscriptions)
00525
00526 self.assert_(rm.publishers.has_key('pub1'))
00527 self.failIf('http://two:1' in rm.publishers.get_apis('pub1'))
00528
00529 self.assert_(rm.subscribers.has_key('sub1'))
00530 self.failIf('http://two:1' in rm.subscribers.get_apis('sub1'))
00531 self.assertEquals([['sub1', ['caller3']]], rm.subscribers.get_state())
00532
00533 self.assert_(rm.param_subscribers.has_key('p1'))
00534 self.failIf('http://two:1' in rm.param_subscribers.get_apis('p1'))
00535 self.assertEquals([['p1', ['caller3']]], rm.param_subscribers.get_state())
00536
00537
00538 def test_Registrations_unregister_all(self):
00539 import rosmaster.exceptions
00540 from rosmaster.registrations import Registrations
00541
00542 r = Registrations(Registrations.TOPIC_SUBSCRIPTIONS)
00543 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:
00544 r.register(k, 'node1', 'http://node1:5678')
00545 r.register('topic2', 'node2', 'http://node2:5678')
00546 r.unregister_all('node1')
00547 self.failIf(not r)
00548 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:
00549 self.failIf(r.has_key(k))
00550 self.assertEquals(['topic2'], [k for k in r.iterkeys()])
00551
00552 r = Registrations(Registrations.TOPIC_PUBLICATIONS)
00553 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:
00554 r.register(k, 'node1', 'http://node1:5678')
00555 r.register('topic2', 'node2', 'http://node2:5678')
00556 r.unregister_all('node1')
00557 self.failIf(not r)
00558 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:
00559 self.failIf(r.has_key(k))
00560 self.assertEquals(['topic2'], [k for k in r.iterkeys()])
00561
00562 r = Registrations(Registrations.PARAM_SUBSCRIPTIONS)
00563 r.register('param2', 'node2', 'http://node2:5678')
00564 for k in ['param1', 'param1b', 'param1c', 'param1d']:
00565 r.register(k, 'node1', 'http://node1:5678')
00566 r.unregister_all('node1')
00567 self.failIf(not r)
00568 for k in ['param1', 'param1b', 'param1c', 'param1d']:
00569 self.failIf(r.has_key(k))
00570 self.assertEquals(['param2'], [k for k in r.iterkeys()])
00571
00572 r = Registrations(Registrations.SERVICE)
00573 for k in ['service1', 'service1b', 'service1c', 'service1d']:
00574 r.register(k, 'node1', 'http://node1:5678', 'rosrpc://node1:1234')
00575 r.register('service2', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
00576 r.unregister_all('node1')
00577 self.failIf(not r)
00578 for k in ['service1', 'service1b', 'service1c', 'service1d']:
00579 self.failIf(r.has_key(k))
00580 self.assertEquals(None, r.get_service_api(k))
00581 self.assertEquals(['service2'], [k for k in r.iterkeys()])
00582 self.assertEquals('rosrpc://node2:1234', r.get_service_api('service2'))
00583
00584 def _subtest_Registrations_services(self, r):
00585 import rosmaster.exceptions
00586
00587
00588 self.assertEquals(None, r.get_service_api('s1'))
00589 r.unregister_all('node1')
00590
00591
00592 code, msg, val = r.unregister('s1', 'caller1', None, 'rosrpc://one:1234')
00593 self.assertEquals(1, code)
00594 self.assertEquals(0, val)
00595
00596 try:
00597 r.register('service1', 'node1', 'http://node1:5678')
00598 self.fail("should require service_api")
00599 except rosmaster.exceptions.InternalException: pass
00600
00601 r.register('service1', 'node1', 'http://node1:5678', 'rosrpc://node1:1234')
00602
00603 self.assert_('service1' in r)
00604 self.assert_(r.has_key('service1'))
00605 self.assertEquals(['service1'], [k for k in r.iterkeys()])
00606 self.assertEquals(['http://node1:5678'], r.get_apis('service1'))
00607 self.assertEquals('rosrpc://node1:1234', r.get_service_api('service1'))
00608 self.assertEquals([('node1', 'http://node1:5678')], r['service1'])
00609 self.failIf(not r)
00610 self.assertEquals([['service1', ['node1']]], r.get_state())
00611
00612 r.register('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
00613 self.assertEquals(['service1'], [k for k in r.iterkeys()])
00614 self.assertEquals('rosrpc://node2:1234', r.get_service_api('service1'))
00615 self.assertEquals(['http://node2:5678'], r.get_apis('service1'))
00616 self.assertEquals([('node2', 'http://node2:5678')], r['service1'])
00617 self.assertEquals([['service1', ['node2']]], r.get_state())
00618
00619
00620 r.register('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')
00621 self.assertEquals('rosrpc://node3:1234', r.get_service_api('service2'))
00622 self.assertEquals(2, len(r.get_state()))
00623 self.assert_(['service2', ['node3']] in r.get_state(), r.get_state())
00624 self.assert_(['service1', ['node2']] in r.get_state())
00625
00626
00627 r.register('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
00628 self.assertEquals(3, len(r.get_state()))
00629 self.assert_(['service2', ['node3']] in r.get_state())
00630 self.assert_(['service1b', ['node2']] in r.get_state())
00631 self.assert_(['service1', ['node2']] in r.get_state())
00632
00633
00634 try:
00635 r.unregister('service1', 'node2', 'http://node2:1234')
00636 self.fail("service_api param must be specified")
00637 except rosmaster.exceptions.InternalException: pass
00638
00639
00640 code, _, val = r.unregister('unknown', 'node2', 'http://node2:5678', 'rosprc://node2:1234')
00641 self.assertEquals(0, val)
00642
00643 code, _, val = r.unregister('service1', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')
00644 self.assertEquals(0, val)
00645
00646 code, _, val = r.unregister('service1', 'node2', 'http://node2b:5678', 'rosrpc://node3:1234')
00647 self.assertEquals(0, val)
00648
00649
00650 code, _, val = r.unregister('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')
00651 self.assertEquals(1, code)
00652 self.assertEquals(1, val)
00653 self.failIf('service2' in r)
00654 self.failIf(r.has_key('service2'))
00655 self.assert_('service1' in [k for k in r.iterkeys()])
00656 self.assert_('service1b' in [k for k in r.iterkeys()])
00657 self.assertEquals([], r.get_apis('service2'))
00658 self.assertEquals([], r['service2'])
00659 self.failIf(not r)
00660 self.assertEquals(2, len(r.get_state()))
00661 self.failIf(['service2', ['node3']] in r.get_state())
00662
00663
00664 code, _, val = r.unregister('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
00665 self.assertEquals(1, code)
00666 self.assertEquals(1, val)
00667 self.failIf('service1' in r)
00668 self.failIf(r.has_key('service1'))
00669 self.assertEquals(['service1b'], [k for k in r.iterkeys()])
00670 self.assertEquals([], r.get_apis('service1'))
00671 self.assertEquals([], r['service1'])
00672 self.failIf(not r)
00673 self.assertEquals([['service1b', ['node2']]], r.get_state())
00674
00675 code, _, val = r.unregister('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')
00676 self.assertEquals(1, code)
00677 self.assertEquals(1, val)
00678 self.failIf('service1' in r)
00679 self.failIf(r.has_key('service1'))
00680 self.assertEquals([], [k for k in r.iterkeys()])
00681 self.assertEquals([], r.get_apis('service1'))
00682 self.assertEquals([], r['service1'])
00683 self.assert_(not r)
00684 self.assertEquals([], r.get_state())
00685
00686 if __name__ == '__main__':
00687 import rostest
00688 rostest.unitrun('test_rosmaster', sys.argv[0], TestRosmasterRegistrations, coverage_packages=['rosmaster.registrations'])