44 from xmlrpc.client
import Fault, ServerProxy
46 from xmlrpclib
import Fault, ServerProxy
53 CALLER_ID =
'/test_harness' 54 TEST_NODE_NAME =
'/test_node' 62 if not rosgraph.names.is_legal_name(topic_name):
63 raise ValueError(
'topic name: %s'%(topic_name))
66 p, t = topic_type.split(
'/')
72 if not type(xmlrpcvalue) == list:
73 raise ValueError(
"publications must be a list")
75 for n, t
in xmlrpcvalue:
81 d[t.topic_name] = t.topic_type
87 super(TestSlaveApi, self).
__init__(*args)
89 self.
ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
97 if arg.startswith(
"--node="):
99 if arg.startswith(
"--profile="):
108 with open(filename)
as f:
117 timeout_t = 10.0 + time.time()
120 while time.time() < timeout_t
and not self.
node_api:
126 self.fail(
"master did not return XML-RPC API for [%s, %s]"%(self.
caller_id, self.
test_node))
128 self.assert_(self.node_api.startswith(
'http'))
137 unit test assertion that fails if status code is not 1 and otherwise returns the value parameter. 138 @param args: return value from ROS API call 139 @type args: [int, str, val] 140 @return: value parameter from args (args[2] for master/slave API) 142 self.assert_(len(args) == 3,
"invalid API return value triplet: %s"%str(args))
144 assert self.
last_code == 1,
"status code is not 1: %s"%self.last_msg
149 unit test assertion that fails if status code is not 0 and otherwise returns true. 150 @param args: return value from ROS API call 151 @type args: [int, str, val] 152 @return: True if status code is 0 154 self.assert_(len(args) == 3,
"invalid API return value triplet: %s"%str(args))
156 assert self.
last_code == 0,
"Call should have failed with status code 0: %s"%self.last_msg
160 unit test assertion that fails if status code is not -1 and otherwise returns true. 161 @param args: return value from ROS API call 162 @type args: [int, str, val] 163 @return: True if status code is -1 165 self.assert_(len(args) == 3,
"invalid API return value triplet: %s"%str(args))
168 assert self.
last_code == -1,
"%s (return msg was %s)"%(msg, self.last_msg)
170 assert self.
last_code == -1,
"Call should have returned error -1 code: %s"%self.last_msg
174 validates a URI as being http(s) 177 parsed = urlparse.urlparse(uri)
178 self.assert_(parsed[0]
in [
'http',
'https'],
'protocol [%s] is [%s] invalid'%(parsed[0], uri))
179 self.assert_(parsed[1],
'host missing [%s]'%uri)
180 self.assert_(parsed.port,
'port missing/invalid [%s]'%uri)
184 validate node.getPid(caller_id) 188 self.assert_(pid > 0)
198 make sure rosout is in publication and connection list 202 self.assertTrue(
'/rosout' in pubs_d,
"node is not publishing to rosout")
203 self.assertEquals(
'rosgraph_msgs/Log', pubs_d[
'/rosout'],
"/rosout is not correct type")
207 test that node obeys simtime (/Clock) contract 209 http://www.ros.org/wiki/Clock 212 use_sim_time = self.master.getParam(
'/use_sim_time')
219 self.assertTrue(
'/clock' in subs_d,
"node is not subscribing to clock")
220 self.assertEquals(
'rosgraph_msgs/Clock', subs_d[
'/clock'],
"/clock is not correct type")
222 self.assertFalse(
'/clock' in subs_d,
"node is subscribed to /clock even though /use_sim_time is false")
226 validate node.getPublications(caller_id) 232 pubs_dict = pubs.as_dict()
234 if '/rosout' in pubs_dict:
235 del pubs_dict[
'/rosout']
240 self.
apiError(self.node.getPublications())
250 validate node.getSubscriptions(caller_id) 257 subs_dict = subs.as_dict()
262 self.
apiError(self.node.getSubscriptions())
273 good_key = rosgraph.names.ns_join(self.
ns,
'good_key')
274 bad_key = rosgraph.names.ns_join(self.
ns,
'bad_key')
301 Future: validate node.getUri(caller_id). It would be nice to 302 make this official API as it provides some debugging info. 319 validate node.getMasterUri(caller_id) 327 master_env = rosgraph.get_master_uri()
328 if not master_env.endswith(
'/'):
329 master_env = master_env +
'/' 330 self.assertEquals(urlparse.urlparse(master_env), urlparse.urlparse(uri))
338 self.
apiError(self.node.getMasterUri())
344 validate node.publisherUpdate(caller_id, topic, uris) 347 probe_topic = rosgraph.names.ns_join(self.
ns,
'probe_topic')
348 fake_topic = rosgraph.names.ns_join(self.
ns,
'fake_topic')
353 [
'http://localhost:1234',
'http://localhost:5678']))
358 [
'http://unroutablefakeservice:1234']))
385 self.
apiError(node.publisherUpdate())
398 self.assert_(protocol_params,
"no protocol params returned")
399 self.assert_(type(protocol_params) == list,
"protocol params must be a list: %s"%protocol_params)
400 self.assertEquals(3, len(protocol_params),
"TCPROS params should have length 3: %s"%protocol_params)
401 self.assertEquals(protocol_params[0], TCPROS)
403 self.assertEquals(protocol_params[0], TCPROS)
407 protocols = [[TCPROS]]
409 publications = node.getPublications(self.
caller_id)
411 topics = self.required_pubs.keys()
412 probe_topic = topics[0]
if topics
else None 413 fake_topic = rosgraph.names.ns_join(self.
ns,
'fake_topic')
416 protocols = [[TCPROS]]
419 protocols = [[
'FakeTransport', 1234, 5678], [TCPROS], [
'AnotherFakeTransport']]
427 self.
apiError(node.requestTopic(self.
caller_id, probe_topic, protocols,
'extra stuff'))
472 self.
apiError(self.node.getBusInfo())
483 pubs, subs, srvs = self.master.getSystemState()
484 pub_topics = [t
for t, _
in pubs]
485 sub_topics = [t
for t, _
in subs]
489 self.assert_(t
in pub_topics,
"node did not register publication %s on master"%(t))
491 self.assert_(t
in sub_topics,
"node did not register subscription %s on master"%(t))
494 for topic, node_list
in pubs:
496 self.assert_(node_name
in node_list,
"%s not in %s"%(self.
node_api, node_list))
497 for topic, node_list
in subs:
499 self.assert_(node_name
in node_list,
"%s not in %s"%(self.
node_api, node_list))
500 for service, srv_list
in srvs:
504 if __name__ ==
'__main__':
505 rosunit.unitrun(
'test_rosmaster', sys.argv[0], TestSlaveApi)
def test_getSubscriptions(self)
def testRequestTopic(self)
def __init__(self, args, kwds)
def __init__(self, topic_name, topic_type)
def load_profile(self, filename)
def __init__(self, xmlrpcvalue)
def check_TCPROS(self, protocol_params)
def test_getBusInfo(self)
def test_registrations(self)
test the state of the master based on expected node registration
def test_paramUpdate(self)
validate node.paramUpdate(caller_id, key, value)
def test_getPublications(self)
def apiError(self, args, msg=None)
def test_publisherUpdate(self)
def test_getMasterUri(self)
def apiSuccess(self, args)