# Make the generated IDL stubs and the parent package directory importable
# when this test module is run from its own directory.  Both entries go in
# at index 1, so "../" ends up ahead of "../RTM_IDL" on the search path.
sys.path.insert(1, "../RTM_IDL")
sys.path.insert(1, "../")
23 from omniORB
import CORBA
30 configsample_spec = [
"implementation_id",
"TestComp",
31 "type_name",
"TestComp",
32 "description",
"Test example component",
34 "vendor",
"Shinji Kurihara, AIST",
35 "category",
"example",
36 "activity_type",
"DataFlowComponent",
39 "lang_type",
"compile",
41 "conf.default.int_param0",
"0",
42 "conf.default.int_param1",
"1",
43 "conf.default.double_param0",
"0.11",
44 "conf.default.double_param1",
"9.9",
45 "conf.default.str_param0",
"hoge",
46 "conf.default.str_param1",
"dara",
47 "conf.default.vector_param0",
"0.0,1.0,2.0,3.0,4.0",
54 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
61 manager.registerFactory(profile,
64 com = manager.createComponent(
"TestComp")
68 print(manager.registerECFactory(
"Art",
71 manager.createComponent(
"TestEc")
83 OpenRTM_aist.Manager.instance().shutdown()
def test_terminate(self):
    # Smoke test: calls terminate() on the servant under test and makes no
    # assertion on the outcome.
    servant = self.managerservant
    servant.terminate()
def test_shutdown(self):
    # Runs the manager with the argument True, then asks the servant to shut
    # down.  No assertion is made on either call.
    # NOTE(review): another test_shutdown definition appears later in this
    # file; if both live in the same class, the later one replaces this one
    # and this test never runs -- confirm.
    servant = self.managerservant
    servant.runManager(True)
    servant.shutdown()
    #self.managerservant.runManager()
101 self.assertNotEqual(len(self.
managerservant.get_loaded_modules()), 0)
102 self.assertNotEqual(len(self.
managerservant.get_loadable_modules()), 0)
111 self.assertNotEqual(self.
managerservant.get_loadable_modules(),[])
120 mgr=OpenRTM_aist.Manager.init(sys.argv)
121 mgr.activateManager()
122 self.
managerservant.load_module(
"test_ManagerServant",
"TestCompInit")
125 self.assertNotEqual(com,
None)
127 mgr.shutdownComponents()
150 self.assertEqual(self.
managerservant.set_configuration(
"test_name",
"test_value"),RTC.RTC_OK)
155 self.assertEqual(self.
managerservant.get_service(
"test"),CORBA.Object._nil)
161 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
162 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
166 self.assertNotEqual(self.
managerservant.getObjRef(),CORBA.Object._nil)
174 self.assertEqual(len(self.
managerservant.get_master_managers()),0)
179 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
180 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
184 self.assertEqual(len(self.
managerservant.get_master_managers()),0)
185 host_port =
"localhost:2810"
187 self.assertEqual(self.
managerservant.add_master_manager(owner),RTC.RTC_OK)
188 self.assertEqual(len(self.
managerservant.get_master_managers()),1)
189 self.assertEqual(self.
managerservant.remove_master_manager(owner),RTC.RTC_OK)
190 self.assertEqual(len(self.
managerservant.get_master_managers()),0)
197 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
198 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
203 host_port =
"localhost:2810"
205 self.assertEqual(self.
managerservant.add_slave_manager(owner),RTC.RTC_OK)
207 self.assertEqual(self.
managerservant.remove_slave_manager(owner),RTC.RTC_OK)
def test_forck(self):
    # Checks that fork() on the servant under test returns RTC.RTC_OK.
    # NOTE(review): the method name looks like a typo of "test_fork"; it is
    # kept unchanged so that selecting the test by name keeps working.
    result = self.managerservant.fork()
    self.assertEqual(result, RTC.RTC_OK)
217 def test_shutdown(self):
218 #self.assertEqual(self.managerservant.shutdown(),RTC.RTC_OK)
def test_restart(self):
    # Checks that restart() on the servant under test returns RTC.RTC_OK.
    result = self.managerservant.restart()
    self.assertEqual(result, RTC.RTC_OK)
230 if __name__ ==
'__main__':