00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys,time
00019 sys.path.insert(1,"../RTM_IDL")
00020 sys.path.insert(1,"../")
00021
00022 import unittest
00023 from omniORB import CORBA
00024
00025
00026 import OpenRTM_aist
00027 import RTC, RTC__POA
00028 import RTM, RTM__POA
00029
00030 configsample_spec = ["implementation_id", "TestComp",
00031 "type_name", "TestComp",
00032 "description", "Test example component",
00033 "version", "1.0",
00034 "vendor", "Shinji Kurihara, AIST",
00035 "category", "example",
00036 "activity_type", "DataFlowComponent",
00037 "max_instance", "10",
00038 "language", "Python",
00039 "lang_type", "compile",
00040
00041 "conf.default.int_param0", "0",
00042 "conf.default.int_param1", "1",
00043 "conf.default.double_param0", "0.11",
00044 "conf.default.double_param1", "9.9",
00045 "conf.default.str_param0", "hoge",
00046 "conf.default.str_param1", "dara",
00047 "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",
00048 ""]
00049
00050 com = None
00051
00052 class TestComp(OpenRTM_aist.DataFlowComponentBase):
00053 def __init_(self, manager):
00054 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00055
00056
00057 def TestCompInit(manager):
00058 print "TestCompInit"
00059 global com
00060 profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
00061 manager.registerFactory(profile,
00062 TestComp,
00063 OpenRTM_aist.Delete)
00064 com = manager.createComponent("TestComp")
00065
00066 def TestEcInit(manager):
00067 profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
00068 print manager.registerECFactory("Art",
00069 TestComp,
00070 OpenRTM_aist.Delete)
00071 manager.createComponent("TestEc")
00072
00073
00074 class TestManagerServant(unittest.TestCase):
00075
00076 def setUp(self):
00077 self.managerservant = OpenRTM_aist.ManagerServant()
00078
00079 def tearDown(self):
00080 self.managerservant.__del__()
00081 del self.managerservant
00082
00083 OpenRTM_aist.Manager.instance().shutdown()
00084 time.sleep(0.1)
00085 return
00086
00087 """
00088 def test_terminate(self):
00089 self.managerservant.terminate()
00090
00091 def test_shutdown(self):
00092 self.managerservant.runManager(True)
00093 import time
00094 time.sleep(0.1)
00095 self.managerservant.shutdown()
00096 #self.managerservant.runManager()
00097 """
00098
00099 def test_load_unload(self):
00100 self.managerservant.load_module("hoge", "echo")
00101 self.assertNotEqual(len(self.managerservant.get_loaded_modules()), 0)
00102 self.assertNotEqual(len(self.managerservant.get_loadable_modules()), 0)
00103 self.managerservant.unload_module("hoge")
00104 return
00105
00106 def test_get_loaded_modules(self):
00107 self.assertEqual(self.managerservant.get_loaded_modules(),[])
00108 return
00109
00110 def test_get_loadable_modules(self):
00111 self.assertNotEqual(self.managerservant.get_loadable_modules(),[])
00112 return
00113
00114
00115 def test_get_factory_profiles(self):
00116 self.managerservant.get_factory_profiles()
00117 return
00118
00119 def test_create_component(self):
00120 mgr=OpenRTM_aist.Manager.init(sys.argv)
00121 mgr.activateManager()
00122 self.managerservant.load_module("test_ManagerServant", "TestCompInit")
00123 com = self.managerservant.create_component("TestComp")
00124 self.managerservant.unload_module("test_ManagerServant")
00125 self.assertNotEqual(com,None)
00126 self.managerservant.delete_component("TestComp")
00127 mgr.shutdownComponents()
00128 return
00129
00130
00131 def test_get_components(self):
00132 self.assertEqual(self.managerservant.get_components(),[])
00133 return
00134
00135
00136 def test_get_component_profiles(self):
00137 self.assertEqual(self.managerservant.get_component_profiles(),[])
00138 return
00139
00140
00141 def test_get_profile(self):
00142 self.managerservant.get_profile()
00143 return
00144
00145 def test_get_configuration(self):
00146 self.managerservant.get_configuration()
00147 return
00148
00149 def test_set_configuration(self):
00150 self.assertEqual(self.managerservant.set_configuration("test_name", "test_value"),RTC.RTC_OK)
00151 return
00152
00153
00154 def test_get_service(self):
00155 self.assertEqual(self.managerservant.get_service("test"),CORBA.Object._nil)
00156 return
00157
00158
00159 def test_getObjRef(self):
00160 if not self.managerservant.createINSManager():
00161 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references("omniINSPOA")
00162 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty("manager.name"))
00163 self.assertEqual(self.managerservant.createINSManager(),True)
00164 else:
00165 self.assertEqual(self.managerservant.createINSManager(),False)
00166 self.assertNotEqual(self.managerservant.getObjRef(),CORBA.Object._nil)
00167 return
00168
00169 def test_is_master(self):
00170 self.assertEqual(self.managerservant.is_master(),False)
00171 return
00172
00173 def test_get_master_managers(self):
00174 self.assertEqual(len(self.managerservant.get_master_managers()),0)
00175 return
00176
00177 def test_master_manager(self):
00178 if not self.managerservant.createINSManager():
00179 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references("omniINSPOA")
00180 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty("manager.name"))
00181 self.assertEqual(self.managerservant.createINSManager(),True)
00182 else:
00183 self.assertEqual(self.managerservant.createINSManager(),False)
00184 self.assertEqual(len(self.managerservant.get_master_managers()),0)
00185 host_port = "localhost:2810"
00186 owner = self.managerservant.findManager(host_port)
00187 self.assertEqual(self.managerservant.add_master_manager(owner),RTC.RTC_OK)
00188 self.assertEqual(len(self.managerservant.get_master_managers()),1)
00189 self.assertEqual(self.managerservant.remove_master_manager(owner),RTC.RTC_OK)
00190 self.assertEqual(len(self.managerservant.get_master_managers()),0)
00191
00192 return
00193
00194
00195 def test_slave_managers(self):
00196 if not self.managerservant.createINSManager():
00197 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references("omniINSPOA")
00198 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty("manager.name"))
00199 self.assertEqual(self.managerservant.createINSManager(),True)
00200 else:
00201 self.assertEqual(self.managerservant.createINSManager(),False)
00202 self.assertEqual(len(self.managerservant.get_slave_managers()),0)
00203 host_port = "localhost:2810"
00204 owner = self.managerservant.findManager(host_port)
00205 self.assertEqual(self.managerservant.add_slave_manager(owner),RTC.RTC_OK)
00206 self.assertEqual(len(self.managerservant.get_slave_managers()),1)
00207 self.assertEqual(self.managerservant.remove_slave_manager(owner),RTC.RTC_OK)
00208 self.assertEqual(len(self.managerservant.get_slave_managers()),0)
00209 return
00210
00211
00212 """
00213 def test_forck(self):
00214 self.assertEqual(self.managerservant.fork(),RTC.RTC_OK)
00215 return
00216
00217 def test_shutdown(self):
00218 #self.assertEqual(self.managerservant.shutdown(),RTC.RTC_OK)
00219 return
00220
00221 def test_restart(self):
00222 self.assertEqual(self.managerservant.restart(),RTC.RTC_OK)
00223 return
00224
00225 """
00226
00227
00228
00229
00230 if __name__ == '__main__':
00231 unittest.main()