40 from xmlrpc.client
import ServerProxy
42 from xmlrpclib
import ServerProxy
47 from rosclient
import *
49 NODE_INTEGRATION_NAME =
"node_integration_test" 63 raise Exception(
"set_node_name has not been called yet")
64 ros_ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
65 return rosgraph.names.ns_join(ros_ns, _name)
70 super(_MasterTestCase, self).
__init__(*args)
71 self.
ns = os.environ.get(rosgraph.ROS_NAMESPACE, rosgraph.names.GLOBALNS)
75 super(_MasterTestCase, self).
setUp()
76 self.
master_uri = os.environ.get(rosgraph.ROS_MASTER_URI,
None)
83 parsed = urlparse.urlparse(uri)
84 self.assert_(parsed[0]
in [
'http',
'https'],
'protocol [%s] in [%s] invalid'%(parsed[0], uri))
85 self.assert_(parsed[1],
'host missing [%s]'%uri)
86 if not sys.version.startswith(
'2.4'):
87 self.assert_(parsed.port,
'port missing/invalid [%s]'%uri)
95 self.
apiError(self.master.getMasterUri())
102 parsed = urlparse.urlparse(uri)
105 self.assertEquals(parsed.port, parsed2.port,
"expected ports do not match")
113 self.assert_(pid > 0)
121 self.assert_(type(uri) == str)
127 caller_id =
'/service_node' 128 caller_api =
'http://localhost:4567/' 129 service_base =
'/service' 132 for i
in range(0, 10):
133 service_name =
"%s-%s"%(service_base, i)
134 service_api =
'rosrpc://localhost:123%s/'%i
136 self.
apiSuccess(master.registerService(caller_id, service_name, service_api, caller_api))
138 val = self.
apiSuccess(master.lookupService(caller_id, service_name))
139 self.assertEquals(service_api, val)
141 self.assertEquals(caller_api, val)
144 for j
in range(0, i+1):
145 jservice_name =
"%s-%s"%(service_base, j)
146 jentry = [jservice_name, [caller_id]]
147 self.assert_(jentry
in srvs,
"master service list %s is missing %s"%(srvs, jentry))
159 caller_id =
'/service_node' 160 caller_api =
'http://localhost:4567/' 161 service_base =
'/service' 163 for i
in range(0, 10):
164 service_name =
"%s-%s"%(service_base, i)
165 service_api =
'rosrpc://localhost:123%s/'%i
168 code, msg, val = master.unregisterService(caller_id, service_name, service_api)
169 self.assertEquals(code, 1,
"code != 1, return message was [%s]"%msg)
172 self.
apiError(master.lookupService(self.
caller_id, service_name),
"master has a reference to unregistered service. message from master for unregister was [%s]"%msg)
176 self.assertEquals(caller_api, val,
"master prematurely invalidated node entry for [%s] (lookupNode)"%caller_id)
179 for j
in range(0, i+1):
180 jservice_name =
"%s-%s"%(service_base, j)
181 jentry = [jservice_name, [caller_id]]
182 self.assert_(jentry
not in srvs,
"master service list %s should not have %s"%(srvs, jentry))
183 for j
in range(i+1, 10):
184 jservice_name =
"%s-%s"%(service_base, j)
185 jentry = [jservice_name, [caller_id]]
186 self.assert_(jentry
in srvs,
"master service list %s is missing %s"%(srvs, jentry))
193 self.
apiError(master.lookupNode(self.
caller_id, caller_id),
"master has a stale reference to unregistered service node API")
195 srvs = [s
for s
in srvs
if not s[0].startswith(
'/rosout/')
and not s[0].endswith(
'/get_loggers')
and not s[0].endswith(
'/set_logger_level')]
196 self.assertEquals(0, len(srvs),
"all services should have been unregistered: %s"%srvs)
202 service_api =
'rosrpc://localhost:1234/' 203 caller_api =
'http://localhost:4567/' 206 self.
apiError(master.registerService())
211 self.
apiError(master.registerService(self.
caller_id,
'', service_api, caller_api))
219 service_api =
'rosrpc://localhost:1234/' 222 self.
apiError(master.unregisterService())
233 topic_type =
'test_rosmaster/String' 234 caller_api =
'http://localhost:4567/' 237 self.
apiError(master.registerPublisher())
242 self.
apiError(master.registerPublisher(self.
caller_id,
'', topic_type, caller_api))
250 caller_api =
'http://localhost:4567/' 253 self.
apiError(master.unregisterPublisher())
264 topic_type =
'test_rosmaster/String' 265 caller_api =
'http://localhost:4567/' 268 self.
apiError(master.registerSubscriber())
273 self.
apiError(master.registerSubscriber(self.
caller_id,
'', topic_type, caller_api))
281 caller_api =
'http://localhost:4567/' 284 self.
apiError(master.registerSubscriber())
295 caller_id =
'/pub_node' 296 caller_api =
'http://localhost:4567/' 297 topic_base =
'/pub_topic' 298 topic_type =
'test_rosmaster/String' 301 for i
in range(0, 10):
302 topic_name =
"%s-%s"%(topic_base, i)
304 self.
apiSuccess(master.registerPublisher(caller_id, topic_name, topic_type, caller_api))
308 self.assertEquals(caller_api, val)
311 self.assert_([topic_name, topic_type]
in val,
"master does not know topic type: %s"%val)
314 self.assert_([topic_name, topic_type]
in val,
"master does not know topic type: %s"%val)
317 for j
in range(0, i+1):
318 jtopic_name =
"%s-%s"%(topic_base, j)
319 jentry = [jtopic_name, [caller_id]]
320 self.assert_(jentry
in pubs,
"master pub/sub list %s is missing %s"%(pubs, jentry))
327 caller_id =
'/pub_node' 328 caller_api =
'http://localhost:4567/' 329 topic_name =
'/type_test_pub_topic' 332 val = self.
apiSuccess(master.registerPublisher(caller_id, topic_name,
'*', caller_api))
333 self.assertEquals([], val)
335 self.assert_([topic_name,
'*']
in val,
"master is not reporting * as type: %s"%val)
338 self.assert_([topic_name,
'*']
in val,
"master is not reporting * as type: %s"%val)
341 for t
in [
'test_rosmaster/String',
'*']:
342 val = self.
apiSuccess(master.registerPublisher(caller_id, topic_name, t, caller_api))
343 self.assertEquals([], val)
345 self.assert_([topic_name,
'test_rosmaster/String']
in val,
"master is not reporting test_rosmaster/String as type: %s"%val)
348 self.assert_([topic_name,
'test_rosmaster/String']
in val,
"master is not reporting test_rosmaster/String as type: %s"%val)
356 topic =
'/pub_topic-0' 357 type =
'test_rosmaster/String' 358 pub_caller_api =
'http://localhost:4567/' 361 for i
in range(5678, 5685):
362 api =
'http://localhost:%s'%i
364 self.
apiSuccess(master.registerSubscriber(
'/sub_node-%i'%i, topic, type, api))
365 val = self.
apiSuccess(master.registerPublisher(
'/pub_node', topic, type, pub_caller_api))
366 self.assertEquals(subs, val)
371 caller_id =
'/pub_node' 372 caller_api =
'http://localhost:4567/' 373 topic_base =
'/pub_topic' 375 for i
in range(0, 10):
376 topic_name =
"%s-%s"%(topic_base, i)
379 code, msg, val = master.unregisterPublisher(caller_id, topic_name, caller_api)
380 self.assertEquals(code, 1,
"code != 1, return message was [%s]"%msg)
385 self.assertEquals(caller_api, val,
"master prematurely invalidated node entry for [%s] (lookupNode)"%caller_id)
388 for j
in range(0, i+1):
389 jtopic_name =
"%s-%s"%(topic_base, j)
390 jentry = [jtopic_name, [caller_id]]
391 self.assert_(jentry
not in pubs,
"master pub/sub list %s should not have %s"%(pubs, jentry))
392 for j
in range(i+1, 10):
393 jtopic_name =
"%s-%s"%(topic_base, j)
394 jentry = [jtopic_name, [caller_id]]
395 self.assert_(jentry
in pubs,
"master pub/sub list %s is missing %s"%(pubs, jentry))
400 self.
apiError(master.lookupNode(self.
caller_id, caller_id),
"master has a stale reference to unregistered topic node API. pubs are %s"%pubs)
407 caller_id =
'/sub_node' 408 caller_api =
'http://localhost:4567/' 409 topic_base =
'/sub_topic' 410 topic_type =
'test_rosmaster/String' 413 for i
in range(0, 10):
414 topic_name =
"%s-%s"%(topic_base, i)
416 self.
apiSuccess(master.registerSubscriber(caller_id, topic_name, topic_type, caller_api))
420 self.assertEquals(caller_api, val)
424 self.assert_([topic_name, topic_type]
in val,
"master does not know topic type: %s"%val)
427 for j
in range(0, i+1):
428 jtopic_name =
"%s-%s"%(topic_base, j)
429 jentry = [jtopic_name, [caller_id]]
430 self.assert_(jentry
in subs,
"master pub/sub list %s is missing %s"%(subs, jentry))
439 caller_id =
'/sub_node' 440 caller_api =
'http://localhost:4567/' 441 topic_base =
'/sub_topic' 443 for i
in range(0, 10):
444 topic_name =
"%s-%s"%(topic_base, i)
447 code, msg, val = master.unregisterSubscriber(caller_id, topic_name, caller_api)
448 self.assertEquals(code, 1,
"code != 1, return message was [%s]"%msg)
453 self.assertEquals(caller_api, val,
"master prematurely invalidated node entry for [%s] (lookupNode)"%caller_id)
456 for j
in range(0, i+1):
457 jtopic_name =
"%s-%s"%(topic_base, j)
458 jentry = [jtopic_name, [caller_id]]
459 self.assert_(jentry
not in subs,
"master pub/sub list %s should not have %s"%(subs, jentry))
460 for j
in range(i+1, 10):
461 jtopic_name =
"%s-%s"%(topic_base, j)
462 jentry = [jtopic_name, [caller_id]]
463 self.assert_(jentry
in subs,
"master pub/sub list %s is missing %s"%(subs, jentry))
466 self.
apiError(master.lookupNode(self.
caller_id, caller_id),
"master has a stale reference to unregistered topic node API. subs are %s"%subs)
def _testRegisterServiceSuccess(self)
validate master.registerService(caller_id, service, service_api, caller_api)
def _testRegisterServiceInvalid(self)
def _testRegisterPublisherInvalid(self)
def get_caller_id()
reimplementation of caller ID spec separately from rospy
def set_node_name(name)
set_node_name() must be called prior to the unit test so that the test harness knows its ROS name...
def _testGetUri(self)
validate master.getUri(caller_id)
def _testUnregisterPublisherSuccess(self)
Expects a single test node to be running with name 'test_node' and subscribed to 'test_string'.
def _testRegisterPublisherSuccess(self)
validate master.registerPublisher(caller_id, topic, topic_type, caller_api)
def _subTestRegisterSubscriberSimpleSuccess(self)
common test subroutine of both register and unregister tests.
def _testUnregisterSubscriberInvalid(self)
def apiSuccess(self, args)
unit test assertion that fails if status code is not 1 and otherwise returns the value parameter ...
def _testRegisterSubscriberInvalid(self)
def apiError(self, args, msg=None)
unit test assertion that fails if status code is not -1 and otherwise returns true ...
def _testRegisterSubscriberSimpleSuccess(self)
validate master.registerSubscriber(caller_id, topic, topic_type, caller_api)
def _testGetPid(self)
validate master.getPid(caller_id)
def _testGetMasterUri(self)
validate master.getMasterUri(caller_id)
def _subTestRegisterPublisherSuccess(self)
common test subroutine of both register and unregister tests.
def _checkUri(self, uri)
validates a URI as being http(s)
def _testUnregisterServiceInvalid(self)
def _testUnregisterSubscriberSuccess(self)
def _subTestRegisterServiceSuccess(self)
common test subroutine of both register and unregister tests.
def _testUnregisterServiceSuccess(self)
def _testUnregisterPublisherInvalid(self)
def _testRegisterPublisherTypes(self)
#591: this test may change if we make registering '*' unsupported