44 from xmlrpc.client
import Fault, ServerProxy
46 from xmlrpclib
import Fault, ServerProxy
53 CALLER_ID =
'/test_harness'
54 TEST_NODE_NAME =
'/test_node'
62 if not rosgraph.names.is_legal_name(topic_name):
63 raise ValueError(
'topic name: %s'%(topic_name))
66 p, t = topic_type.split(
'/')
72 if not type(xmlrpcvalue) == list:
73 raise ValueError(
"publications must be a list")
75 for n, t
in xmlrpcvalue:
81 d[t.topic_name] = t.topic_type
87 super(TestSlaveApi, self).
__init__(*args)
89 self.
ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
97 if arg.startswith(
"--node="):
99 if arg.startswith(
"--profile="):
108 with open(filename)
as f:
109 d = yaml.safe_load(f)
117 timeout_t = 10.0 + time.time()
120 while time.time() < timeout_t
and not self.
node_api:
126 self.fail(
"master did not return XML-RPC API for [%s, %s]"%(self.
caller_id, self.
test_node))
128 self.assertTrue(self.
node_api.startswith(
'http'))
137 unit test assertion that fails if status code is not 1 and otherwise returns the value parameter.
138 @param args: return value from ROS API call
139 @type args: [int, str, val]
140 @return: value parameter from args (args[2] for master/slave API)
142 self.assertTrue(len(args) == 3,
"invalid API return value triplet: %s"%str(args))
144 assert self.
last_code == 1,
"status code is not 1: %s"%self.last_msg
149 unit test assertion that fails if status code is not 0 and otherwise returns true.
150 @param args: return value from ROS API call
151 @type args: [int, str, val]
152 @return: True if status code is 0
154 self.assertTrue(len(args) == 3,
"invalid API return value triplet: %s"%str(args))
156 assert self.
last_code == 0,
"Call should have failed with status code 0: %s"%self.last_msg
160 unit test assertion that fails if status code is not -1 and otherwise returns true.
161 @param args: return value from ROS API call
162 @type args: [int, str, val]
163 @return: True if status code is -1
165 self.assertTrue(len(args) == 3,
"invalid API return value triplet: %s"%str(args))
168 assert self.
last_code == -1,
"%s (return msg was %s)"%(msg, self.last_msg)
170 assert self.
last_code == -1,
"Call should have returned error -1 code: %s"%self.last_msg
174 validates a URI as being http(s)
177 import urllib.parse
as urlparse
180 parsed = urlparse.urlparse(uri)
181 self.assertTrue(parsed[0]
in [
'http',
'https'],
'protocol [%s] is [%s] invalid'%(parsed[0], uri))
182 self.assertTrue(parsed[1],
'host missing [%s]'%uri)
183 self.assertTrue(parsed.port,
'port missing/invalid [%s]'%uri)
187 validate node.getPid(caller_id)
191 self.assertTrue(pid > 0)
201 make sure rosout is in publication and connection list
205 self.assertTrue(
'/rosout' in pubs_d,
"node is not publishing to rosout")
206 self.assertEqual(
'rosgraph_msgs/Log', pubs_d[
'/rosout'],
"/rosout is not correct type")
210 test that node obeys simtime (/Clock) contract
212 http://wiki.ros.org/Clock
215 use_sim_time = self.
master.getParam(
'/use_sim_time')
222 self.assertTrue(
'/clock' in subs_d,
"node is not subscribing to clock")
223 self.assertEqual(
'rosgraph_msgs/Clock', subs_d[
'/clock'],
"/clock is not correct type")
225 self.assertFalse(
'/clock' in subs_d,
"node is subscribed to /clock even though /use_sim_time is false")
229 validate node.getPublications(caller_id)
235 pubs_dict = pubs.as_dict()
237 if '/rosout' in pubs_dict:
238 del pubs_dict[
'/rosout']
253 validate node.getSubscriptions(caller_id)
260 subs_dict = subs.as_dict()
276 good_key = rosgraph.names.ns_join(self.
ns,
'good_key')
277 bad_key = rosgraph.names.ns_join(self.
ns,
'bad_key')
304 Future: validate node.getUri(caller_id). It would be nice to
305 make this official API as it provides some debugging info.
322 validate node.getMasterUri(caller_id)
330 import urllib.parse
as urlparse
333 master_env = rosgraph.get_master_uri()
334 if not master_env.endswith(
'/'):
335 master_env = master_env +
'/'
336 self.assertEqual(urlparse.urlparse(master_env), urlparse.urlparse(uri))
350 validate node.publisherUpdate(caller_id, topic, uris)
353 probe_topic = rosgraph.names.ns_join(self.
ns,
'probe_topic')
354 fake_topic = rosgraph.names.ns_join(self.
ns,
'fake_topic')
359 [
'http://localhost:1234',
'http://localhost:5678']))
364 [
'http://unroutablefakeservice:1234']))
391 self.
apiError(node.publisherUpdate())
404 self.assertTrue(protocol_params,
"no protocol params returned")
405 self.assertTrue(type(protocol_params) == list,
"protocol params must be a list: %s"%protocol_params)
406 self.assertEqual(3, len(protocol_params),
"TCPROS params should have length 3: %s"%protocol_params)
407 self.assertEqual(protocol_params[0], TCPROS)
409 self.assertEqual(protocol_params[0], TCPROS)
413 protocols = [[TCPROS]]
415 publications = node.getPublications(self.
caller_id)
418 probe_topic = topics[0]
if topics
else None
419 fake_topic = rosgraph.names.ns_join(self.
ns,
'fake_topic')
422 protocols = [[TCPROS]]
425 protocols = [[
'FakeTransport', 1234, 5678], [TCPROS], [
'AnotherFakeTransport']]
433 self.
apiError(node.requestTopic(self.
caller_id, probe_topic, protocols,
'extra stuff'))
489 pubs, subs, srvs = self.
master.getSystemState()
490 pub_topics = [t
for t, _
in pubs]
491 sub_topics = [t
for t, _
in subs]
495 self.assertTrue(t
in pub_topics,
"node did not register publication %s on master"%(t))
497 self.assertTrue(t
in sub_topics,
"node did not register subscription %s on master"%(t))
500 for topic, node_list
in pubs:
502 self.assertTrue(node_name
in node_list,
"%s not in %s"%(self.
node_api, node_list))
503 for topic, node_list
in subs:
505 self.assertTrue(node_name
in node_list,
"%s not in %s"%(self.
node_api, node_list))
506 for service, srv_list
in srvs:
510 if __name__ ==
'__main__':
511 rosunit.unitrun(
'test_rosmaster', sys.argv[0], TestSlaveApi)