00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 import roslib; roslib.load_manifest('test_rosgraph')
00033
00034 import os
00035 import sys
00036 import unittest
00037
00038 import rosgraph.masterapi
00039 import rostest
00040
00041 _ID = '/caller_id'
00042 _MASTER_URI = 'http://localhost:12345'
00043
00044 class MasterMock(object):
00045 """
00046 Mock for testing Master without using actual master
00047 """
00048
00049 def __init__(self):
00050 self.call = None
00051 self.return_val = None
00052
00053 def getMasterUri(self, caller_id):
00054 self.call = ('getMasterUri', caller_id)
00055 return self.return_val
00056
00057 def getPid(self, caller_id):
00058 self.call = ('getPid', caller_id)
00059 return self.return_val
00060
00061 def getUri(self, caller_id):
00062 self.call = ('getUri', caller_id)
00063 return self.return_val
00064
00065 def registerService(self, caller_id, service, service_api, caller_api):
00066 self.call = ('registerService', caller_id, service, service_api, caller_api)
00067 return self.return_val
00068
00069 def lookupService(self, caller_id, service):
00070 self.call = ('lookupService', caller_id, service)
00071 return self.return_val
00072
00073 def unregisterService(self, caller_id, service, service_api):
00074 self.call = ('unregisterService', caller_id, service, service_api)
00075 return self.return_val
00076
00077 def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
00078 self.call = ('registerSubscriber', caller_id, topic, topic_type, caller_api)
00079 return self.return_val
00080
00081 def unregisterSubscriber(self, caller_id, topic, caller_api):
00082 self.call = ('unregisterSubscriber', caller_id, topic, caller_api)
00083 return self.return_val
00084
00085 def registerPublisher(self, caller_id, topic, topic_type, caller_api):
00086 self.call = ('registerPublisher', caller_id, topic, topic_type, caller_api)
00087 return self.return_val
00088
00089 def unregisterPublisher(self, caller_id, topic, caller_api):
00090 self.call = ('unregisterPublisher', caller_id, topic, caller_api)
00091 return self.return_val
00092
00093 def lookupNode(self, caller_id, node_name):
00094 self.call = ('lookupNode', caller_id, node_name)
00095 return self.return_val
00096
00097 def getPublishedTopics(self, caller_id, subgraph):
00098 self.call = ('getPublishedTopics', caller_id, subgraph)
00099 return self.return_val
00100
00101 def getTopicTypes(self, caller_id):
00102 self.call = ('getTopicTypes', caller_id)
00103 return self.return_val
00104
00105 def getSystemState(self, caller_id):
00106 self.call = ('getSystemState', caller_id)
00107 return self.return_val
00108
00109 def getParam(self, caller_id, p):
00110 self.call = ('getParam', caller_id, p)
00111 return self.return_val
00112
00113 def hasParam(self, caller_id, p):
00114 self.call = ('hasParam', caller_id, p)
00115 return self.return_val
00116
00117 def deleteParam(self, caller_id, p):
00118 self.call = ('deleteParam', caller_id, p)
00119 return self.return_val
00120
00121 def searchParam(self, caller_id, p):
00122 self.call = ('searchParam', caller_id, p)
00123 return self.return_val
00124
00125 def setParam(self, caller_id, p, v):
00126 self.call = ('setParam', caller_id, p, v)
00127 return self.return_val
00128
00129 def subscribeParam(self, caller_id, api, p):
00130 self.call = ('subscribeParam', caller_id, api, p)
00131 return self.return_val
00132
00133 def unsubscribeParam(self, caller_id, api, p):
00134 self.call = ('unsubscribeParam', caller_id, api, p)
00135 return self.return_val
00136
00137 def getParamNames(self, caller_id):
00138 self.call = ('getParamNames', caller_id)
00139 return self.return_val
00140
00141 class MasterApiOfflineTest(unittest.TestCase):
00142
00143 def setUp(self):
00144 self.m = rosgraph.masterapi.Master(_ID, master_uri = _MASTER_URI)
00145
00146 self.m.handle = MasterMock()
00147
00148 def throw_failure(self, attr, args, ret_val):
00149 self.m.handle.return_val = ret_val
00150 f = getattr(self.m, attr)
00151 try:
00152 f(*args)
00153 self.fail("[%s] should have thrown Failure with args [%s], ret_val [%s]"%(attr, str(args), str(ret_val)))
00154 except rosgraph.masterapi.Failure:
00155 pass
00156
00157 def throw_error(self, attr, args, ret_val):
00158 self.m.handle.return_val = ret_val
00159 f = getattr(self.m, attr)
00160 try:
00161 f(*args)
00162 self.fail("[%s] should have thrown Error with args [%s], ret_val [%s]"%(attr, str(args), str(ret_val)))
00163 except rosgraph.masterapi.Error:
00164 pass
00165
00166 def test_Master(self):
00167
00168 m = rosgraph.masterapi.Master(_ID, master_uri = 'http://localhost:12345')
00169 self.assertEquals(_ID, m.caller_id)
00170 self.assertEquals(_MASTER_URI, m.master_uri)
00171
00172 m = rosgraph.masterapi.Master(_ID)
00173 self.assertEquals(os.environ['ROS_MASTER_URI'], m.master_uri)
00174
00175 id = '/some/other/id'
00176 m = rosgraph.masterapi.Master(id)
00177 self.assertEquals(id, m.caller_id)
00178
00179 def test_getPid(self):
00180 h = self.m.handle
00181 r = 1235
00182 h.return_val = (1, '', r)
00183 self.assertEquals(r, self.m.getPid())
00184 self.assertEquals(('getPid',_ID), h.call)
00185 self.throw_failure('getPid', (), (0, '', r))
00186 self.throw_error('getPid', (), (-1, '', r))
00187
00188 def test_getPid(self):
00189 h = self.m.handle
00190 r = 'http://foo:1234'
00191 h.return_val = (1, '', r)
00192 self.assertEquals(r, self.m.getUri())
00193 self.assertEquals(('getUri',_ID), h.call)
00194 self.throw_failure('getUri', (), (0, '', r))
00195 self.throw_error('getUri', (), (-1, '', r))
00196
00197 def test_lookupService(self):
00198 h = self.m.handle
00199 r = 'rosrpc://localhost:12345'
00200 h.return_val = (1, '', r)
00201 self.assertEquals(r, self.m.lookupService('/bar/service'))
00202 self.assertEquals(('lookupService',_ID, '/bar/service'), h.call)
00203 self.throw_failure('lookupService', ('/bar/service',), (0, '', r))
00204 self.throw_error('lookupService', ('/bar/service',), (-1, '', r))
00205
00206 def test_registerService(self):
00207 h = self.m.handle
00208 r = 11
00209 h.return_val = (1, '', r)
00210 self.assertEquals(r, self.m.registerService('/bar/service', 'rosrpc://localhost:9812', 'http://localhost:893'))
00211 self.assertEquals(('registerService',_ID, '/bar/service', 'rosrpc://localhost:9812', 'http://localhost:893'), h.call)
00212 args = ('/bar/service', 'rosrpc://localhost:9812', 'http://localhost:893')
00213 self.throw_failure('registerService', args, (0, '', r))
00214 self.throw_error('registerService', args, (-1, '', r))
00215
00216 def test_unregisterService(self):
00217 h = self.m.handle
00218 r = 1
00219 h.return_val = (1, '', r)
00220 self.assertEquals(r, self.m.unregisterService('/bar/service', 'rosrpc://localhost:9812'))
00221 self.assertEquals(('unregisterService',_ID, '/bar/service', 'rosrpc://localhost:9812'), h.call)
00222 args = ('/bar/service', 'rosrpc://localhost:9812')
00223 self.throw_failure('unregisterService', args, (0, '', r))
00224 self.throw_error('unregisterService', args, (-1, '', r))
00225
00226 def test_registerSubscriber(self):
00227 h = self.m.handle
00228 r = ['http://localhost:1234', 'http://localhost:98127']
00229 h.return_val = (1, '', r)
00230 self.assertEquals(r, self.m.registerSubscriber('/foo/node', 'std_msgs/String', 'http://localhost:9812'))
00231 self.assertEquals(('registerSubscriber',_ID, '/foo/node', 'std_msgs/String', 'http://localhost:9812'), h.call)
00232 args = ('/foo/node', 'std_msgs/String', 'http://localhost:9812')
00233 self.throw_failure('registerSubscriber', args, (0, '', r))
00234 self.throw_error('registerSubscriber', args, (-1, '', r))
00235
00236 def test_unregisterSubscriber(self):
00237 h = self.m.handle
00238 r = 1
00239 h.return_val = (1, '', r)
00240 self.assertEquals(r, self.m.unregisterSubscriber('/foo/node', 'http://localhost:9812'))
00241 self.assertEquals(('unregisterSubscriber',_ID, '/foo/node', 'http://localhost:9812'), h.call)
00242 args = ('/foo/node', 'http://localhost:9812')
00243 self.throw_failure('unregisterSubscriber', args, (0, '', r))
00244 self.throw_error('unregisterSubscriber', args, (-1, '', r))
00245
00246 def test_registerPublisher(self):
00247 h = self.m.handle
00248 r = 5
00249 h.return_val = (1, '', r)
00250 self.assertEquals(r, self.m.registerPublisher('/foo/node', 'std_msgs/String', 'http://localhost:9812'))
00251 self.assertEquals(('registerPublisher',_ID, '/foo/node', 'std_msgs/String', 'http://localhost:9812'), h.call)
00252 args = ('/foo/node', 'std_msgs/String', 'http://localhost:9812')
00253 self.throw_failure('registerPublisher', args, (0, '', r))
00254 self.throw_error('registerPublisher', args, (-1, '', r))
00255
00256 def test_unregisterPublisher(self):
00257 h = self.m.handle
00258 r = 10
00259 h.return_val = (1, '', r)
00260 self.assertEquals(r, self.m.unregisterPublisher('/foo/node', 'http://localhost:9812'))
00261 self.assertEquals(('unregisterPublisher',_ID, '/foo/node', 'http://localhost:9812'), h.call)
00262 args = ('/foo/node', 'http://localhost:9812')
00263 self.throw_failure('unregisterPublisher', args, (0, '', r))
00264 self.throw_error('unregisterPublisher', args, (-1, '', r))
00265
00266 def test_lookupNode(self):
00267 h = self.m.handle
00268 r = 'http://localhost:123'
00269 h.return_val = (1, '', r)
00270 self.assertEquals(r, self.m.lookupNode('/foo/node'))
00271 self.assertEquals(('lookupNode',_ID, '/foo/node'), h.call)
00272 args = ('/foo/node',)
00273 self.throw_failure('lookupNode', args, (0, '', r))
00274 self.throw_error('lookupNode', args, (-1, '', r))
00275
00276 def test_getPublishedTopics(self):
00277 h = self.m.handle
00278 r = [ ['foo', 'bar'], ['baz', 'blah'] ]
00279 h.return_val = (1, '', r)
00280 self.assertEquals(r, self.m.getPublishedTopics('/foo'))
00281 self.assertEquals(('getPublishedTopics',_ID, '/foo'), h.call)
00282 args = ('/baz',)
00283 self.throw_failure('getPublishedTopics', args, (0, '', r))
00284 self.throw_error('getPublishedTopics', args, (-1, '', r))
00285
00286 def test_getTopicTypes(self):
00287 h = self.m.handle
00288 r = [ ['/foo', 'std_msgs/String'], ['/baz', 'std_msgs/Int32'] ]
00289 h.return_val = (1, '', r)
00290 self.assertEquals(r, self.m.getTopicTypes())
00291 self.assertEquals(('getTopicTypes',_ID), h.call)
00292 self.throw_failure('getTopicTypes', (), (0, '', r))
00293 self.throw_error('getTopicTypes', (), (-1, '', r))
00294
00295
00296 def test_getSystemState(self):
00297 h = self.m.handle
00298 r = [ [], [], [] ]
00299 h.return_val = (1, '', r)
00300 self.assertEquals(r, self.m.getSystemState())
00301 self.assertEquals(('getSystemState', _ID), h.call)
00302 self.throw_failure('getSystemState', (), (0, '', r))
00303 self.throw_error('getSystemState', (), (-1, '', r))
00304
00305
00306
00307
00308 def test_getParam(self):
00309 h = self.m.handle
00310 r = 1
00311 h.return_val = (1, '', r)
00312 p = '/test_param'
00313 self.assertEquals(r, self.m.getParam(p))
00314 self.assertEquals(('getParam', _ID, p), h.call)
00315 args = (p,)
00316 self.throw_failure('getParam', args, (0, '', r))
00317 self.throw_error('getParam', args, (-1, '', r))
00318
00319 def test_getParamNames(self):
00320 h = self.m.handle
00321 r = [ '/foo' ]
00322 h.return_val = (1, '', r)
00323 self.assertEquals(r, self.m.getParamNames())
00324 self.assertEquals(('getParamNames', _ID), h.call)
00325 self.throw_failure('getParamNames', (), (0, '', r))
00326 self.throw_error('getParamNames', (), (-1, '', r))
00327
00328 def test_hasParam(self):
00329 h = self.m.handle
00330 r = True
00331 h.return_val = (1, '', r)
00332 p = '/test_param'
00333 self.assertEquals(r, self.m.hasParam(p))
00334 self.assertEquals(('hasParam', _ID, p), h.call)
00335 self.throw_failure('hasParam', (p,), (0, '', r))
00336 self.throw_error('hasParam', (p,), (-1, '', r))
00337
00338 def test_searchParam(self):
00339 h = self.m.handle
00340 r = '/foo'
00341 h.return_val = (1, '', r)
00342 p = '/test_param'
00343 self.assertEquals(r, self.m.searchParam(p))
00344 self.assertEquals(('searchParam', _ID, p), h.call)
00345 self.throw_failure('searchParam', (p,), (0, '', r))
00346 self.throw_error('searchParam', (p,), (-1, '', r))
00347
00348 def test_deleteParam(self):
00349 h = self.m.handle
00350 r = '/foo'
00351 h.return_val = (1, '', r)
00352 p = '/test_param'
00353 self.assertEquals(r, self.m.deleteParam(p))
00354 self.assertEquals(('deleteParam', _ID, p), h.call)
00355 self.throw_failure('deleteParam', (p,), (0, '', r))
00356 self.throw_error('deleteParam', (p,), (-1, '', r))
00357
00358 def test_is_online(self):
00359 self.failIf(rosgraph.masterapi.is_online(master_uri="http://fake:12345"))
00360
00361 self.m.handle.return_val = (1, '', 1235)
00362 self.assert_(self.m.is_online())
00363
00364 def test_subscribeParam(self):
00365 h = self.m.handle
00366 r = 1
00367 h.return_val = (1, '', r)
00368 args = ('http://bar:12345', '/test_param')
00369 self.assertEquals(r, self.m.subscribeParam(*args))
00370 self.assertEquals(('subscribeParam', _ID, args[0], args[1]), h.call)
00371 self.throw_failure('subscribeParam', args, (0, '', r))
00372 self.throw_error('subscribeParam', args, (-1, '', r))
00373
00374 def test_unsubscribeParam(self):
00375 h = self.m.handle
00376 r = 1
00377 h.return_val = (1, '', r)
00378 args = ('http://bar:12345', '/test_param')
00379 self.assertEquals(r, self.m.unsubscribeParam(*args))
00380 self.assertEquals(('unsubscribeParam', _ID, args[0], args[1]), h.call)
00381 self.throw_failure('unsubscribeParam', args, (0, '', r))
00382 self.throw_error('unsubscribeParam', args, (-1, '', r))
00383
00384 def test_setParam(self):
00385 h = self.m.handle
00386 r = 1
00387 h.return_val = (1, '', r)
00388 args = ('/test_set_param', 'foo')
00389 self.assertEquals(r, self.m.setParam(*args))
00390 self.assertEquals(('setParam', _ID, args[0], args[1]), h.call)
00391 self.throw_failure('setParam', args, (0, '', r))
00392 self.throw_error('setParam', args, (-1, '', r))
00393
00394 if __name__ == '__main__':
00395 rostest.unitrun('test_rosgraph', 'test_rosgraph_masterapi_offline', MasterApiOfflineTest, coverage_packages=['rosgraph.masterapi'])
00396