19 sys.path.insert(1,
"../RTM_IDL")
20 sys.path.insert(1,
"../")
23 from omniORB
import CORBA
30 configsample_spec = [
"implementation_id",
"TestComp",
31 "type_name",
"TestComp",
32 "description",
"Test example component",
34 "vendor",
"Shinji Kurihara, AIST",
35 "category",
"example",
36 "activity_type",
"DataFlowComponent",
39 "lang_type",
"compile",
41 "conf.default.int_param0",
"0",
42 "conf.default.int_param1",
"1",
43 "conf.default.double_param0",
"0.11",
44 "conf.default.double_param1",
"9.9",
45 "conf.default.str_param0",
"hoge",
46 "conf.default.str_param1",
"dara",
47 "conf.default.vector_param0",
"0.0,1.0,2.0,3.0,4.0",
54 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
61 manager.registerFactory(profile,
64 com = manager.createComponent(
"TestComp")
68 print manager.registerECFactory(
"Art",
71 manager.createComponent(
"TestEc")
80 self.managerservant.__del__()
83 OpenRTM_aist.Manager.instance().shutdown()
88 def test_terminate(self): 89 self.managerservant.terminate() 91 def test_shutdown(self): 92 self.managerservant.runManager(True) 95 self.managerservant.shutdown() 96 #self.managerservant.runManager() 100 self.managerservant.load_module(
"hoge",
"echo")
101 self.assertNotEqual(len(self.managerservant.get_loaded_modules()), 0)
102 self.assertNotEqual(len(self.managerservant.get_loadable_modules()), 0)
103 self.managerservant.unload_module(
"hoge")
107 self.assertEqual(self.managerservant.get_loaded_modules(),[])
111 self.assertNotEqual(self.managerservant.get_loadable_modules(),[])
116 self.managerservant.get_factory_profiles()
120 mgr=OpenRTM_aist.Manager.init(sys.argv)
121 mgr.activateManager()
122 self.managerservant.load_module(
"test_ManagerServant",
"TestCompInit")
123 com = self.managerservant.create_component(
"TestComp")
124 self.managerservant.unload_module(
"test_ManagerServant")
125 self.assertNotEqual(com,
None)
126 self.managerservant.delete_component(
"TestComp")
127 mgr.shutdownComponents()
132 self.assertEqual(self.managerservant.get_components(),[])
137 self.assertEqual(self.managerservant.get_component_profiles(),[])
142 self.managerservant.get_profile()
146 self.managerservant.get_configuration()
150 self.assertEqual(self.managerservant.set_configuration(
"test_name",
"test_value"),RTC.RTC_OK)
155 self.assertEqual(self.managerservant.get_service(
"test"),CORBA.Object._nil)
160 if not self.managerservant.createINSManager():
161 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
162 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
163 self.assertEqual(self.managerservant.createINSManager(),
True)
165 self.assertEqual(self.managerservant.createINSManager(),
False)
166 self.assertNotEqual(self.managerservant.getObjRef(),CORBA.Object._nil)
170 self.assertEqual(self.managerservant.is_master(),
False)
174 self.assertEqual(len(self.managerservant.get_master_managers()),0)
178 if not self.managerservant.createINSManager():
179 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
180 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
181 self.assertEqual(self.managerservant.createINSManager(),
True)
183 self.assertEqual(self.managerservant.createINSManager(),
False)
184 self.assertEqual(len(self.managerservant.get_master_managers()),0)
185 host_port =
"localhost:2810" 186 owner = self.managerservant.findManager(host_port)
187 self.assertEqual(self.managerservant.add_master_manager(owner),RTC.RTC_OK)
188 self.assertEqual(len(self.managerservant.get_master_managers()),1)
189 self.assertEqual(self.managerservant.remove_master_manager(owner),RTC.RTC_OK)
190 self.assertEqual(len(self.managerservant.get_master_managers()),0)
196 if not self.managerservant.createINSManager():
197 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
198 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
199 self.assertEqual(self.managerservant.createINSManager(),
True)
201 self.assertEqual(self.managerservant.createINSManager(),
False)
202 self.assertEqual(len(self.managerservant.get_slave_managers()),0)
203 host_port =
"localhost:2810" 204 owner = self.managerservant.findManager(host_port)
205 self.assertEqual(self.managerservant.add_slave_manager(owner),RTC.RTC_OK)
206 self.assertEqual(len(self.managerservant.get_slave_managers()),1)
207 self.assertEqual(self.managerservant.remove_slave_manager(owner),RTC.RTC_OK)
208 self.assertEqual(len(self.managerservant.get_slave_managers()),0)
213 def test_forck(self): 214 self.assertEqual(self.managerservant.fork(),RTC.RTC_OK) 217 def test_shutdown(self): 218 #self.assertEqual(self.managerservant.shutdown(),RTC.RTC_OK) 221 def test_restart(self): 222 self.assertEqual(self.managerservant.restart(),RTC.RTC_OK) 230 if __name__ ==
'__main__':
def test_create_component(self)
def test_load_unload(self)
def test_get_profile(self)
def test_set_configuration(self)
def test_get_component_profiles(self)
def test_get_service(self)
def test_get_factory_profiles(self)
The Properties class represents a persistent set of properties.
def test_get_loadable_modules(self)
def test_get_master_managers(self)
def test_get_configuration(self)
def test_slave_managers(self)
def __init_(self, manager)
def TestCompInit(manager)
def test_get_components(self)
def test_get_loaded_modules(self)
def test_master_manager(self)
DataFlowComponentBase class.