test_slave_api.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 # This is a rewrite of the old node API tests, which focus too much on
35 # a completed node API and don't facilitate easy bring up of a new
36 # client library.
37 
38 import os
39 import sys
40 import string
41 import time
42 import unittest
43 try:
44  from xmlrpc.client import Fault, ServerProxy
45 except ImportError:
46  from xmlrpclib import Fault, ServerProxy
47 
48 import rosunit
49 import rosgraph
50 
51 TCPROS = 'TCPROS'
52 
53 CALLER_ID = '/test_harness'
54 TEST_NODE_NAME = '/test_node' #default
55 
class TopicDescription(object):
    """
    Validated (name, type) pair describing one topic reported by a node.

    @param topic_name: name of the topic
    @type  topic_name: str
    @param topic_type: message type, written as 'package/Type'
    @type  topic_type: str
    @raise ValueError: if topic_name is not a legal graph name or if
    topic_type does not split into exactly two '/'-separated parts
    """

    def __init__(self, topic_name, topic_type):
        self.topic_name = topic_name
        self.topic_type = topic_type

        # validate topic
        if not rosgraph.names.is_legal_name(topic_name):
            raise ValueError('topic name: %s'%(topic_name))

        # validate type: exactly one '/' separating package and type name.
        # Raised explicitly so the failure names the offending type instead
        # of surfacing as a tuple-unpacking error.
        if len(topic_type.split('/')) != 2:
            raise ValueError('topic type: %s'%(topic_type))
67 
class TopicDescriptionList(object):
    """
    List of TopicDescription objects built from the XML-RPC value returned
    by getPublications()/getSubscriptions().

    @param xmlrpcvalue: [[topic1, topicType1]...[topicN, topicTypeN]]
    @type  xmlrpcvalue: [[str, str]]
    @raise ValueError: if xmlrpcvalue is not a list, or if any entry fails
    TopicDescription validation
    """

    def __init__(self, xmlrpcvalue):
        # isinstance (not type ==) so that list subclasses are accepted too
        if not isinstance(xmlrpcvalue, list):
            raise ValueError("publications must be a list")
        self.topics = [TopicDescription(n, t) for n, t in xmlrpcvalue]

    def as_dict(self):
        """
        @return: mapping of topic name to topic type; if a name occurs more
        than once the last entry wins
        @rtype: {str: str}
        """
        return {t.topic_name: t.topic_type for t in self.topics}
83 
84 class TestSlaveApi(unittest.TestCase):
85 
86  def __init__(self, *args, **kwds):
87  super(TestSlaveApi, self).__init__(*args)
88 
89  self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
90 
91  # load in name of test node
92  self.test_node = 'test_node' #default
95 
96  for arg in sys.argv:
97  if arg.startswith("--node="):
98  self.test_node = arg[len("--node="):]
99  if arg.startswith("--profile="):
100  self.test_node_profile = arg[len("--profile="):]
102 
103  # resolve
104  self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
105 
def load_profile(self, filename):
    """
    Load the expected publications/subscriptions of the node under test
    from a YAML profile and store them in self.required_pubs and
    self.required_subs.

    @param filename: path of the YAML profile; the 'pubs' and 'subs' keys
    are read, each defaulting to an empty dict when absent
    @type  filename: str
    """
    import yaml
    with open(filename) as f:
        # safe_load() yields None for an empty document; treat that the
        # same as a profile with no requirements instead of crashing
        d = yaml.safe_load(f) or {}
    self.required_pubs = d.get('pubs', {})
    self.required_subs = d.get('subs', {})
112 
def setUp(self):
    """
    Resolve the XML-RPC API of the node under test through the master and
    store a proxy to it in self.node. Fails the test if the master does
    not return an API URI within 10 seconds.
    """
    self.caller_id = CALLER_ID
    # retrieve handle on node
    # give ourselves 10 seconds for node to appear
    timeout_t = 10.0 + time.time()
    self.node_api = None
    self.master = rosgraph.Master(self.caller_id)
    while time.time() < timeout_t and not self.node_api:
        try:
            self.node_api = self.master.lookupNode(self.test_node)
        except Exception:
            # lookup failed (node not registered yet): retry shortly.
            # 'Exception' rather than a bare except so that Ctrl-C and
            # SystemExit still abort the wait.
            time.sleep(0.1)
    if not self.node_api:
        self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
    print("[%s] API = %s"%(self.test_node, self.node_api))
    self.assertTrue(self.node_api.startswith('http'))
    self.node = ServerProxy(self.node_api)

    # hack: sleep for a couple seconds just in case the node is
    # still registering with the master.
    time.sleep(2.)
134 
def apiSuccess(self, args):
    """
    unit test assertion that fails if status code is not 1 and otherwise returns the value parameter.
    The triplet is also stored in self.last_code, self.last_msg and self.last_val.
    @param args: return value from ROS API call
    @type  args: [int, str, val]
    @return: value parameter from args (arg[2] for master/slave API)
    """
    self.assertTrue(len(args) == 3, "invalid API return value triplet: %s"%str(args))
    self.last_code, self.last_msg, self.last_val = args
    # unittest assertion instead of a bare 'assert', which is stripped under -O
    self.assertEqual(1, self.last_code, "status code is not 1: %s"%self.last_msg)
    return self.last_val
146 
def apiFail(self, args):
    """
    unit test assertion that fails if status code is not 0 and otherwise returns True.
    The triplet is also stored in self.last_code, self.last_msg and self.last_val.
    @param args: return value from ROS API call
    @type  args: [int, str, val]
    @return: True if status code is 0
    """
    self.assertTrue(len(args) == 3, "invalid API return value triplet: %s"%str(args))
    self.last_code, self.last_msg, self.last_val = args
    # unittest assertion instead of a bare 'assert', which is stripped under -O
    self.assertEqual(0, self.last_code, "Call should have failed with status code 0: %s"%self.last_msg)
    # return the value the docstring promises
    return True
157 
def apiError(self, args, msg=None):
    """
    unit test assertion that fails if status code is not -1 and otherwise returns True.
    The triplet is also stored in self.last_code, self.last_msg and self.last_val.
    @param args: return value from ROS API call
    @type  args: [int, str, val]
    @param msg: optional text that replaces the default failure message
    @type  msg: str
    @return: True if status code is -1
    """
    self.assertTrue(len(args) == 3, "invalid API return value triplet: %s"%str(args))
    self.last_code, self.last_msg, self.last_val = args
    if msg:
        failure = "%s (return msg was %s)"%(msg, self.last_msg)
    else:
        failure = "Call should have returned error -1 code: %s"%self.last_msg
    # unittest assertion instead of a bare 'assert', which is stripped under -O
    self.assertEqual(-1, self.last_code, failure)
    # return the value the docstring promises
    return True
171 
def check_uri(self, uri):
    """
    validates a URI as being http(s) with a network location and a port;
    fails the test otherwise
    @param uri: URI to validate
    @type  uri: str
    """
    try:
        import urllib.parse as urlparse
    except ImportError:
        import urlparse
    parsed = urlparse.urlparse(uri)
    self.assertTrue(parsed[0] in ['http', 'https'], 'protocol [%s] is invalid [%s]'%(parsed[0], uri))
    self.assertTrue(parsed[1], 'host missing [%s]'%uri)
    try:
        port = parsed.port
    except ValueError:
        # a malformed port makes the accessor raise; report it as a
        # test failure rather than an unrelated error
        port = None
    self.assertTrue(port, 'port missing/invalid [%s]'%uri)
184 
def test_getPid(self):
    """
    validate node.getPid(caller_id)
    """
    # success case: the reported process id must be positive
    reported_pid = self.apiSuccess(self.node.getPid(self.caller_id))
    self.assertTrue(reported_pid > 0)

    # bad arity: either an error triplet or an XML-RPC fault is acceptable
    try:
        no_arg_result = self.node.getPid()
        self.apiError(no_arg_result)
    except Fault:
        pass
198 
def test_rosout(self):
    """
    make sure rosout is in publication and connection list
    """
    reported = self.node.getPublications(self.caller_id)
    published = TopicDescriptionList(self.apiSuccess(reported)).as_dict()
    self.assertTrue('/rosout' in published, "node is not publishing to rosout")
    self.assertEqual('rosgraph_msgs/Log', published['/rosout'], "/rosout is not correct type")
207 
def test_simtime(self):
    """
    test that node obeys simtime (/Clock) contract

    http://wiki.ros.org/Clock
    """
    try:
        use_sim_time = self.master.getParam('/use_sim_time')
    except Exception:
        # a failed lookup is treated as "simulated time disabled";
        # 'Exception' rather than a bare except so Ctrl-C still propagates
        use_sim_time = False

    val = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
    subs_d = TopicDescriptionList(val).as_dict()
    if use_sim_time:
        self.assertTrue('/clock' in subs_d, "node is not subscribing to clock")
        self.assertEqual('rosgraph_msgs/Clock', subs_d['/clock'], "/clock is not correct type")
    else:
        self.assertFalse('/clock' in subs_d, "node is subscribed to /clock even though /use_sim_time is false")
226 
def test_getPublications(self):
    """
    validate node.getPublications(caller_id)
    """
    # test success
    pubs_value = self.apiSuccess(self.node.getPublications(self.caller_id))
    pubs_dict = TopicDescriptionList(pubs_value).as_dict()
    # this is separately tested by test_rosout
    pubs_dict.pop('/rosout', None)
    self.assertEqual(self.required_pubs, pubs_dict)

    # test with bad arity: accept error or fault
    for bad_args in ((), (self.caller_id, 'something extra')):
        try:
            self.apiError(self.node.getPublications(*bad_args))
        except Fault:
            pass
250 
def test_getSubscriptions(self):
    """
    validate node.getSubscriptions(caller_id)
    """
    # test success
    value = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
    subs_dict = TopicDescriptionList(value).as_dict()
    self.assertEqual(self.required_subs, subs_dict)

    # test with bad arity: accept error or fault
    for bad_args in ((), (self.caller_id, 'something extra')):
        try:
            self.apiError(self.node.getSubscriptions(*bad_args))
        except Fault:
            pass
272 
273 
def test_paramUpdate(self):
    """
    validate node.paramUpdate(caller_id, key, value)
    """
    proxy = self.node
    good_key = rosgraph.names.ns_join(self.ns, 'good_key')
    bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')

    # each of these updates must be answered with an error code
    must_be_rejected = [
        (self.caller_id, good_key, 'good_value'),  # node is not subscribed to good_key (yet)
        (self.caller_id, '', 'bad'),               # bad key: empty
        (self.caller_id, 'no_namespace', 'bad'),   # bad key: no namespace
    ]
    for call_args in must_be_rejected:
        self.apiError(proxy.paramUpdate(*call_args))

    # test with bad arity: accept error or fault
    for call_args in ((self.caller_id, bad_key), (self.caller_id,)):
        try:
            self.apiError(proxy.paramUpdate(*call_args))
        except Fault:
            pass

    # we can't actually test success cases (int, bool and float values
    # for good_key) without forcing node to subscribe
301 
def xtest_getUri(self):
    """
    Future: validate node.getUri(caller_id). It would be nice to
    make this official API as it provides some debugging info.
    """
    # test success
    reported_uri = self.apiSuccess(self.node.getUri(self.caller_id))
    self.check_uri(reported_uri)

    # test bad arity: accept error or fault
    for call_args in ((self.caller_id, 'bad'), ()):
        try:
            self.apiError(self.node.getUri(*call_args))
        except Fault:
            pass
319 
def test_getMasterUri(self):
    """
    validate node.getMasterUri(caller_id)
    """
    try:
        import urllib.parse as urlparse
    except ImportError:
        import urlparse

    # test success
    reported = self.apiSuccess(self.node.getMasterUri(self.caller_id))
    self.check_uri(reported)

    # check against env, canonicalize for comparison
    expected = rosgraph.get_master_uri()
    if not expected.endswith('/'):
        expected += '/'
    self.assertEqual(urlparse.urlparse(expected), urlparse.urlparse(reported))

    # test bad arity: accept error or fault
    for call_args in ((self.caller_id, 'bad'), ()):
        try:
            self.apiError(self.node.getMasterUri(*call_args))
        except Fault:
            pass
347 
def test_publisherUpdate(self):
    """
    validate node.publisherUpdate(caller_id, topic, uris)
    """
    node = self.node
    probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
    fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')

    # test success
    # still success even if not actually interested in topic
    self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
                                         ['http://localhost:1234', 'http://localhost:5678']))
    self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
                                         []))
    # try it with the /probe_topic, which will exercise some error branches in the client
    self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
                                         ['http://unroutablefakeservice:1234']))
    # give it some time to make sure it's attempted contact
    time.sleep(1.0)
    # check that it's still there
    self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
                                         []))

    # test bad args: accept error or fault
    for bad_uris in ('bad', 2, False, ['bad']):
        try:
            self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', bad_uris))
        except Fault:
            pass

    # test bad arity: accept error or fault
    try:
        self.apiError(node.publisherUpdate())
    except Fault:
        pass
    # NOTE(review): the two checks below exercise getBusStats, not
    # publisherUpdate -- presumably a copy/paste leftover; kept so that
    # coverage is unchanged. TODO confirm and move to a getBusStats test.
    for call_args in ((self.caller_id, 'bad'), ()):
        try:
            self.apiError(node.getBusStats(*call_args))
        except Fault:
            pass
402 
def check_TCPROS(self, protocol_params):
    """
    validate the protocol parameters returned by requestTopic for TCPROS:
    a non-empty list of length 3 whose first element is the TCPROS name
    @param protocol_params: value returned by node.requestTopic()
    @type  protocol_params: [str, str, int]
    """
    # wrap in a 1-tuple when formatting: a tuple argument would otherwise
    # be expanded by '%' and raise TypeError before the assertion runs
    self.assertTrue(protocol_params, "no protocol params returned")
    self.assertTrue(isinstance(protocol_params, list), "protocol params must be a list: %s"%(protocol_params,))
    self.assertEqual(3, len(protocol_params), "TCPROS params should have length 3: %s"%(protocol_params,))
    # expect ['TCPROS', 1.2.3.4, 1234]
    # NOTE(review): only the protocol name is verified; host and port
    # are not checked -- TODO confirm whether they should be
    self.assertEqual(protocol_params[0], TCPROS)
410 
def testRequestTopic(self):
    """
    validate node.requestTopic(caller_id, topic, protocols) for every topic
    in self.required_pubs, then probe it with bad arity and bad arguments
    """
    node = self.node

    # list() so that the first topic can be indexed on Python 3, where
    # dict key views are not subscriptable
    topics = list(self.required_pubs)
    probe_topic = topics[0] if topics else None
    fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')

    # currently only support TCPROS as we require all clients to support this
    protocols = [[TCPROS]]
    for topic in topics:
        self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
    # try each one more time, this time with more protocol choices
    protocols = [['FakeTransport', 1234, 5678], [TCPROS], ['AnotherFakeTransport']]
    for topic in topics:
        self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))

    # test bad arity: accept error or fault
    bad_arity = []
    if probe_topic:
        bad_arity.append((self.caller_id, probe_topic, protocols, 'extra stuff'))
        bad_arity.append((self.caller_id, probe_topic))
    bad_arity.append((self.caller_id,))
    bad_arity.append(())
    for call_args in bad_arity:
        try:
            self.apiError(node.requestTopic(*call_args))
        except Fault:
            pass

    # test bad args: accept error or fault
    bad_args = [
        (self.caller_id, 1, protocols),
        (self.caller_id, '', protocols),
        (self.caller_id, fake_topic, protocols),
    ]
    if probe_topic:
        # only with a real topic: a None topic cannot be marshalled by the
        # XML-RPC client and would raise TypeError instead of reaching the node
        bad_args.append((self.caller_id, probe_topic, 'fake-protocols'))
    for call_args in bad_args:
        try:
            self.apiError(node.requestTopic(*call_args))
        except Fault:
            pass
466 
467 
def test_getBusInfo(self):
    """
    validate node.getBusInfo(caller_id); only bad arity is covered so far
    """
    #TODO: finish
    # there should be a connection to rosout

    # test bad arity: accept error or fault
    for call_args in ((self.caller_id, 'bad'), ()):
        try:
            self.apiError(self.node.getBusInfo(*call_args))
        except Fault:
            pass
481 
482 
483 
def test_registrations(self):
    """
    test the state of the master based on expected node registration
    """
    # setUp() ensures the node has registered with the master

    # check actual URIs
    node_name = self.test_node
    pubs, subs, srvs = self.master.getSystemState()
    pub_topics = set(t for t, _ in pubs)
    sub_topics = set(t for t, _ in subs)

    # make sure all required topics are registered
    for t in self.required_pubs:
        self.assertTrue(t in pub_topics, "node did not register publication %s on master"%(t))
    for t in self.required_subs:
        self.assertTrue(t in sub_topics, "node did not register subscription %s on master"%(t))

    # check for node name on master; the message reports the name that
    # was actually looked up in the list
    for topic, node_list in pubs:
        if topic in self.required_pubs:
            self.assertTrue(node_name in node_list, "%s not in %s"%(node_name, node_list))
    for topic, node_list in subs:
        if topic in self.required_subs:
            self.assertTrue(node_name in node_list, "%s not in %s"%(node_name, node_list))
    #TODO: no service tests yet (srvs is not inspected)
509 
# Script entry point: run the TestSlaveApi test case through rosunit,
# reporting under the 'test_rosmaster' name and passing this script's path.
510 if __name__ == '__main__':
511  rosunit.unitrun('test_rosmaster', sys.argv[0], TestSlaveApi)
test_slave_api.TestSlaveApi.testRequestTopic
def testRequestTopic(self)
Definition: test_slave_api.py:411
test_slave_api.TestSlaveApi.test_getSubscriptions
def test_getSubscriptions(self)
Definition: test_slave_api.py:251
test_slave_api.TestSlaveApi.last_val
last_val
Definition: test_slave_api.py:143
test_slave_api.TestSlaveApi.xtest_getUri
def xtest_getUri(self)
Definition: test_slave_api.py:302
test_slave_api.TestSlaveApi.test_rosout
def test_rosout(self)
Definition: test_slave_api.py:199
test_slave_api.TestSlaveApi.apiError
def apiError(self, args, msg=None)
Definition: test_slave_api.py:158
test_slave_api.TestSlaveApi.required_subs
required_subs
Definition: test_slave_api.py:94
test_slave_api.TestSlaveApi.apiSuccess
def apiSuccess(self, args)
Definition: test_slave_api.py:135
test_slave_api.TopicDescriptionList.topics
topics
Definition: test_slave_api.py:74
test_slave_api.TopicDescription.__init__
def __init__(self, topic_name, topic_type)
Definition: test_slave_api.py:57
test_slave_api.TopicDescription.topic_type
topic_type
Definition: test_slave_api.py:59
test_slave_api.TestSlaveApi.apiFail
def apiFail(self, args)
Definition: test_slave_api.py:147
test_slave_api.TestSlaveApi.test_simtime
def test_simtime(self)
Definition: test_slave_api.py:208
test_slave_api.TestSlaveApi.test_publisherUpdate
def test_publisherUpdate(self)
Definition: test_slave_api.py:348
test_slave_api.TestSlaveApi.node_api
node_api
Definition: test_slave_api.py:118
test_slave_api.TestSlaveApi.test_getMasterUri
def test_getMasterUri(self)
Definition: test_slave_api.py:320
test_slave_api.TestSlaveApi.test_paramUpdate
def test_paramUpdate(self)
validate node.paramUpdate(caller_id, key, value)
Definition: test_slave_api.py:274
test_slave_api.TestSlaveApi.setUp
def setUp(self)
Definition: test_slave_api.py:113
test_slave_api.TopicDescriptionList.as_dict
def as_dict(self)
Definition: test_slave_api.py:78
test_slave_api.TestSlaveApi.ns
ns
Definition: test_slave_api.py:89
test_slave_api.TestSlaveApi.check_TCPROS
def check_TCPROS(self, protocol_params)
Definition: test_slave_api.py:403
test_slave_api.TestSlaveApi
Definition: test_slave_api.py:84
test_slave_api.TestSlaveApi.test_getPid
def test_getPid(self)
Definition: test_slave_api.py:185
test_slave_api.TestSlaveApi.node
node
Definition: test_slave_api.py:129
test_slave_api.TestSlaveApi.last_code
last_code
Definition: test_slave_api.py:144
test_slave_api.TopicDescription.topic_name
topic_name
Definition: test_slave_api.py:58
test_slave_api.TestSlaveApi.test_getBusInfo
def test_getBusInfo(self)
Definition: test_slave_api.py:468
test_slave_api.TestSlaveApi.required_pubs
required_pubs
Definition: test_slave_api.py:93
test_slave_api.TopicDescriptionList.__init__
def __init__(self, xmlrpcvalue)
Definition: test_slave_api.py:70
test_slave_api.TestSlaveApi.test_node
test_node
Definition: test_slave_api.py:92
test_slave_api.TestSlaveApi.check_uri
def check_uri(self, uri)
Definition: test_slave_api.py:172
test_slave_api.TestSlaveApi.test_getPublications
def test_getPublications(self)
Definition: test_slave_api.py:227
test_slave_api.TestSlaveApi.__init__
def __init__(self, *args, **kwds)
Definition: test_slave_api.py:86
test_slave_api.TestSlaveApi.test_registrations
def test_registrations(self)
test the state of the master based on expected node registration
Definition: test_slave_api.py:484
test_slave_api.TestSlaveApi.load_profile
def load_profile(self, filename)
Definition: test_slave_api.py:106
test_slave_api.TopicDescriptionList
Definition: test_slave_api.py:68
test_slave_api.TestSlaveApi.caller_id
caller_id
Definition: test_slave_api.py:114
test_slave_api.TestSlaveApi.test_node_profile
test_node_profile
Definition: test_slave_api.py:100
test_slave_api.TestSlaveApi.master
master
Definition: test_slave_api.py:119
test_slave_api.TopicDescription
Definition: test_slave_api.py:56


test_rosmaster
Author(s): Ken Conley, Dirk Thomas , Jacob Perron
autogenerated on Tue May 20 2025 03:00:35