00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 import roslib; roslib.load_manifest('test_rosgraph')
00034
00035 import os
00036 import sys
00037 import unittest
00038
00039 import rosgraph.masterapi
00040 import rostest
00041
# Caller id handed to rosgraph.masterapi.Master by the tests below; the
# lookupNode test also queries the master for this same name.
_ID = '/caller_id'
00043
class MasterApiOnlineTest(unittest.TestCase):
    """Call each rosgraph.masterapi.Master wrapper and check its basic result.

    test_is_online asserts that a master is reachable, so this suite
    presumably has to run against a live ROS master (e.g. under rostest)
    -- TODO confirm against the launch file that drives it.
    """

    def setUp(self):
        # Fresh Master handle for every test, constructed with _ID.
        self.m = rosgraph.masterapi.Master(_ID)

    def test_getPid(self):
        """getPid() returns a value that converts to int."""
        # int() raises if the value is not integer-like, failing the test.
        int(self.m.getPid())

    def test_getUri(self):
        """getUri() returns an http:// URI."""
        val = self.m.getUri()
        self.assertTrue(val.startswith('http://'))

    def test_lookupService(self):
        """A registered service resolves to its rosrpc URI; an unknown one raises."""
        uri = 'http://localhost:897'
        rpcuri = 'rosrpc://localhost:9812'
        self.m.registerService('/bar/service', rpcuri, uri)
        self.assertEqual(rpcuri, self.m.lookupService('/bar/service'))
        self.assertRaises(rosgraph.masterapi.Error,
                          self.m.lookupService, '/fake/service')

    def test_registerService(self):
        """registerService() completes without raising."""
        self.m.registerService('/bar/service', 'rosrpc://localhost:9812', 'http://localhost:893')

    def test_unregisterService(self):
        """Unregistering a service that was just registered reports one removal."""
        service = '/unreg_service/service'
        service_api = 'rosrpc://localhost:9812'
        self.m.registerService(service, service_api, 'http://localhost:893')
        # The original called registerService() a second time here, so
        # unregisterService() was never exercised.
        val = self.m.unregisterService(service, service_api)
        self.assertEqual(1, val)

    def test_registerSubscriber(self):
        """Subscribing to a topic nobody publishes yields an empty publisher list."""
        val = self.m.registerSubscriber('/reg_sub/node', 'std_msgs/String', 'http://localhost:9812')
        self.assertEqual([], val)

    def test_unregisterSubscriber(self):
        """Unregistering a subscription that was just registered returns 1."""
        self.m.registerSubscriber('/reg_unsub/node', 'std_msgs/String', 'http://localhost:9812')
        val = self.m.unregisterSubscriber('/reg_unsub/node', 'http://localhost:9812')
        self.assertEqual(1, val)

    def test_registerPublisher(self):
        """registerPublisher() completes without raising."""
        self.m.registerPublisher('/reg_pub/topic', 'std_msgs/String', 'http://localhost:9812')

    def test_unregisterPublisher(self):
        """A publisher can be registered and then unregistered without raising."""
        uri = 'http://localhost:9812'
        self.m.registerPublisher('/unreg_pub/fake_topic', 'std_msgs/String', uri)
        self.m.unregisterPublisher('/unreg_pub/fake_topic', uri)

    def test_lookupNode(self):
        """lookupNode() returns the URI registered for _ID and raises for unknown nodes."""
        uri = 'http://localhost:12345'
        # Registering a publisher is what associates `uri` with _ID here.
        self.m.registerPublisher('fake_topic', 'std_msgs/String', uri)
        self.assertEqual(uri, self.m.lookupNode(_ID))

        self.assertRaises(rosgraph.masterapi.Error,
                          self.m.lookupNode, '/non/existent')

    def test_getPublishedTopics(self):
        """getPublishedTopics('/') completes without raising."""
        self.m.getPublishedTopics('/')

    def test_getTopicTypes(self):
        """getTopicTypes() completes without raising."""
        self.m.getTopicTypes()

    def test_getSystemState(self):
        """getSystemState() returns exactly three components."""
        # The unpacking itself is the check: it raises unless there are three items.
        pub, sub, srvs = self.m.getSystemState()

    def test_is_online(self):
        """Both the module-level and the instance is_online() report True."""
        self.assertTrue(rosgraph.masterapi.is_online())
        self.assertTrue(self.m.is_online())

    def test_getParam(self):
        """Reading a parameter that was never set raises masterapi.Error."""
        self.assertRaises(rosgraph.masterapi.Error,
                          self.m.getParam, 'fake_param')

    def test_hasParam(self):
        """hasParam() is false for an unset name and true for /run_id."""
        self.assertFalse(self.m.hasParam('fake_param'), "should have failed to lookup fake parameter")
        self.assertTrue(self.m.hasParam('/run_id'), "should have found the /run_id parameter")

    def test_setParam(self):
        """setParam() completes without raising."""
        self.m.setParam('/foo', 1)

    def test_searchParam(self):
        """searchParam('run_id') resolves to '/run_id'."""
        self.assertEqual("/run_id", self.m.searchParam('run_id'))

    def test_getParamNames(self):
        """getParamNames() returns a list."""
        self.assertTrue(isinstance(self.m.getParamNames(), list))
00137
00138
if __name__ == '__main__':
    # Package name corrected from 'test_rosgrap' so that it matches the
    # 'test_rosgraph' manifest loaded at the top of this file.
    rostest.rosrun('test_rosgraph', 'test_rosgraph_masterapi_online', MasterApiOnlineTest)
00141