00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 import os
00039 import sys
00040 import string
00041 import time
00042 import unittest
00043 import xmlrpclib
00044
00045 import rosunit
00046 import rosgraph
00047
# Protocol identifier used when building requestTopic protocol lists and
# when checking the first element of the protocol params a node returns.
TCPROS = 'TCPROS'

# Caller ID this harness passes to the master and to the node's XML-RPC API.
CALLER_ID = '/test_harness'
# NOTE(review): not referenced by any code visible in this file; the default
# node name used by TestSlaveApi is the literal 'test_node' -- confirm whether
# this constant is still needed.
TEST_NODE_NAME = '/test_node'
00052
class TopicDescription(object):
    """
    A validated (topic name, topic type) pair.
    """

    def __init__(self, topic_name, topic_type):
        """
        @param topic_name: name of the topic
        @type  topic_name: str
        @param topic_type: type of the topic, written as two parts
            separated by a single '/'
        @type  topic_type: str
        @raise ValueError: if rosgraph.names.is_legal_name() rejects
            topic_name, or if topic_type does not split into exactly two
            '/'-separated parts
        """
        # validate topic name
        if not rosgraph.names.is_legal_name(topic_name):
            raise ValueError('topic name: %s'%(topic_name))

        # validate type: exactly one '/' separator is accepted. This is the
        # same rule the previous tuple-unpacking enforced, but the error now
        # names the offending value instead of reporting an unpack failure.
        if len(topic_type.split('/')) != 2:
            raise ValueError('topic type: %s'%(topic_type))

        self.topic_name = topic_name
        self.topic_type = topic_type
00064
class TopicDescriptionList(object):
    """
    List of TopicDescription objects built from the list of
    (topic name, topic type) pairs returned by an XML-RPC call.
    """

    def __init__(self, xmlrpcvalue):
        """
        @param xmlrpcvalue: list of (topic name, topic type) pairs
        @type  xmlrpcvalue: [[str, str]]
        @raise ValueError: if xmlrpcvalue is not a list, or if
            TopicDescription rejects one of the pairs
        """
        # isinstance (rather than an exact type comparison) so that list
        # subclasses are accepted as well
        if not isinstance(xmlrpcvalue, list):
            raise ValueError("publications must be a list")
        self.topics = [TopicDescription(n, t) for n, t in xmlrpcvalue]

    def as_dict(self):
        """
        @return: mapping of topic name to topic type; if a topic name
            occurs more than once, the last occurrence wins
        @rtype: dict
        """
        return dict((t.topic_name, t.topic_type) for t in self.topics)
00080
class TestSlaveApi(unittest.TestCase):
    """
    Exercises the XML-RPC slave API of one running ROS node.

    Options read from C{sys.argv}:
     - C{--node=NAME}: name of the node under test (default C{test_node});
       it is joined with the namespace taken from the environment
     - C{--profile=FILE}: YAML file whose optional C{pubs} and C{subs}
       mappings (topic name -> topic type) give the topics the node is
       required to publish and subscribe to
    """

    def __init__(self, *args, **kwds):
        # keyword arguments are forwarded too; they used to be dropped
        super(TestSlaveApi, self).__init__(*args, **kwds)

        self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)

        # name of the node under test, resolved against self.ns below
        self.test_node = 'test_node'
        # Required topics as {topic name: topic type}. These must be dicts:
        # the tests compare them with TopicDescriptionList.as_dict(), call
        # keys() on them and iterate them, and load_profile() stores dicts.
        self.required_pubs = {}
        self.required_subs = {}

        for arg in sys.argv:
            if arg.startswith("--node="):
                self.test_node = arg[len("--node="):]
            if arg.startswith("--profile="):
                self.test_node_profile = arg[len("--profile="):]
                self.load_profile(self.test_node_profile)

        # resolve the node name within the namespace
        self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)

    def load_profile(self, filename):
        """
        Load the required publications/subscriptions from a YAML file.

        @param filename: path of the YAML profile
        @type  filename: str
        """
        import yaml
        with open(filename) as f:
            # safe_load: the profile is plain data, so arbitrary object
            # construction is not needed, and newer PyYAML releases refuse
            # yaml.load() without an explicit Loader. An empty file parses
            # to None, hence the fallback to an empty mapping.
            d = yaml.safe_load(f) or {}
        # 'or {}' also covers keys that are present but empty in the file
        self.required_pubs = d.get('pubs') or {}
        self.required_subs = d.get('subs') or {}

    def setUp(self):
        """
        Look up the XML-RPC URI of the node under test on the master and
        create a proxy for it; fails the test if the lookup does not
        succeed within 10 seconds.
        """
        self.caller_id = CALLER_ID

        # give the node 10 seconds to appear on the master
        timeout_t = 10.0 + time.time()
        self.node_api = None
        self.master = rosgraph.Master(self.caller_id)
        while not self.node_api and time.time() < timeout_t:
            try:
                self.node_api = self.master.lookupNode(self.test_node)
            except Exception:
                # lookup failed; retry until the deadline. Exception (not a
                # bare except) so Ctrl-C can still abort the wait.
                pass
            if not self.node_api:
                time.sleep(0.1)
        if not self.node_api:
            self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
        print("[%s] API = %s"%(self.test_node, self.node_api))
        self.assertTrue(self.node_api.startswith('http'))
        self.node = xmlrpclib.ServerProxy(self.node_api)

        # fixed pause before every test
        # NOTE(review): presumably gives the node time to finish its own
        # registrations with the master -- confirm
        time.sleep(2.)

    def _unpack_triplet(self, args):
        """
        Check that args is a 3-element API return value and record it in
        self.last_code, self.last_msg and self.last_val.
        """
        self.assertTrue(len(args) == 3, "invalid API return value triplet: %s"%str(args))
        self.last_code, self.last_msg, self.last_val = args

    def _expect_error(self, method, *args):
        """
        Call method(*args) and require it to be rejected: either the
        returned triplet carries status code -1, or the call raises
        xmlrpclib.Fault.

        @param method: XML-RPC method to invoke, e.g. self.node.getPid
        @param args: arguments to pass to method
        """
        try:
            self.apiError(method(*args))
        except xmlrpclib.Fault:
            pass

    def apiSuccess(self, args):
        """
        unit test assertion that fails if status code is not 1 and otherwise returns the value parameter.
        @param args: return value from ROS API call
        @type  args: [int, str, val]
        @return: value parameter from args (arg[2] for master/slave API)
        """
        self._unpack_triplet(args)
        # unittest assertion instead of a bare assert, which is stripped
        # when Python runs with -O
        self.assertEqual(1, self.last_code, "status code is not 1: %s"%self.last_msg)
        return self.last_val

    def apiFail(self, args):
        """
        unit test assertion that fails if status code is not 0 and otherwise returns true.
        @param args: return value from ROS API call
        @type  args: [int, str, val]
        @return: True if status code is 0
        """
        self._unpack_triplet(args)
        self.assertEqual(0, self.last_code, "Call should have failed with status code 0: %s"%self.last_msg)
        return True

    def apiError(self, args, msg=None):
        """
        unit test assertion that fails if status code is not -1 and otherwise returns true.
        @param args: return value from ROS API call
        @type  args: [int, str, val]
        @param msg: optional text to put in front of the failure message
        @type  msg: str
        @return: True if status code is -1
        """
        self._unpack_triplet(args)
        if msg:
            failure = "%s (return msg was %s)"%(msg, self.last_msg)
        else:
            failure = "Call should have returned error -1 code: %s"%self.last_msg
        self.assertEqual(-1, self.last_code, failure)
        return True

    def check_uri(self, uri):
        """
        validates a URI as being http(s) with a host and a port
        """
        import urlparse
        parsed = urlparse.urlparse(uri)
        self.assertTrue(parsed[0] in ['http', 'https'], 'protocol [%s] of [%s] is invalid'%(parsed[0], uri))
        self.assertTrue(parsed[1], 'host missing [%s]'%uri)
        self.assertTrue(parsed.port, 'port missing/invalid [%s]'%uri)

    def test_getPid(self):
        """
        validate node.getPid(caller_id)
        """
        # test success
        pid = self.apiSuccess(self.node.getPid(self.caller_id))
        self.assertTrue(pid > 0)

        # test with bad arguments
        self._expect_error(self.node.getPid)

    def test_rosout(self):
        """
        make sure rosout is in publication and connection list
        """
        val = self.apiSuccess(self.node.getPublications(self.caller_id))
        pubs_d = TopicDescriptionList(val).as_dict()
        self.assertTrue('/rosout' in pubs_d, "node is not publishing to rosout")
        self.assertEqual('rosgraph_msgs/Log', pubs_d['/rosout'], "/rosout is not correct type")

    def test_simtime(self):
        """
        test that node obeys simtime (/Clock) contract

        http://www.ros.org/wiki/Clock
        """
        try:
            use_sim_time = self.master.getParam('/use_sim_time')
        except Exception:
            # a failed lookup is treated the same as /use_sim_time == False
            use_sim_time = False

        val = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
        subs_d = TopicDescriptionList(val).as_dict()
        if use_sim_time:
            self.assertTrue('/clock' in subs_d, "node is not subscribing to clock")
            self.assertEqual('rosgraph_msgs/Clock', subs_d['/clock'], "/clock is not correct type")
        else:
            self.assertFalse('/clock' in subs_d, "node is subscribed to /clock even though /use_sim_time is false")

    def test_getPublications(self):
        """
        validate node.getPublications(caller_id)
        """
        # test success
        pubs_value = self.apiSuccess(self.node.getPublications(self.caller_id))
        pubs_dict = TopicDescriptionList(pubs_value).as_dict()
        # /rosout is covered by test_rosout and is not part of the profile
        pubs_dict.pop('/rosout', None)
        self.assertEqual(self.required_pubs, pubs_dict)

        # test with bad arguments
        self._expect_error(self.node.getPublications)
        self._expect_error(self.node.getPublications, self.caller_id, 'something extra')

    def test_getSubscriptions(self):
        """
        validate node.getSubscriptions(caller_id)
        """
        # test success
        value = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
        subs_dict = TopicDescriptionList(value).as_dict()
        self.assertEqual(self.required_subs, subs_dict)

        # test with bad arguments
        self._expect_error(self.node.getSubscriptions)
        self._expect_error(self.node.getSubscriptions, self.caller_id, 'something extra')

    def test_paramUpdate(self):
        """
        validate node.paramUpdate(caller_id, key, value)
        """
        node = self.node
        good_key = rosgraph.names.ns_join(self.ns, 'good_key')
        bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')

        # An update for good_key must be answered with an error triplet.
        # NOTE(review): presumably because the node never subscribed to
        # this parameter -- confirm
        self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))

        # bad keys must produce an error triplet (a Fault is not accepted)
        self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
        self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))

        # missing arguments: error triplet or Fault
        self._expect_error(node.paramUpdate, self.caller_id, bad_key)
        self._expect_error(node.paramUpdate, self.caller_id)

    def xtest_getUri(self):
        """
        Future: validate node.getUri(caller_id). It would be nice to
        make this official API as it provides some debugging info.
        """
        # test success
        self.check_uri(self.apiSuccess(self.node.getUri(self.caller_id)))

        # test bad arguments
        self._expect_error(self.node.getUri, self.caller_id, 'bad')
        self._expect_error(self.node.getUri)

    def test_getMasterUri(self):
        """
        validate node.getMasterUri(caller_id)
        """
        # test success
        uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
        self.check_uri(uri)

        # compare with the master URI from the environment, after adding a
        # trailing slash to the environment value if it lacks one
        import urlparse
        master_env = rosgraph.get_master_uri()
        if not master_env.endswith('/'):
            master_env = master_env + '/'
        self.assertEqual(urlparse.urlparse(master_env), urlparse.urlparse(uri))

        # test bad arguments
        self._expect_error(self.node.getMasterUri, self.caller_id, 'bad')
        self._expect_error(self.node.getMasterUri)

    def test_publisherUpdate(self):
        """
        validate node.publisherUpdate(caller_id, topic, uris)
        """
        node = self.node
        probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
        fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')

        # updates for both topics must be accepted, with and without URIs
        self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
                                             ['http://localhost:1234', 'http://localhost:5678']))
        self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
                                             []))

        # a publisher URI that cannot be reached must be accepted as well
        self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
                                             ['http://unroutablefakeservice:1234']))
        # pause before clearing the publisher list again
        time.sleep(1.0)
        self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
                                             []))

        # test bad values for the publisher list
        for bad_uris in ('bad', 2, False, ['bad']):
            self._expect_error(node.publisherUpdate, self.caller_id, '/bad_topic', bad_uris)

        # test bad arguments
        self._expect_error(node.publisherUpdate)
        self._expect_error(node.getBusStats, self.caller_id, 'bad')
        self._expect_error(node.getBusStats)

    def check_TCPROS(self, protocol_params):
        """
        validate the protocol params returned by requestTopic for TCPROS:
        a non-empty list of length 3 whose first element is TCPROS
        """
        self.assertTrue(protocol_params, "no protocol params returned")
        self.assertTrue(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
        self.assertEqual(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
        self.assertEqual(protocol_params[0], TCPROS)

    def testRequestTopic(self):
        """
        validate node.requestTopic(caller_id, topic, protocols)
        """
        node = self.node
        # list() so the first required topic can be indexed on any dict type
        topics = list(self.required_pubs)
        probe_topic = topics[0] if topics else None
        fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')

        # test success: first with TCPROS as the only choice, then with
        # TCPROS offered between two unknown transports
        tcpros_only = [[TCPROS]]
        protocols = [['FakeTransport', 1234, 5678], [TCPROS], ['AnotherFakeTransport']]
        for offered in (tcpros_only, protocols):
            for topic in topics:
                self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, offered)))

        # test with bad arguments; calls that need a real topic are skipped
        # when the profile requires no publications
        if probe_topic:
            self._expect_error(node.requestTopic, self.caller_id, probe_topic, protocols, 'extra stuff')
            self._expect_error(node.requestTopic, self.caller_id, probe_topic)
        self._expect_error(node.requestTopic, self.caller_id)
        self._expect_error(node.requestTopic)

        # test with bad values
        self._expect_error(node.requestTopic, self.caller_id, 1, protocols)
        self._expect_error(node.requestTopic, self.caller_id, '', protocols)
        self._expect_error(node.requestTopic, self.caller_id, fake_topic, protocols)
        if probe_topic:
            # guarded: without a required publication probe_topic is None,
            # which is not a usable topic argument for this call
            self._expect_error(node.requestTopic, self.caller_id, probe_topic, 'fake-protocols')

    def test_getBusInfo(self):
        """
        validate that node.getBusInfo rejects bad argument lists
        """
        self._expect_error(self.node.getBusInfo, self.caller_id, 'bad')
        self._expect_error(self.node.getBusInfo)

    def test_registrations(self):
        """
        validate that the node registered every required publication and
        subscription with the master
        """
        node_name = self.test_node
        # service registrations are returned as well but are not checked
        pubs, subs, _srvs = self.master.getSystemState()
        pub_topics = set(t for t, _ in pubs)
        sub_topics = set(t for t, _ in subs)

        # every required topic must be known to the master
        for t in self.required_pubs:
            self.assertTrue(t in pub_topics, "node did not register publication %s on master"%(t))
        for t in self.required_subs:
            self.assertTrue(t in sub_topics, "node did not register subscription %s on master"%(t))

        # and the node under test must be listed for each of them
        for topic, node_list in pubs:
            if topic in self.required_pubs:
                self.assertTrue(node_name in node_list, "%s not in %s"%(node_name, node_list))
        for topic, node_list in subs:
            if topic in self.required_subs:
                self.assertTrue(node_name in node_list, "%s not in %s"%(node_name, node_list))
00500
if __name__ == '__main__':
    # Script entry point: hand the test case to rosunit under the
    # 'test_rosmaster' package name, using this script's path as the
    # test name.
    rosunit.unitrun(
        'test_rosmaster',
        sys.argv[0],
        TestSlaveApi,
    )