node.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2008, Willow Garage, Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 #
33 # Revision $Id: testSlave.py 1100 2008-05-29 20:23:54Z sfkwc $
34 
35 import os
36 import sys
37 import string
38 import time
39 try:
40  from xmlrpc.client import ServerProxy
41 except ImportError:
42  from xmlrpclib import ServerProxy
43 
44 import rospy
45 import rosgraph
46 
47 from rosclient import *
48 
49 NODE_INTEGRATION_NAME = "node_integration_test"
50 
51 
52 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in', 'probe_topic'
53 
54 # only publishers determine topic type, so we test against their declared spec
55 _required_publications_map = {
56  'test_string_out': 'test_rosmaster/TestString',
57  'test_primitives_out': 'test_rosmaster/TestPrimitives',
58  'test_arrays_out': 'test_rosmaster/TestArrays',
59  'test_header_out': 'test_rosmaster/TestHeader',
60  }
61 _required_publications = _required_publications_map.keys()
62 
63 _TCPROS = 'TCPROS'
64 
65 # NOTE: probe_topic is a unpublished topic that merely exists to test
66 # APIs that talk about subscriptions (e.g. publisherUpdate)
67 
68 _name = None
69 ## set_node_name() must be called prior to the unit test so that the test harness knows its
70 ## ROS name.
71 def set_node_name(name):
72  global _name
73  _name = name
74 
75 # Have to try as hard as possible to not use rospy code in testing rospy code, so this is
76 # a reimplementation of the caller ID spec so that NodeApiTestCase knows its name
77 ## reimplementation of caller ID spec separately from rospy
79  if _name is None:
80  raise Exception("set_node_name has not been called yet")
81  ros_ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
82  return rosgraph.names.ns_join(ros_ns, _name)
83 
84 ## Parent of node API and integration test cases. Performs common state setup
86 
87  def __init__(self, *args):
88  super(_NodeTestCase, self).__init__(*args)
89 
90  self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
92 
93  # load in name of test node
94  self.test_node = 'test_node' #default
95  for arg in sys.argv:
96  if arg.startswith("--node="):
97  self.test_node = arg[len("--node="):]
98  # resolve
99  self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
100 
101 
102  def setUp(self):
103  super(_NodeTestCase, self).setUp()
104  # retrieve handle on node
105  # give ourselves five seconds for node to appear
106  import time
107  timeout_t = 5.0 + time.time()
108  self.node_api = None
109  while time.time() < timeout_t and not self.node_api:
110  code, msg, node_api = self.master.lookupNode(self.caller_id, self.test_node)
111  if code == 1:
112  self.node_api = node_api
113  if not self.node_api:
114  self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
115  print("[%s] API = %s" %(self.test_node, self.node_api))
116  self.assert_(self.node_api.startswith('http'))
117  self.node = ServerProxy(self.node_api)
118 
119  ## validates a URI as being http(s)
120  def _checkUri(self, uri):
121  try:
122  from urllib.parse import urlparse
123  except ImportError:
124  from urlparse import urlparse
125  parsed = urlparse(uri)
126  self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] in [%s] invalid'%(parsed[0], uri))
127  self.assert_(parsed[1], 'host missing [%s]'%uri)
128  if not sys.version.startswith('2.4'): #check not available on py24
129  self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)
130 
131  ## dynamically create the expected topic->type map based on the current name resolution context
133  new_map = {}
134  for t in _required_publications_map.keys():
135  new_map[rospy.resolve_name(t)] = _required_publications_map[t]
136  return new_map
137 
138 ## Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'
140 
141  ## validate node.getPid(caller_id)
142  def testGetPid(self):
143  # test with bad arity
144  self.apiError(self.node.getPid())
145  # test success
146  pid = self.apiSuccess(self.node.getPid(self.caller_id))
147  self.assert_(pid > 0)
148 
149  ## subroutine for testGetSubscriptions/testGetPublications
150  def _checkTopics(self, required, actual):
151  actual = [t for t, _ in actual]
152  missing = set(required) - set(actual)
153  self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
154 
155  ## validate node.getPublications(caller_id)
157  # test with bad arity
158  self.apiError(self.node.getPublications())
159  self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
160 
161  # test success
162  self._checkTopics([rospy.resolve_name(t) for t in _required_publications],
163  self.apiSuccess(self.node.getPublications(self.caller_id)))
164  ## validate node.getSubscriptions(caller_id)
166  # test with bad arity
167  self.apiError(self.node.getSubscriptions())
168  self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))
169  # test success
170  self._checkTopics([rospy.resolve_name(t) for t in _required_subscriptions],
171  self.apiSuccess(self.node.getSubscriptions(self.caller_id)))
172 
173  ## validate node.paramUpdate(caller_id, key, value)
174  def testParamUpdate(self):
175  node = self.node
176  good_key = rosgraph.names.ns_join(self.ns, 'good_key')
177  bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
178 
179  # test bad key
180  self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
181  self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
182  # test with bad arity
183  self.apiError(node.paramUpdate(self.caller_id, bad_key))
184  self.apiError(node.paramUpdate(self.caller_id))
185 
186  # node is not subscribed to good_key (yet)
187  self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
188 
189  # we can't actually test success cases without forcing node to subscribe
190  #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 1))
191  #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, True))
192  #self.apiSuccess(node.paramUpdate(self.caller_id, good_key, 10.0))
193 
194  ## validate node.getUri(caller_id)
195  def testGetUri(self):
196  # test bad arity
197  self.apiError(self.node.getUri(self.caller_id, 'bad'))
198  self.apiError(self.node.getUri())
199  # test success
200  self._checkUri(self.apiSuccess(self.node.getUri(self.caller_id)))
201 
202  ## validate node.getName(caller_id)
203  def testGetName(self):
204  # test bad arity
205  self.apiError(self.node.getName(self.caller_id, 'bad'))
206  self.apiError(self.node.getName())
207  # test success
208  val = self.apiSuccess(self.node.getName(self.caller_id))
209  self.assert_(len(val), "empty name")
210 
211  ## validate node.getMasterUri(caller_id)
212  def testGetMasterUri(self):
213  # test bad arity
214  self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
215  self.apiError(self.node.getMasterUri())
216  # test success
217  uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
218  self._checkUri(uri)
219  self.assertEquals(rosgraph.get_master_uri(), uri)
220 
221  ## validate node.publisherUpdate(caller_id, topic, uris)
223  node = self.node
224  probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
225  fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
226  # test bad arity
227  self.apiError(node.getBusStats(self.caller_id, 'bad'))
228  self.apiError(node.getBusStats())
229  # test bad args
230  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
231  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
232  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))
233  self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
234  self.apiError(node.publisherUpdate())
235 
236  # test success
237  # still success even if not actually interested in topic
238  self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
239  ['http://localhost:1234', 'http://localhost:5678']))
240  self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
241  []))
242  # try it with it the /probe_topic, which will exercise some error branches in the client
243  self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
244  ['http://unroutablefakeservice:1234']))
245  # give it some time to make sure it's attempted contact
246  time.sleep(1.0)
247  # check that it's still there
248  self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
249  []))
250 
251  def _checkTCPROS(self, protocol_params):
252  self.assert_(protocol_params, "no protocol params returned")
253  self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
254  self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
255  self.assertEquals(protocol_params[0], _TCPROS)
256  # expect ['TCPROS', 1.2.3.4, 1234]
257  self.assertEquals(protocol_params[0], _TCPROS)
258 
259  def testRequestTopic(self):
260  node = self.node
261  protocols = [[_TCPROS]]
262  probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
263  fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
264 
265  # test bad arity
266  self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
267  self.apiError(node.requestTopic(self.caller_id, probe_topic))
268  self.apiError(node.requestTopic(self.caller_id))
269  self.apiError(node.requestTopic())
270  # test bad args
271  self.apiError(node.requestTopic(self.caller_id, 1, protocols))
272  self.apiError(node.requestTopic(self.caller_id, '', protocols))
273  self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
274  self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols'))
275 
276  topics = [rosgraph.names.ns_join(self.ns, t) for t in _required_publications]
277  # currently only support TCPROS as we require all clients to support this
278  protocols = [[_TCPROS]]
279  for topic in topics:
280  self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
281  protocols = [['FakeTransport', 1234, 5678], [_TCPROS], ['AnotherFakeTransport']]
282  # try each one more time, this time with more protocol choices
283  for topic in topics:
284  self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
285 
286  def testGetBusInfo(self):
287  # test bad arity
288  self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
289  self.apiError(self.node.getBusInfo())
290  #TODO: finish
291 
292  def testGetBusStats(self):
293  # test bad arity
294  self.apiError(self.node.getBusStats(self.caller_id, 'bad'))
295  self.apiError(self.node.getBusStats())
296  #TODO: finish
297 
298  ## test the state of the master based on expected node registration
299  def testRegistrations(self):
300  # setUp() ensures the node has registered with the master
301  topics = self.apiSuccess(self.master.getPublishedTopics(self.caller_id, ''))
302  topic_names = [t for t, type in topics]
303  required_topic_pubs = [rospy.resolve_name(t) for t in _required_publications]
304  required_topic_subs = [rospy.resolve_name(t) for t in _required_subscriptions]
305  self._checkTopics(required_topic_pubs, topics)
306 
307  # now check types
308  topicTypeMap = self._createTopicTypeMap()
309  for topic, type in topics:
310  if topic in topicTypeMap:
311  self.assertEquals(type, topicTypeMap[topic], "topic [%s]: type [%s] does not match expected [%s]"%(type, topic, topicTypeMap[topic]))
312 
313  # now check actual URIs
314  node_name = self.test_node
315  systemState = self.apiSuccess(self.master.getSystemState(self.caller_id))
316  pubs, subs, srvs = systemState
317  for topic, list in pubs:
318  if topic in required_topic_pubs:
319  self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
320  for topic, list in subs:
321  if topic in required_topic_subs:
322  self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
323  for service, list in srvs:
324  #TODO: no service tests yet
325  pass
326 
327 
328 ## Performs end-to-end integration tests of a test_node. NodeIntegrationTestCase
329 ## itself is a rospy node and thus implicitly depends on rospy functionality.
331 
332  def __init__(self, *args):
333  super(NodeIntegrationTestCase, self).__init__(*args)
334  rospy.init_node(NODE_INTEGRATION_NAME)
335 
336  def testString(self):
337  pub = rospy.Publisher('test_string_in', test_rosmaster.msg.String)
338  sub = rospy.Subscriber('test_string_in', test_rosmaster.msg.String)
339  #TODO: publish a bunch and check sequencing + caller_id
340  pub.unregister()
341  sub.unregister()
342 
343  def testPrimitives(self):
344  pub = rospy.Publisher('test_primitives_in', test_rosmaster.msg.String)
345  sub = rospy.Subscriber('test_primitives_out', test_rosmaster.msg.String)
346  #TODO: publish a bunch and check sequencing + caller_id
347  pub.unregister()
348  sub.unregister()
349 
350  def testArrays(self):
351  pub = rospy.Publisher('test_header_in', test_rosmaster.msg.String)
352  sub = rospy.Subscriber('test_header_out', test_rosmaster.msg.String)
353  #TODO: publish a bunch and check sequencing + caller_id
354  pub.unregister()
355  sub.unregister()
356 
357  def testHeader(self):
358  #msg.auto_header = True
359  pub = rospy.Publisher('test_header_in', test_rosmaster.msg.String)
360  sub = rospy.Subscriber('test_header_out', test_rosmaster.msg.String)
361  #TODO: publish a bunch and check sequencing + caller_id
362  pub.unregister()
363  sub.unregister()
364 
365 
def testGetName(self)
validate node.getName(caller_id)
Definition: node.py:203
def testGetSubscriptions(self)
validate node.getSubscriptions(caller_id)
Definition: node.py:165
def set_node_name(name)
set_node_name() must be called prior to the unit test so that the test harness knows its ROS name...
Definition: node.py:71
def testGetBusStats(self)
Definition: node.py:292
def testGetMasterUri(self)
validate node.getMasterUri(caller_id)
Definition: node.py:212
def _checkTopics(self, required, actual)
subroutine for testGetSubscriptions/testGetPublications
Definition: node.py:150
Parent of node API and integration test cases.
Definition: node.py:85
def testGetBusInfo(self)
Definition: node.py:286
def testRegistrations(self)
test the state of the master based on expected node registration
Definition: node.py:299
Expects a single test node to be running with name &#39;test_node&#39; and subscribed to &#39;test_string&#39;.
Definition: node.py:139
def get_caller_id()
reimplementation of caller ID spec separately from rospy
Definition: node.py:78
def apiSuccess(self, args)
unit test assertion that fails if status code is not 1 and otherwise returns the value parameter ...
Definition: rosclient.py:58
def apiError(self, args, msg=None)
unit test assertion that fails if status code is not -1 and otherwise returns true ...
Definition: rosclient.py:74
def testGetUri(self)
validate node.getUri(caller_id)
Definition: node.py:195
def _checkUri(self, uri)
validates a URI as being http(s)
Definition: node.py:120
def _checkTCPROS(self, protocol_params)
Definition: node.py:251
def _createTopicTypeMap(self)
dynamically create the expected topic->type map based on the current name resolution context ...
Definition: node.py:132
def testParamUpdate(self)
validate node.paramUpdate(caller_id, key, value)
Definition: node.py:174
def setUp(self)
Definition: node.py:102
Performs end-to-end integration tests of a test_node.
Definition: node.py:330
def __init__(self, args)
Definition: node.py:87
def testGetPid(self)
validate node.getPid(caller_id)
Definition: node.py:142
def testRequestTopic(self)
Definition: node.py:259
def testPublisherUpdate(self)
validate node.publisherUpdate(caller_id, topic, uris)
Definition: node.py:222
def testGetPublications(self)
validate node.getPublications(caller_id)
Definition: node.py:156
def __init__(self, args)
Definition: node.py:332


test_rosmaster
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:54