00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import os
00036 import sys
00037 import string
00038 import time
00039 try:
00040 from xmlrpc.client import ServerProxy
00041 except ImportError:
00042 from xmlrpclib import ServerProxy
00043
00044 import rospy
00045 import rosgraph
00046
00047 from rosclient import *
00048
00049 NODE_INTEGRATION_NAME = "node_integration_test"
00050
00051
00052 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in', 'probe_topic'
00053
00054
00055 _required_publications_map = {
00056 'test_string_out': 'test_rosmaster/TestString',
00057 'test_primitives_out': 'test_rosmaster/TestPrimitives',
00058 'test_arrays_out': 'test_rosmaster/TestArrays',
00059 'test_header_out': 'test_rosmaster/TestHeader',
00060 }
00061 _required_publications = _required_publications_map.keys()
00062
00063 _TCPROS = 'TCPROS'
00064
00065
00066
00067
00068 _name = None
00069
00070
00071 def set_node_name(name):
00072 global _name
00073 _name = name
00074
00075
00076
00077
00078 def get_caller_id():
00079 if _name is None:
00080 raise Exception("set_node_name has not been called yet")
00081 ros_ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00082 return rosgraph.names.ns_join(ros_ns, _name)
00083
00084
00085 class _NodeTestCase(TestRosClient):
00086
00087 def __init__(self, *args):
00088 super(_NodeTestCase, self).__init__(*args)
00089
00090 self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00091 self.caller_id = get_caller_id()
00092
00093
00094 self.test_node = 'test_node'
00095 for arg in sys.argv:
00096 if arg.startswith("--node="):
00097 self.test_node = arg[len("--node="):]
00098
00099 self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
00100
00101
00102 def setUp(self):
00103 super(_NodeTestCase, self).setUp()
00104
00105
00106 import time
00107 timeout_t = 5.0 + time.time()
00108 self.node_api = None
00109 while time.time() < timeout_t and not self.node_api:
00110 code, msg, node_api = self.master.lookupNode(self.caller_id, self.test_node)
00111 if code == 1:
00112 self.node_api = node_api
00113 if not self.node_api:
00114 self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
00115 print("[%s] API = %s" %(self.test_node, self.node_api))
00116 self.assert_(self.node_api.startswith('http'))
00117 self.node = ServerProxy(self.node_api)
00118
00119
00120 def _checkUri(self, uri):
00121 try:
00122 from urllib.parse import urlparse
00123 except ImportError:
00124 from urlparse import urlparse
00125 parsed = urlparse(uri)
00126 self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] in [%s] invalid'%(parsed[0], uri))
00127 self.assert_(parsed[1], 'host missing [%s]'%uri)
00128 if not sys.version.startswith('2.4'):
00129 self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)
00130
00131
00132 def _createTopicTypeMap(self):
00133 new_map = {}
00134 for t in _required_publications_map.keys():
00135 new_map[rospy.resolve_name(t)] = _required_publications_map[t]
00136 return new_map
00137
00138
00139 class NodeApiTestCase(_NodeTestCase):
00140
00141
00142 def testGetPid(self):
00143
00144 self.apiError(self.node.getPid())
00145
00146 pid = self.apiSuccess(self.node.getPid(self.caller_id))
00147 self.assert_(pid > 0)
00148
00149
00150 def _checkTopics(self, required, actual):
00151 actual = [t for t, _ in actual]
00152 missing = set(required) - set(actual)
00153 self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00154
00155
00156 def testGetPublications(self):
00157
00158 self.apiError(self.node.getPublications())
00159 self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
00160
00161
00162 self._checkTopics([rospy.resolve_name(t) for t in _required_publications],
00163 self.apiSuccess(self.node.getPublications(self.caller_id)))
00164
00165 def testGetSubscriptions(self):
00166
00167 self.apiError(self.node.getSubscriptions())
00168 self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))
00169
00170 self._checkTopics([rospy.resolve_name(t) for t in _required_subscriptions],
00171 self.apiSuccess(self.node.getSubscriptions(self.caller_id)))
00172
00173
00174 def testParamUpdate(self):
00175 node = self.node
00176 good_key = rosgraph.names.ns_join(self.ns, 'good_key')
00177 bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
00178
00179
00180 self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
00181 self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
00182
00183 self.apiError(node.paramUpdate(self.caller_id, bad_key))
00184 self.apiError(node.paramUpdate(self.caller_id))
00185
00186
00187 self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
00188
00189
00190
00191
00192
00193
00194
00195 def testGetUri(self):
00196
00197 self.apiError(self.node.getUri(self.caller_id, 'bad'))
00198 self.apiError(self.node.getUri())
00199
00200 self._checkUri(self.apiSuccess(self.node.getUri(self.caller_id)))
00201
00202
00203 def testGetName(self):
00204
00205 self.apiError(self.node.getName(self.caller_id, 'bad'))
00206 self.apiError(self.node.getName())
00207
00208 val = self.apiSuccess(self.node.getName(self.caller_id))
00209 self.assert_(len(val), "empty name")
00210
00211
00212 def testGetMasterUri(self):
00213
00214 self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
00215 self.apiError(self.node.getMasterUri())
00216
00217 uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
00218 self._checkUri(uri)
00219 self.assertEquals(rosgraph.get_master_uri(), uri)
00220
00221
00222 def testPublisherUpdate(self):
00223 node = self.node
00224 probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00225 fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00226
00227 self.apiError(node.getBusStats(self.caller_id, 'bad'))
00228 self.apiError(node.getBusStats())
00229
00230 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
00231 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
00232 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))
00233 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
00234 self.apiError(node.publisherUpdate())
00235
00236
00237
00238 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00239 ['http://localhost:1234', 'http://localhost:5678']))
00240 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00241 []))
00242
00243 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00244 ['http://unroutablefakeservice:1234']))
00245
00246 time.sleep(1.0)
00247
00248 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00249 []))
00250
00251 def _checkTCPROS(self, protocol_params):
00252 self.assert_(protocol_params, "no protocol params returned")
00253 self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
00254 self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
00255 self.assertEquals(protocol_params[0], _TCPROS)
00256
00257 self.assertEquals(protocol_params[0], _TCPROS)
00258
00259 def testRequestTopic(self):
00260 node = self.node
00261 protocols = [[_TCPROS]]
00262 probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00263 fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00264
00265
00266 self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
00267 self.apiError(node.requestTopic(self.caller_id, probe_topic))
00268 self.apiError(node.requestTopic(self.caller_id))
00269 self.apiError(node.requestTopic())
00270
00271 self.apiError(node.requestTopic(self.caller_id, 1, protocols))
00272 self.apiError(node.requestTopic(self.caller_id, '', protocols))
00273 self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
00274 self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols'))
00275
00276 topics = [rosgraph.names.ns_join(self.ns, t) for t in _required_publications]
00277
00278 protocols = [[_TCPROS]]
00279 for topic in topics:
00280 self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00281 protocols = [['FakeTransport', 1234, 5678], [_TCPROS], ['AnotherFakeTransport']]
00282
00283 for topic in topics:
00284 self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00285
00286 def testGetBusInfo(self):
00287
00288 self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
00289 self.apiError(self.node.getBusInfo())
00290
00291
00292 def testGetBusStats(self):
00293
00294 self.apiError(self.node.getBusStats(self.caller_id, 'bad'))
00295 self.apiError(self.node.getBusStats())
00296
00297
00298
00299 def testRegistrations(self):
00300
00301 topics = self.apiSuccess(self.master.getPublishedTopics(self.caller_id, ''))
00302 topic_names = [t for t, type in topics]
00303 required_topic_pubs = [rospy.resolve_name(t) for t in _required_publications]
00304 required_topic_subs = [rospy.resolve_name(t) for t in _required_subscriptions]
00305 self._checkTopics(required_topic_pubs, topics)
00306
00307
00308 topicTypeMap = self._createTopicTypeMap()
00309 for topic, type in topics:
00310 if topic in topicTypeMap:
00311 self.assertEquals(type, topicTypeMap[topic], "topic [%s]: type [%s] does not match expected [%s]"%(type, topic, topicTypeMap[topic]))
00312
00313
00314 node_name = self.test_node
00315 systemState = self.apiSuccess(self.master.getSystemState(self.caller_id))
00316 pubs, subs, srvs = systemState
00317 for topic, list in pubs:
00318 if topic in required_topic_pubs:
00319 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00320 for topic, list in subs:
00321 if topic in required_topic_subs:
00322 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00323 for service, list in srvs:
00324
00325 pass
00326
00327
00328
00329
00330 class NodeIntegrationTestCase(_NodeTestCase):
00331
00332 def __init__(self, *args):
00333 super(NodeIntegrationTestCase, self).__init__(*args)
00334 rospy.init_node(NODE_INTEGRATION_NAME)
00335
00336 def testString(self):
00337 pub = rospy.Publisher('test_string_in', test_rosmaster.msg.String)
00338 sub = rospy.Subscriber('test_string_in', test_rosmaster.msg.String)
00339
00340 pub.unregister()
00341 sub.unregister()
00342
00343 def testPrimitives(self):
00344 pub = rospy.Publisher('test_primitives_in', test_rosmaster.msg.String)
00345 sub = rospy.Subscriber('test_primitives_out', test_rosmaster.msg.String)
00346
00347 pub.unregister()
00348 sub.unregister()
00349
00350 def testArrays(self):
00351 pub = rospy.Publisher('test_header_in', test_rosmaster.msg.String)
00352 sub = rospy.Subscriber('test_header_out', test_rosmaster.msg.String)
00353
00354 pub.unregister()
00355 sub.unregister()
00356
00357 def testHeader(self):
00358
00359 pub = rospy.Publisher('test_header_in', test_rosmaster.msg.String)
00360 sub = rospy.Subscriber('test_header_out', test_rosmaster.msg.String)
00361
00362 pub.unregister()
00363 sub.unregister()
00364
00365