00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 import os
00039 import sys
00040 import string
00041 import time
00042 import unittest
00043 try:
00044 from xmlrpc.client import Fault, ServerProxy
00045 except ImportError:
00046 from xmlrpclib import Fault, ServerProxy
00047
00048 import rosunit
00049 import rosgraph
00050
00051 TCPROS = 'TCPROS'
00052
00053 CALLER_ID = '/test_harness'
00054 TEST_NODE_NAME = '/test_node'
00055
00056 class TopicDescription(object):
00057 def __init__(self, topic_name, topic_type):
00058 self.topic_name = topic_name
00059 self.topic_type = topic_type
00060
00061
00062 if not rosgraph.names.is_legal_name(topic_name):
00063 raise ValueError('topic name: %s'%(topic_name))
00064
00065
00066 p, t = topic_type.split('/')
00067
00068 class TopicDescriptionList(object):
00069
00070 def __init__(self, xmlrpcvalue):
00071
00072 if not type(xmlrpcvalue) == list:
00073 raise ValueError("publications must be a list")
00074 self.topics = []
00075 for n, t in xmlrpcvalue:
00076 self.topics.append(TopicDescription(n, t))
00077
00078 def as_dict(self):
00079 d = {}
00080 for t in self.topics:
00081 d[t.topic_name] = t.topic_type
00082 return d
00083
00084 class TestSlaveApi(unittest.TestCase):
00085
00086 def __init__(self, *args, **kwds):
00087 super(TestSlaveApi, self).__init__(*args)
00088
00089 self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00090
00091
00092 self.test_node = 'test_node'
00093 self.required_pubs = TopicDescriptionList([])
00094 self.required_subs = TopicDescriptionList([])
00095
00096 for arg in sys.argv:
00097 if arg.startswith("--node="):
00098 self.test_node = arg[len("--node="):]
00099 if arg.startswith("--profile="):
00100 self.test_node_profile = arg[len("--profile="):]
00101 self.load_profile(self.test_node_profile)
00102
00103
00104 self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
00105
00106 def load_profile(self, filename):
00107 import yaml
00108 with open(filename) as f:
00109 d = yaml.load(f)
00110 self.required_pubs = d.get('pubs', {})
00111 self.required_subs = d.get('subs', {})
00112
00113 def setUp(self):
00114 self.caller_id = CALLER_ID
00115
00116
00117 timeout_t = 10.0 + time.time()
00118 self.node_api = None
00119 self.master = rosgraph.Master(self.caller_id)
00120 while time.time() < timeout_t and not self.node_api:
00121 try:
00122 self.node_api = self.master.lookupNode(self.test_node)
00123 except:
00124 time.sleep(0.1)
00125 if not self.node_api:
00126 self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
00127 print "[%s] API = %s"%(self.test_node, self.node_api)
00128 self.assert_(self.node_api.startswith('http'))
00129 self.node = ServerProxy(self.node_api)
00130
00131
00132
00133 time.sleep(2.)
00134
00135 def apiSuccess(self, args):
00136 """
00137 unit test assertion that fails if status code is not 1 and otherwise returns the value parameter.
00138 @param args: returnv value from ROS API call
00139 @type args: [int, str, val]
00140 @return: value parameter from args (arg[2] for master/slave API)
00141 """
00142 self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00143 self.last_code, self.last_msg, self.last_val = args
00144 assert self.last_code == 1, "status code is not 1: %s"%self.last_msg
00145 return self.last_val
00146
00147 def apiFail(self, args):
00148 """
00149 unit test assertions that fails if status code is not 0 and otherwise returns true.
00150 @param args: returnv value from ROS API call
00151 @type args: [int, str, val]
00152 @return: True if status code is 0
00153 """
00154 self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00155 self.last_code, self.last_msg, self.last_val = args
00156 assert self.last_code == 0, "Call should have failed with status code 0: %s"%self.last_msg
00157
00158 def apiError(self, args, msg=None):
00159 """
00160 unit test assertion that fails if status code is not -1 and otherwise returns true.
00161 @param args: returnv value from ROS API call
00162 @type args: [int, str, val]
00163 @return: True if status code is -1
00164 """
00165 self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
00166 self.last_code, self.last_msg, self.last_val = args
00167 if msg:
00168 assert self.last_code == -1, "%s (return msg was %s)"%(msg, self.last_msg)
00169 else:
00170 assert self.last_code == -1, "Call should have returned error -1 code: %s"%self.last_msg
00171
00172 def check_uri(self, uri):
00173 """
00174 validates a URI as being http(s)
00175 """
00176 import urlparse
00177 parsed = urlparse.urlparse(uri)
00178 self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] is [%s] invalid'%(parsed[0], uri))
00179 self.assert_(parsed[1], 'host missing [%s]'%uri)
00180 self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)
00181
00182 def test_getPid(self):
00183 """
00184 validate node.getPid(caller_id)
00185 """
00186
00187 pid = self.apiSuccess(self.node.getPid(self.caller_id))
00188 self.assert_(pid > 0)
00189
00190
00191 try:
00192 self.apiError(self.node.getPid())
00193 except Fault:
00194 pass
00195
00196 def test_rosout(self):
00197 """
00198 make sure rosout is in publication and connection list
00199 """
00200 val = self.apiSuccess(self.node.getPublications(self.caller_id))
00201 pubs_d = TopicDescriptionList(val).as_dict()
00202 self.assertTrue('/rosout' in pubs_d, "node is not publishing to rosout")
00203 self.assertEquals('rosgraph_msgs/Log', pubs_d['/rosout'], "/rosout is not correct type")
00204
00205 def test_simtime(self):
00206 """
00207 test that node obeys simtime (/Clock) contract
00208
00209 http://www.ros.org/wiki/Clock
00210 """
00211 try:
00212 use_sim_time = self.master.getParam('/use_sim_time')
00213 except:
00214 use_sim_time = False
00215
00216 val = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
00217 subs_d = TopicDescriptionList(val).as_dict()
00218 if use_sim_time:
00219 self.assertTrue('/clock' in subs_d, "node is not subscribing to clock")
00220 self.assertEquals('rosgraph_msgs/Clock', subs_d['/clock'], "/clock is not correct type")
00221 else:
00222 self.assertFalse('/clock' in subs_d, "node is subscribed to /clock even though /use_sim_time is false")
00223
00224 def test_getPublications(self):
00225 """
00226 validate node.getPublications(caller_id)
00227 """
00228
00229 pubs_value = self.apiSuccess(self.node.getPublications(self.caller_id))
00230 pubs = TopicDescriptionList(pubs_value)
00231
00232 pubs_dict = pubs.as_dict()
00233
00234 if '/rosout' in pubs_dict:
00235 del pubs_dict['/rosout']
00236 self.assertEquals(self.required_pubs, pubs_dict)
00237
00238
00239 try:
00240 self.apiError(self.node.getPublications())
00241 except Fault:
00242 pass
00243 try:
00244 self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
00245 except Fault:
00246 pass
00247
00248 def test_getSubscriptions(self):
00249 """
00250 validate node.getSubscriptions(caller_id)
00251 """
00252
00253
00254 value = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
00255 subs = TopicDescriptionList(value)
00256
00257 subs_dict = subs.as_dict()
00258 self.assertEquals(self.required_subs, subs_dict)
00259
00260
00261 try:
00262 self.apiError(self.node.getSubscriptions())
00263 except Fault:
00264 pass
00265 try:
00266 self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))
00267 except Fault:
00268 pass
00269
00270
00271 def test_paramUpdate(self):
00272 node = self.node
00273 good_key = rosgraph.names.ns_join(self.ns, 'good_key')
00274 bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
00275
00276
00277 self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
00278
00279
00280 self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
00281 self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
00282
00283
00284 try:
00285 self.apiError(node.paramUpdate(self.caller_id, bad_key))
00286 except Fault:
00287 pass
00288
00289 try:
00290 self.apiError(node.paramUpdate(self.caller_id))
00291 except Fault:
00292 pass
00293
00294
00295
00296
00297
00298
00299 def xtest_getUri(self):
00300 """
00301 Future: validate node.getUri(caller_id). It would be nice to
00302 make this official API as it provides some debugging info.
00303 """
00304
00305 self.check_uri(self.apiSuccess(self.node.getUri(self.caller_id)))
00306
00307
00308 try:
00309 self.apiError(self.node.getUri(self.caller_id, 'bad'))
00310 except Fault:
00311 pass
00312 try:
00313 self.apiError(self.node.getUri())
00314 except Fault:
00315 pass
00316
00317 def test_getMasterUri(self):
00318 """
00319 validate node.getMasterUri(caller_id)
00320 """
00321
00322 uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
00323 self.check_uri(uri)
00324
00325
00326 import urlparse
00327 master_env = rosgraph.get_master_uri()
00328 if not master_env.endswith('/'):
00329 master_env = master_env + '/'
00330 self.assertEquals(urlparse.urlparse(master_env), urlparse.urlparse(uri))
00331
00332
00333 try:
00334 self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
00335 except Fault:
00336 pass
00337 try:
00338 self.apiError(self.node.getMasterUri())
00339 except Fault:
00340 pass
00341
00342 def test_publisherUpdate(self):
00343 """
00344 validate node.publisherUpdate(caller_id, topic, uris)
00345 """
00346 node = self.node
00347 probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00348 fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00349
00350
00351
00352 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00353 ['http://localhost:1234', 'http://localhost:5678']))
00354 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00355 []))
00356
00357 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00358 ['http://unroutablefakeservice:1234']))
00359
00360 time.sleep(1.0)
00361
00362 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00363 []))
00364
00365
00366 try:
00367 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
00368 except Fault:
00369 pass
00370 try:
00371 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
00372 except Fault:
00373 pass
00374 try:
00375 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))
00376 except Fault:
00377 pass
00378 try:
00379 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
00380 except Fault:
00381 pass
00382
00383
00384 try:
00385 self.apiError(node.publisherUpdate())
00386 except Fault:
00387 pass
00388 try:
00389 self.apiError(node.getBusStats(self.caller_id, 'bad'))
00390 except Fault:
00391 pass
00392 try:
00393 self.apiError(node.getBusStats())
00394 except Fault:
00395 pass
00396
00397 def check_TCPROS(self, protocol_params):
00398 self.assert_(protocol_params, "no protocol params returned")
00399 self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
00400 self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
00401 self.assertEquals(protocol_params[0], TCPROS)
00402
00403 self.assertEquals(protocol_params[0], TCPROS)
00404
00405 def testRequestTopic(self):
00406 node = self.node
00407 protocols = [[TCPROS]]
00408
00409 publications = node.getPublications(self.caller_id)
00410
00411 topics = self.required_pubs.keys()
00412 probe_topic = topics[0] if topics else None
00413 fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00414
00415
00416 protocols = [[TCPROS]]
00417 for topic in topics:
00418 self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00419 protocols = [['FakeTransport', 1234, 5678], [TCPROS], ['AnotherFakeTransport']]
00420
00421 for topic in topics:
00422 self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00423
00424
00425 if probe_topic:
00426 try:
00427 self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
00428 except Fault:
00429 pass
00430 try:
00431 self.apiError(node.requestTopic(self.caller_id, probe_topic))
00432 except Fault:
00433 pass
00434 try:
00435 self.apiError(node.requestTopic(self.caller_id))
00436 except Fault:
00437 pass
00438 try:
00439 self.apiError(node.requestTopic())
00440 except Fault:
00441 pass
00442
00443
00444 try:
00445 self.apiError(node.requestTopic(self.caller_id, 1, protocols))
00446 except Fault:
00447 pass
00448 try:
00449 self.apiError(node.requestTopic(self.caller_id, '', protocols))
00450 except Fault:
00451 pass
00452 try:
00453 self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
00454 except Fault:
00455 pass
00456 try:
00457 self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols'))
00458 except Fault:
00459 pass
00460
00461
00462 def test_getBusInfo(self):
00463
00464
00465
00466
00467 try:
00468 self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
00469 except Fault:
00470 pass
00471 try:
00472 self.apiError(self.node.getBusInfo())
00473 except Fault:
00474 pass
00475
00476
00477
00478 def test_registrations(self):
00479
00480
00481
00482 node_name = self.test_node
00483 pubs, subs, srvs = self.master.getSystemState()
00484 pub_topics = [t for t, _ in pubs]
00485 sub_topics = [t for t, _ in subs]
00486
00487
00488 for t in self.required_pubs:
00489 self.assert_(t in pub_topics, "node did not register publication %s on master"%(t))
00490 for t in self.required_subs:
00491 self.assert_(t in sub_topics, "node did not register subscription %s on master"%(t))
00492
00493
00494 for topic, node_list in pubs:
00495 if topic in self.required_pubs:
00496 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
00497 for topic, node_list in subs:
00498 if topic in self.required_subs:
00499 self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
00500 for service, srv_list in srvs:
00501
00502 pass
00503
00504 if __name__ == '__main__':
00505 rosunit.unitrun('test_rosmaster', sys.argv[0], TestSlaveApi)