38 import rosgraph.masterapi
46 self.
m = rosgraph.masterapi.Master(_ID)
54 self.assert_(val.startswith(
'http://'))
57 uri =
'http://localhost:897' 58 rpcuri =
'rosrpc://localhost:9812' 59 self.m.registerService(
'/bar/service', rpcuri, uri)
60 self.assertEquals(rpcuri, self.m.lookupService(
'/bar/service'))
62 self.assertEquals(uri, self.m.lookupService(
'/fake/service'))
63 self.fail(
"should have thrown")
64 except rosgraph.masterapi.Error:
68 self.m.registerService(
'/bar/service',
'rosrpc://localhost:9812',
'http://localhost:893')
71 self.m.registerService(
'/unreg_service/service',
'rosrpc://localhost:9812',
'http://localhost:893')
72 val = self.m.registerService(
'/unreg_service/service',
'rosrpc://localhost:9812',
'http://localhost:893')
73 self.assertEquals(1, val)
76 val = self.m.registerSubscriber(
'/reg_sub/node',
'std_msgs/String',
'http://localhost:9812')
77 self.assertEquals([], val)
80 self.m.registerSubscriber(
'/reg_unsub/node',
'std_msgs/String',
'http://localhost:9812')
81 val = self.m.unregisterSubscriber(
'/reg_unsub/node',
'http://localhost:9812')
82 self.assertEquals(1, val)
85 val = self.m.registerPublisher(
'/reg_pub/topic',
'std_msgs/String',
'http://localhost:9812')
88 uri =
'http://localhost:9812' 89 self.m.registerPublisher(
'/unreg_pub/fake_topic',
'std_msgs/String', uri)
90 self.m.unregisterPublisher(
'/unreg_pub/fake_topic', uri)
94 uri =
'http://localhost:12345' 95 self.m.registerPublisher(
'fake_topic',
'std_msgs/String', uri)
96 self.assertEquals(uri, self.m.lookupNode(_ID))
99 self.m.lookupNode(
'/non/existent')
100 self.fail(
"should have thrown")
101 except rosgraph.masterapi.Error:
105 topics = self.m.getPublishedTopics(
'/')
108 topic_types = self.m.getTopicTypes()
111 pub, sub, srvs = self.m.getSystemState()
114 self.assert_(rosgraph.masterapi.is_online())
115 self.assert_(self.m.is_online())
119 self.m.getParam(
'fake_param')
120 self.fail(
"should have failed to lookup fake parameter")
121 except rosgraph.masterapi.Error:
125 self.failIf(self.m.hasParam(
'fake_param'),
"should have failed to lookup fake parameter")
126 self.assert_(self.m.hasParam(
'/run_id'),
"should have failed to lookup fake parameter")
129 self.m.setParam(
'/foo', 1)
132 self.assertEquals(
"/run_id", self.m.searchParam(
'run_id'))
135 self.assert_(type(self.m.getParamNames()) == list)
137 if __name__ ==
'__main__':
138 rostest.rosrun(
'test_rosgrap',
'test_rosgraph_masterapi_online', MasterApiOnlineTest)
def test_getSystemState(self)
def test_searchParam(self)
def test_registerService(self)
def test_getParamNames(self)
def test_registerPublisher(self)
def test_getPublishedTopics(self)
def test_unregisterSubscriber(self)
def test_lookupService(self)
def test_lookupNode(self)
def test_unregisterService(self)
def test_unregisterPublisher(self)
def test_getTopicTypes(self)
def test_registerSubscriber(self)