40 from xmlrpc.client
import ServerProxy
42 from xmlrpclib
import ServerProxy
47 from rosclient
import *
49 NODE_INTEGRATION_NAME =
"node_integration_test" 52 _required_subscriptions =
'test_string_in',
'test_primitives_in',
'test_arrays_in',
'test_header_in',
'probe_topic' 55 _required_publications_map = {
56 'test_string_out':
'test_rosmaster/TestString',
57 'test_primitives_out':
'test_rosmaster/TestPrimitives',
58 'test_arrays_out':
'test_rosmaster/TestArrays',
59 'test_header_out':
'test_rosmaster/TestHeader',
61 _required_publications = _required_publications_map.keys()
80 raise Exception(
"set_node_name has not been called yet")
81 ros_ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
82 return rosgraph.names.ns_join(ros_ns, _name)
88 super(_NodeTestCase, self).
__init__(*args)
90 self.
ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
96 if arg.startswith(
"--node="):
103 super(_NodeTestCase, self).
setUp()
107 timeout_t = 5.0 + time.time()
109 while time.time() < timeout_t
and not self.
node_api:
114 self.fail(
"master did not return XML-RPC API for [%s, %s]"%(self.
caller_id, self.
test_node))
116 self.assert_(self.node_api.startswith(
'http'))
122 from urllib.parse
import urlparse
124 from urlparse
import urlparse
125 parsed = urlparse(uri)
126 self.assert_(parsed[0]
in [
'http',
'https'],
'protocol [%s] in [%s] invalid'%(parsed[0], uri))
127 self.assert_(parsed[1],
'host missing [%s]'%uri)
128 if not sys.version.startswith(
'2.4'):
129 self.assert_(parsed.port,
'port missing/invalid [%s]'%uri)
134 for t
in _required_publications_map.keys():
135 new_map[rospy.resolve_name(t)] = _required_publications_map[t]
147 self.assert_(pid > 0)
151 actual = [t
for t, _
in actual]
152 missing = set(required) - set(actual)
153 self.failIf(len(missing),
'missing required topics: %s'%(
','.join(missing)))
158 self.
apiError(self.node.getPublications())
162 self.
_checkTopics([rospy.resolve_name(t)
for t
in _required_publications],
167 self.
apiError(self.node.getSubscriptions())
170 self.
_checkTopics([rospy.resolve_name(t)
for t
in _required_subscriptions],
176 good_key = rosgraph.names.ns_join(self.
ns,
'good_key')
177 bad_key = rosgraph.names.ns_join(self.
ns,
'bad_key')
209 self.assert_(len(val),
"empty name")
215 self.
apiError(self.node.getMasterUri())
219 self.assertEquals(rosgraph.get_master_uri(), uri)
224 probe_topic = rosgraph.names.ns_join(self.
ns,
'probe_topic')
225 fake_topic = rosgraph.names.ns_join(self.
ns,
'fake_topic')
234 self.
apiError(node.publisherUpdate())
239 [
'http://localhost:1234',
'http://localhost:5678']))
244 [
'http://unroutablefakeservice:1234']))
252 self.assert_(protocol_params,
"no protocol params returned")
253 self.assert_(type(protocol_params) == list,
"protocol params must be a list: %s"%protocol_params)
254 self.assertEquals(3, len(protocol_params),
"TCPROS params should have length 3: %s"%protocol_params)
255 self.assertEquals(protocol_params[0], _TCPROS)
257 self.assertEquals(protocol_params[0], _TCPROS)
261 protocols = [[_TCPROS]]
262 probe_topic = rosgraph.names.ns_join(self.
ns,
'probe_topic')
263 fake_topic = rosgraph.names.ns_join(self.
ns,
'fake_topic')
266 self.
apiError(node.requestTopic(self.
caller_id, probe_topic, protocols,
'extra stuff'))
276 topics = [rosgraph.names.ns_join(self.
ns, t)
for t
in _required_publications]
278 protocols = [[_TCPROS]]
281 protocols = [[
'FakeTransport', 1234, 5678], [_TCPROS], [
'AnotherFakeTransport']]
289 self.
apiError(self.node.getBusInfo())
295 self.
apiError(self.node.getBusStats())
302 topic_names = [t
for t, type
in topics]
303 required_topic_pubs = [rospy.resolve_name(t)
for t
in _required_publications]
304 required_topic_subs = [rospy.resolve_name(t)
for t
in _required_subscriptions]
309 for topic, type
in topics:
310 if topic
in topicTypeMap:
311 self.assertEquals(type, topicTypeMap[topic],
"topic [%s]: type [%s] does not match expected [%s]"%(type, topic, topicTypeMap[topic]))
316 pubs, subs, srvs = systemState
317 for topic, list
in pubs:
318 if topic
in required_topic_pubs:
319 self.assert_(node_name
in list,
"%s not in %s"%(self.
node_api, list))
320 for topic, list
in subs:
321 if topic
in required_topic_subs:
322 self.assert_(node_name
in list,
"%s not in %s"%(self.
node_api, list))
323 for service, list
in srvs:
333 super(NodeIntegrationTestCase, self).
__init__(*args)
334 rospy.init_node(NODE_INTEGRATION_NAME)
337 pub = rospy.Publisher(
'test_string_in', test_rosmaster.msg.String)
338 sub = rospy.Subscriber(
'test_string_in', test_rosmaster.msg.String)
344 pub = rospy.Publisher(
'test_primitives_in', test_rosmaster.msg.String)
345 sub = rospy.Subscriber(
'test_primitives_out', test_rosmaster.msg.String)
351 pub = rospy.Publisher(
'test_header_in', test_rosmaster.msg.String)
352 sub = rospy.Subscriber(
'test_header_out', test_rosmaster.msg.String)
359 pub = rospy.Publisher(
'test_header_in', test_rosmaster.msg.String)
360 sub = rospy.Subscriber(
'test_header_out', test_rosmaster.msg.String)
def testGetName(self)
validate node.getName(caller_id)
def testGetSubscriptions(self)
validate node.getSubscriptions(caller_id)
def set_node_name(name)
set_node_name() must be called prior to the unit test so that the test harness knows its ROS name...
def testGetBusStats(self)
def testGetMasterUri(self)
validate node.getMasterUri(caller_id)
def _checkTopics(self, required, actual)
subroutine for testGetSubscriptions/testGetPublications
Parent of node API and integration test cases.
def testRegistrations(self)
test the state of the master based on expected node registration
Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'.
def get_caller_id()
reimplementation of caller ID spec separately from rospy
def apiSuccess(self, args)
unit test assertion that fails if status code is not 1 and otherwise returns the value parameter ...
def apiError(self, args, msg=None)
unit test assertion that fails if status code is not -1 and otherwise returns true ...
def testGetUri(self)
validate node.getUri(caller_id)
def _checkUri(self, uri)
validates a URI as being http(s)
def _checkTCPROS(self, protocol_params)
def _createTopicTypeMap(self)
dynamically create the expected topic->type map based on the current name resolution context ...
def testParamUpdate(self)
validate node.paramUpdate(caller_id, key, value)
Performs end-to-end integration tests of a test_node.
def testGetPid(self)
validate node.getPid(caller_id)
def testRequestTopic(self)
def testPublisherUpdate(self)
validate node.publisherUpdate(caller_id, topic, uris)
def testGetPublications(self)
validate node.getPublications(caller_id)