test_slave_api.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 # This is a rewrite of the old node API tests, which focus too much on
35 # a completed node API and don't facilitate easy bring up of a new
36 # client library.
37 
38 import os
39 import sys
40 import string
41 import time
42 import unittest
43 try:
44  from xmlrpc.client import Fault, ServerProxy
45 except ImportError:
46  from xmlrpclib import Fault, ServerProxy
47 
48 import rosunit
49 import rosgraph
50 
51 TCPROS = 'TCPROS'
52 
53 CALLER_ID = '/test_harness'
54 TEST_NODE_NAME = '/test_node' #default
55 
56 class TopicDescription(object):
57  def __init__(self, topic_name, topic_type):
58  self.topic_name = topic_name
59  self.topic_type = topic_type
60 
61  #validate topic
62  if not rosgraph.names.is_legal_name(topic_name):
63  raise ValueError('topic name: %s'%(topic_name))
64 
65  # validate type
66  p, t = topic_type.split('/')
67 
68 class TopicDescriptionList(object):
69 
70  def __init__(self, xmlrpcvalue):
71  # [ [topic1, topicType1]...[topicN, topicTypeN]]]
72  if not type(xmlrpcvalue) == list:
73  raise ValueError("publications must be a list")
74  self.topics = []
75  for n, t in xmlrpcvalue:
76  self.topics.append(TopicDescription(n, t))
77 
78  def as_dict(self):
79  d = {}
80  for t in self.topics:
81  d[t.topic_name] = t.topic_type
82  return d
83 
84 class TestSlaveApi(unittest.TestCase):
85 
86  def __init__(self, *args, **kwds):
87  super(TestSlaveApi, self).__init__(*args)
88 
89  self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
90 
91  # load in name of test node
92  self.test_node = 'test_node' #default
95 
96  for arg in sys.argv:
97  if arg.startswith("--node="):
98  self.test_node = arg[len("--node="):]
99  if arg.startswith("--profile="):
100  self.test_node_profile = arg[len("--profile="):]
102 
103  # resolve
104  self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
105 
106  def load_profile(self, filename):
107  import yaml
108  with open(filename) as f:
109  d = yaml.load(f)
110  self.required_pubs = d.get('pubs', {})
111  self.required_subs = d.get('subs', {})
112 
113  def setUp(self):
114  self.caller_id = CALLER_ID
115  # retrieve handle on node
116  # give ourselves 10 seconds for node to appear
117  timeout_t = 10.0 + time.time()
118  self.node_api = None
119  self.master = rosgraph.Master(self.caller_id)
120  while time.time() < timeout_t and not self.node_api:
121  try:
122  self.node_api = self.master.lookupNode(self.test_node)
123  except:
124  time.sleep(0.1)
125  if not self.node_api:
126  self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
127  print "[%s] API = %s"%(self.test_node, self.node_api)
128  self.assert_(self.node_api.startswith('http'))
129  self.node = ServerProxy(self.node_api)
130 
131  # hack: sleep for a couple seconds just in case the node is
132  # still registering with the master.
133  time.sleep(2.)
134 
135  def apiSuccess(self, args):
136  """
137  unit test assertion that fails if status code is not 1 and otherwise returns the value parameter.
138  @param args: returnv value from ROS API call
139  @type args: [int, str, val]
140  @return: value parameter from args (arg[2] for master/slave API)
141  """
142  self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
143  self.last_code, self.last_msg, self.last_val = args
144  assert self.last_code == 1, "status code is not 1: %s"%self.last_msg
145  return self.last_val
146 
147  def apiFail(self, args):
148  """
149  unit test assertions that fails if status code is not 0 and otherwise returns true.
150  @param args: returnv value from ROS API call
151  @type args: [int, str, val]
152  @return: True if status code is 0
153  """
154  self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
155  self.last_code, self.last_msg, self.last_val = args
156  assert self.last_code == 0, "Call should have failed with status code 0: %s"%self.last_msg
157 
158  def apiError(self, args, msg=None):
159  """
160  unit test assertion that fails if status code is not -1 and otherwise returns true.
161  @param args: returnv value from ROS API call
162  @type args: [int, str, val]
163  @return: True if status code is -1
164  """
165  self.assert_(len(args) == 3, "invalid API return value triplet: %s"%str(args))
166  self.last_code, self.last_msg, self.last_val = args
167  if msg:
168  assert self.last_code == -1, "%s (return msg was %s)"%(msg, self.last_msg)
169  else:
170  assert self.last_code == -1, "Call should have returned error -1 code: %s"%self.last_msg
171 
172  def check_uri(self, uri):
173  """
174  validates a URI as being http(s)
175  """
176  import urlparse
177  parsed = urlparse.urlparse(uri)
178  self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] is [%s] invalid'%(parsed[0], uri))
179  self.assert_(parsed[1], 'host missing [%s]'%uri)
180  self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)
181 
182  def test_getPid(self):
183  """
184  validate node.getPid(caller_id)
185  """
186  # test success
187  pid = self.apiSuccess(self.node.getPid(self.caller_id))
188  self.assert_(pid > 0)
189 
190  # test with bad arity: accept error or fault
191  try:
192  self.apiError(self.node.getPid())
193  except Fault:
194  pass
195 
196  def test_rosout(self):
197  """
198  make sure rosout is in publication and connection list
199  """
200  val = self.apiSuccess(self.node.getPublications(self.caller_id))
201  pubs_d = TopicDescriptionList(val).as_dict()
202  self.assertTrue('/rosout' in pubs_d, "node is not publishing to rosout")
203  self.assertEquals('rosgraph_msgs/Log', pubs_d['/rosout'], "/rosout is not correct type")
204 
205  def test_simtime(self):
206  """
207  test that node obeys simtime (/Clock) contract
208 
209  http://www.ros.org/wiki/Clock
210  """
211  try:
212  use_sim_time = self.master.getParam('/use_sim_time')
213  except:
214  use_sim_time = False
215 
216  val = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
217  subs_d = TopicDescriptionList(val).as_dict()
218  if use_sim_time:
219  self.assertTrue('/clock' in subs_d, "node is not subscribing to clock")
220  self.assertEquals('rosgraph_msgs/Clock', subs_d['/clock'], "/clock is not correct type")
221  else:
222  self.assertFalse('/clock' in subs_d, "node is subscribed to /clock even though /use_sim_time is false")
223 
225  """
226  validate node.getPublications(caller_id)
227  """
228  # test success
229  pubs_value = self.apiSuccess(self.node.getPublications(self.caller_id))
230  pubs = TopicDescriptionList(pubs_value)
231 
232  pubs_dict = pubs.as_dict()
233  # this is separately tested by test_rosout
234  if '/rosout' in pubs_dict:
235  del pubs_dict['/rosout']
236  self.assertEquals(self.required_pubs, pubs_dict)
237 
238  # test with bad arity: accept error or fault
239  try:
240  self.apiError(self.node.getPublications())
241  except Fault:
242  pass
243  try:
244  self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
245  except Fault:
246  pass
247 
249  """
250  validate node.getSubscriptions(caller_id)
251  """
252 
253  # test success
254  value = self.apiSuccess(self.node.getSubscriptions(self.caller_id))
255  subs = TopicDescriptionList(value)
256 
257  subs_dict = subs.as_dict()
258  self.assertEquals(self.required_subs, subs_dict)
259 
260  # test with bad arity: accept error or fault
261  try:
262  self.apiError(self.node.getSubscriptions())
263  except Fault:
264  pass
265  try:
266  self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))
267  except Fault:
268  pass
269 
270  ## validate node.paramUpdate(caller_id, key, value)
271  def test_paramUpdate(self):
272  node = self.node
273  good_key = rosgraph.names.ns_join(self.ns, 'good_key')
274  bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
275 
276  # node is not subscribed to good_key (yet)
277  self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
278 
279  # test bad key
280  self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
281  self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
282 
283  # test with bad arity: accept error or fault
284  try:
285  self.apiError(node.paramUpdate(self.caller_id, bad_key))
286  except Fault:
287  pass
288 
289  try:
290  self.apiError(node.paramUpdate(self.caller_id))
291  except Fault:
292  pass
293 
294  # we can't actually test success cases without forcing node to subscribe
295  #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1))
296  #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True))
297  #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0))
298 
299  def xtest_getUri(self):
300  """
301  Future: validate node.getUri(caller_id). It would be nice to
302  make this official API as it provides some debugging info.
303  """
304  # test success
305  self.check_uri(self.apiSuccess(self.node.getUri(self.caller_id)))
306 
307  # test bad arity
308  try:
309  self.apiError(self.node.getUri(self.caller_id, 'bad'))
310  except Fault:
311  pass
312  try:
313  self.apiError(self.node.getUri())
314  except Fault:
315  pass
316 
317  def test_getMasterUri(self):
318  """
319  validate node.getMasterUri(caller_id)
320  """
321  # test success
322  uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
323  self.check_uri(uri)
324 
325  # check against env, canonicalize for comparison
326  import urlparse
327  master_env = rosgraph.get_master_uri()
328  if not master_env.endswith('/'):
329  master_env = master_env + '/'
330  self.assertEquals(urlparse.urlparse(master_env), urlparse.urlparse(uri))
331 
332  # test bad arity
333  try:
334  self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
335  except Fault:
336  pass
337  try:
338  self.apiError(self.node.getMasterUri())
339  except Fault:
340  pass
341 
343  """
344  validate node.publisherUpdate(caller_id, topic, uris)
345  """
346  node = self.node
347  probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
348  fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
349 
350  # test success
351  # still success even if not actually interested in topic
352  self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
353  ['http://localhost:1234', 'http://localhost:5678']))
354  self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
355  []))
356  # try it with it the /probe_topic, which will exercise some error branches in the client
357  self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
358  ['http://unroutablefakeservice:1234']))
359  # give it some time to make sure it's attempted contact
360  time.sleep(1.0)
361  # check that it's still there
362  self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
363  []))
364 
365  # test bad args
366  try:
367  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
368  except Fault:
369  pass
370  try:
371  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
372  except Fault:
373  pass
374  try:
375  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))
376  except Fault:
377  pass
378  try:
379  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
380  except Fault:
381  pass
382 
383  # test bad arity
384  try:
385  self.apiError(node.publisherUpdate())
386  except Fault:
387  pass
388  try:
389  self.apiError(node.getBusStats(self.caller_id, 'bad'))
390  except Fault:
391  pass
392  try:
393  self.apiError(node.getBusStats())
394  except Fault:
395  pass
396 
397  def check_TCPROS(self, protocol_params):
398  self.assert_(protocol_params, "no protocol params returned")
399  self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
400  self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
401  self.assertEquals(protocol_params[0], TCPROS)
402  # expect ['TCPROS', 1.2.3.4, 1234]
403  self.assertEquals(protocol_params[0], TCPROS)
404 
405  def testRequestTopic(self):
406  node = self.node
407  protocols = [[TCPROS]]
408 
409  publications = node.getPublications(self.caller_id)
410 
411  topics = self.required_pubs.keys()
412  probe_topic = topics[0] if topics else None
413  fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
414 
415  # currently only support TCPROS as we require all clients to support this
416  protocols = [[TCPROS]]
417  for topic in topics:
418  self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
419  protocols = [['FakeTransport', 1234, 5678], [TCPROS], ['AnotherFakeTransport']]
420  # try each one more time, this time with more protocol choices
421  for topic in topics:
422  self.check_TCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
423 
424  # test bad arity
425  if probe_topic:
426  try:
427  self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
428  except Fault:
429  pass
430  try:
431  self.apiError(node.requestTopic(self.caller_id, probe_topic))
432  except Fault:
433  pass
434  try:
435  self.apiError(node.requestTopic(self.caller_id))
436  except Fault:
437  pass
438  try:
439  self.apiError(node.requestTopic())
440  except Fault:
441  pass
442 
443  # test bad args
444  try:
445  self.apiError(node.requestTopic(self.caller_id, 1, protocols))
446  except Fault:
447  pass
448  try:
449  self.apiError(node.requestTopic(self.caller_id, '', protocols))
450  except Fault:
451  pass
452  try:
453  self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
454  except Fault:
455  pass
456  try:
457  self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols'))
458  except Fault:
459  pass
460 
461 
462  def test_getBusInfo(self):
463  #TODO: finish
464  # there should be a connection to rosout
465 
466  # test bad arity
467  try:
468  self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
469  except Fault:
470  pass
471  try:
472  self.apiError(self.node.getBusInfo())
473  except Fault:
474  pass
475 
476 
477  ## test the state of the master based on expected node registration
479  # setUp() ensures the node has registered with the master
480 
481  # check actual URIs
482  node_name = self.test_node
483  pubs, subs, srvs = self.master.getSystemState()
484  pub_topics = [t for t, _ in pubs]
485  sub_topics = [t for t, _ in subs]
486 
487  # make sure all required topics are registered
488  for t in self.required_pubs:
489  self.assert_(t in pub_topics, "node did not register publication %s on master"%(t))
490  for t in self.required_subs:
491  self.assert_(t in sub_topics, "node did not register subscription %s on master"%(t))
492 
493  # check for node URI on master
494  for topic, node_list in pubs:
495  if topic in self.required_pubs:
496  self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
497  for topic, node_list in subs:
498  if topic in self.required_subs:
499  self.assert_(node_name in node_list, "%s not in %s"%(self.node_api, node_list))
500  for service, srv_list in srvs:
501  #TODO: no service tests yet
502  pass
503 
504 if __name__ == '__main__':
505  rosunit.unitrun('test_rosmaster', sys.argv[0], TestSlaveApi)
def __init__(self, args, kwds)
def __init__(self, topic_name, topic_type)
def load_profile(self, filename)
def __init__(self, xmlrpcvalue)
def check_TCPROS(self, protocol_params)
def test_registrations(self)
test the state of the master based on expected node registration
def test_paramUpdate(self)
validate node.paramUpdate(caller_id, key, value)
def apiError(self, args, msg=None)


test_rosmaster
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:54