00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import os
00036 import sys
00037 import string
00038 import time
00039 import xmlrpclib
00040
00041 import rospy
00042 import rosgraph
00043
00044 from test_ros.rosclient import *
00045
00046 NODE_INTEGRATION_NAME = "node_integration_test"
00047
00048
00049 _required_subscriptions = 'test_string_in', 'test_primitives_in', 'test_arrays_in', 'test_header_in', 'probe_topic'
00050
00051
00052 _required_publications_map = {
00053 'test_string_out': 'test_ros/TestString',
00054 'test_primitives_out': 'test_ros/TestPrimitives',
00055 'test_arrays_out': 'test_ros/TestArrays',
00056 'test_header_out': 'test_ros/TestHeader',
00057 }
00058 _required_publications = _required_publications_map.keys()
00059
00060 _TCPROS = 'TCPROS'
00061
00062
00063
00064
00065 _name = None
00066
00067
00068 def set_node_name(name):
00069 global _name
00070 _name = name
00071
00072
00073
00074
00075 def get_caller_id():
00076 if _name is None:
00077 raise Exception("set_node_name has not been called yet")
00078 ros_ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00079 return rosgraph.names.ns_join(ros_ns, _name)
00080
00081
00082 class _NodeTestCase(TestRosClient):
00083
00084 def __init__(self, *args):
00085 super(_NodeTestCase, self).__init__(*args)
00086
00087 self.ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
00088 self.caller_id = get_caller_id()
00089
00090
00091 self.test_node = 'test_node'
00092 for arg in sys.argv:
00093 if arg.startswith("--node="):
00094 self.test_node = arg[len("--node="):]
00095
00096 self.test_node = rosgraph.names.ns_join(self.ns, self.test_node)
00097
00098
00099 def setUp(self):
00100 super(_NodeTestCase, self).setUp()
00101
00102
00103 import time
00104 timeout_t = 5.0 + time.time()
00105 self.node_api = None
00106 while time.time() < timeout_t and not self.node_api:
00107 code, msg, node_api = self.master.lookupNode(self.caller_id, self.test_node)
00108 if code == 1:
00109 self.node_api = node_api
00110 if not self.node_api:
00111 self.fail("master did not return XML-RPC API for [%s, %s]"%(self.caller_id, self.test_node))
00112 print "[%s] API = %s"%(self.test_node, self.node_api)
00113 self.assert_(self.node_api.startswith('http'))
00114 self.node = xmlrpclib.ServerProxy(self.node_api)
00115
00116
00117 def _checkUri(self, uri):
00118 import urlparse
00119 parsed = urlparse.urlparse(uri)
00120 self.assert_(parsed[0] in ['http', 'https'], 'protocol [%s] in [%s] invalid'%(parsed[0], uri))
00121 self.assert_(parsed[1], 'host missing [%s]'%uri)
00122 if not sys.version.startswith('2.4'):
00123 self.assert_(parsed.port, 'port missing/invalid [%s]'%uri)
00124
00125
00126 def _createTopicTypeMap(self):
00127 new_map = {}
00128 for t in _required_publications_map.iterkeys():
00129 new_map[rospy.resolve_name(t)] = _required_publications_map[t]
00130 return new_map
00131
00132
00133 class NodeApiTestCase(_NodeTestCase):
00134
00135
00136 def testGetPid(self):
00137
00138 self.apiError(self.node.getPid())
00139
00140 pid = self.apiSuccess(self.node.getPid(self.caller_id))
00141 self.assert_(pid > 0)
00142
00143
00144 def _checkTopics(self, required, actual):
00145 actual = [t for t, _ in actual]
00146 missing = set(required) - set(actual)
00147 self.failIf(len(missing), 'missing required topics: %s'%(','.join(missing)))
00148
00149
00150 def testGetPublications(self):
00151
00152 self.apiError(self.node.getPublications())
00153 self.apiError(self.node.getPublications(self.caller_id, 'something extra'))
00154
00155
00156 self._checkTopics([rospy.resolve_name(t) for t in _required_publications],
00157 self.apiSuccess(self.node.getPublications(self.caller_id)))
00158
00159 def testGetSubscriptions(self):
00160
00161 self.apiError(self.node.getSubscriptions())
00162 self.apiError(self.node.getSubscriptions(self.caller_id, 'something extra'))
00163
00164 self._checkTopics([rospy.resolve_name(t) for t in _required_subscriptions],
00165 self.apiSuccess(self.node.getSubscriptions(self.caller_id)))
00166
00167
00168 def testParamUpdate(self):
00169 node = self.node
00170 good_key = rosgraph.names.ns_join(self.ns, 'good_key')
00171 bad_key = rosgraph.names.ns_join(self.ns, 'bad_key')
00172
00173
00174 self.apiError(node.paramUpdate(self.caller_id, '', 'bad'))
00175 self.apiError(node.paramUpdate(self.caller_id, 'no_namespace', 'bad'))
00176
00177 self.apiError(node.paramUpdate(self.caller_id, bad_key))
00178 self.apiError(node.paramUpdate(self.caller_id))
00179
00180
00181 self.apiError(node.paramUpdate(self.caller_id, good_key, 'good_value'))
00182
00183
00184
00185
00186
00187
00188
00189 def testGetUri(self):
00190
00191 self.apiError(self.node.getUri(self.caller_id, 'bad'))
00192 self.apiError(self.node.getUri())
00193
00194 self._checkUri(self.apiSuccess(self.node.getUri(self.caller_id)))
00195
00196
00197 def testGetName(self):
00198
00199 self.apiError(self.node.getName(self.caller_id, 'bad'))
00200 self.apiError(self.node.getName())
00201
00202 val = self.apiSuccess(self.node.getName(self.caller_id))
00203 self.assert_(len(val), "empty name")
00204
00205
00206 def testGetMasterUri(self):
00207
00208 self.apiError(self.node.getMasterUri(self.caller_id, 'bad'))
00209 self.apiError(self.node.getMasterUri())
00210
00211 uri = self.apiSuccess(self.node.getMasterUri(self.caller_id))
00212 self._checkUri(uri)
00213 self.assertEquals(rosgraph.get_master_uri(), uri)
00214
00215
00216 def testPublisherUpdate(self):
00217 node = self.node
00218 probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00219 fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00220
00221 self.apiError(node.getBusStats(self.caller_id, 'bad'))
00222 self.apiError(node.getBusStats())
00223
00224 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 'bad'))
00225 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', 2))
00226 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', False))
00227 self.apiError(node.publisherUpdate(self.caller_id, '/bad_topic', ['bad']))
00228 self.apiError(node.publisherUpdate())
00229
00230
00231
00232 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00233 ['http://localhost:1234', 'http://localhost:5678']))
00234 self.apiSuccess(node.publisherUpdate(self.caller_id, fake_topic,
00235 []))
00236
00237 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00238 ['http://unroutablefakeservice:1234']))
00239
00240 time.sleep(1.0)
00241
00242 self.apiSuccess(node.publisherUpdate(self.caller_id, probe_topic,
00243 []))
00244
00245 def _checkTCPROS(self, protocol_params):
00246 self.assert_(protocol_params, "no protocol params returned")
00247 self.assert_(type(protocol_params) == list, "protocol params must be a list: %s"%protocol_params)
00248 self.assertEquals(3, len(protocol_params), "TCPROS params should have length 3: %s"%protocol_params)
00249 self.assertEquals(protocol_params[0], _TCPROS)
00250
00251 self.assertEquals(protocol_params[0], _TCPROS)
00252
00253 def testRequestTopic(self):
00254 node = self.node
00255 protocols = [[_TCPROS]]
00256 probe_topic = rosgraph.names.ns_join(self.ns, 'probe_topic')
00257 fake_topic = rosgraph.names.ns_join(self.ns, 'fake_topic')
00258
00259
00260 self.apiError(node.requestTopic(self.caller_id, probe_topic, protocols, 'extra stuff'))
00261 self.apiError(node.requestTopic(self.caller_id, probe_topic))
00262 self.apiError(node.requestTopic(self.caller_id))
00263 self.apiError(node.requestTopic())
00264
00265 self.apiError(node.requestTopic(self.caller_id, 1, protocols))
00266 self.apiError(node.requestTopic(self.caller_id, '', protocols))
00267 self.apiError(node.requestTopic(self.caller_id, fake_topic, protocols))
00268 self.apiError(node.requestTopic(self.caller_id, probe_topic, 'fake-protocols'))
00269
00270 topics = [rosgraph.names.ns_join(self.ns, t) for t in _required_publications]
00271
00272 protocols = [[_TCPROS]]
00273 for topic in topics:
00274 self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00275 protocols = [['FakeTransport', 1234, 5678], [_TCPROS], ['AnotherFakeTransport']]
00276
00277 for topic in topics:
00278 self._checkTCPROS(self.apiSuccess(node.requestTopic(self.caller_id, topic, protocols)))
00279
00280 def testGetBusInfo(self):
00281
00282 self.apiError(self.node.getBusInfo(self.caller_id, 'bad'))
00283 self.apiError(self.node.getBusInfo())
00284
00285
00286 def testGetBusStats(self):
00287
00288 self.apiError(self.node.getBusStats(self.caller_id, 'bad'))
00289 self.apiError(self.node.getBusStats())
00290
00291
00292
00293 def testRegistrations(self):
00294
00295 topics = self.apiSuccess(self.master.getPublishedTopics(self.caller_id, ''))
00296 topic_names = [t for t, type in topics]
00297 required_topic_pubs = [rospy.resolve_name(t) for t in _required_publications]
00298 required_topic_subs = [rospy.resolve_name(t) for t in _required_subscriptions]
00299 self._checkTopics(required_topic_pubs, topics)
00300
00301
00302 topicTypeMap = self._createTopicTypeMap()
00303 for topic, type in topics:
00304 if topic in topicTypeMap:
00305 self.assertEquals(type, topicTypeMap[topic], "topic [%s]: type [%s] does not match expected [%s]"%(type, topic, topicTypeMap[topic]))
00306
00307
00308 node_name = self.test_node
00309 systemState = self.apiSuccess(self.master.getSystemState(self.caller_id))
00310 pubs, subs, srvs = systemState
00311 for topic, list in pubs:
00312 if topic in required_topic_pubs:
00313 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00314 for topic, list in subs:
00315 if topic in required_topic_subs:
00316 self.assert_(node_name in list, "%s not in %s"%(self.node_api, list))
00317 for service, list in srvs:
00318
00319 pass
00320
00321
00322
00323
00324 class NodeIntegrationTestCase(_NodeTestCase):
00325
00326 def __init__(self, *args):
00327 super(NodeIntegrationTestCase, self).__init__(*args)
00328 rospy.init_node(NODE_INTEGRATION_NAME)
00329
00330 def testString(self):
00331 pub = rospy.Publisher('test_string_in', std_msgs.msg.String)
00332 sub = rospy.Subscriber('test_string_in', std_msgs.msg.String)
00333
00334 pub.unregister()
00335 sub.unregister()
00336
00337 def testPrimitives(self):
00338 pub = rospy.Publisher('test_primitives_in', std_msgs.msg.String)
00339 sub = rospy.Subscriber('test_primitives_out', std_msgs.msg.String)
00340
00341 pub.unregister()
00342 sub.unregister()
00343
00344 def testArrays(self):
00345 pub = rospy.Publisher('test_header_in', std_msgs.msg.String)
00346 sub = rospy.Subscriber('test_header_out', std_msgs.msg.String)
00347
00348 pub.unregister()
00349 sub.unregister()
00350
00351 def testHeader(self):
00352
00353 pub = rospy.Publisher('test_header_in', std_msgs.msg.String)
00354 sub = rospy.Subscriber('test_header_out', std_msgs.msg.String)
00355
00356 pub.unregister()
00357 sub.unregister()
00358
00359