19 sys.path.insert(1,
"../RTM_IDL")
20 sys.path.insert(1,
"../")
23 from omniORB
import CORBA
30 configsample_spec = [
"implementation_id",
"TestComp",
31 "type_name",
"TestComp",
32 "description",
"Test example component",
34 "vendor",
"Shinji Kurihara, AIST",
35 "category",
"example",
36 "activity_type",
"DataFlowComponent",
39 "lang_type",
"compile",
41 "conf.default.int_param0",
"0",
42 "conf.default.int_param1",
"1",
43 "conf.default.double_param0",
"0.11",
44 "conf.default.double_param1",
"9.9",
45 "conf.default.str_param0",
"hoge",
46 "conf.default.str_param1",
"dara",
47 "conf.default.vector_param0",
"0.0,1.0,2.0,3.0,4.0",
54 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
61 manager.registerFactory(profile,
64 com = manager.createComponent(
"TestComp")
68 print manager.registerECFactory(
"Art",
71 manager.createComponent(
"TestEc")
83 OpenRTM_aist.Manager.instance().shutdown()
88 def test_terminate(self): 89 self.managerservant.terminate() 91 def test_shutdown(self): 92 self.managerservant.runManager(True) 95 self.managerservant.shutdown() 96 #self.managerservant.runManager() 101 self.assertNotEqual(len(self.
managerservant.get_loaded_modules()), 0)
102 self.assertNotEqual(len(self.
managerservant.get_loadable_modules()), 0)
111 self.assertNotEqual(self.
managerservant.get_loadable_modules(),[])
120 mgr=OpenRTM_aist.Manager.init(sys.argv)
121 mgr.activateManager()
122 self.
managerservant.load_module(
"test_ManagerServant",
"TestCompInit")
125 self.assertNotEqual(com,
None)
127 mgr.shutdownComponents()
150 self.assertEqual(self.
managerservant.set_configuration(
"test_name",
"test_value"),RTC.RTC_OK)
155 self.assertEqual(self.
managerservant.get_service(
"test"),CORBA.Object._nil)
161 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
162 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
166 self.assertNotEqual(self.
managerservant.getObjRef(),CORBA.Object._nil)
174 self.assertEqual(len(self.
managerservant.get_master_managers()),0)
179 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
180 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
184 self.assertEqual(len(self.
managerservant.get_master_managers()),0)
185 host_port =
"localhost:2810" 187 self.assertEqual(self.
managerservant.add_master_manager(owner),RTC.RTC_OK)
188 self.assertEqual(len(self.
managerservant.get_master_managers()),1)
189 self.assertEqual(self.
managerservant.remove_master_manager(owner),RTC.RTC_OK)
190 self.assertEqual(len(self.
managerservant.get_master_managers()),0)
197 poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references(
"omniINSPOA")
198 poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty(
"manager.name"))
203 host_port =
"localhost:2810" 205 self.assertEqual(self.
managerservant.add_slave_manager(owner),RTC.RTC_OK)
207 self.assertEqual(self.
managerservant.remove_slave_manager(owner),RTC.RTC_OK)
213 def test_forck(self): 214 self.assertEqual(self.managerservant.fork(),RTC.RTC_OK) 217 def test_shutdown(self): 218 #self.assertEqual(self.managerservant.shutdown(),RTC.RTC_OK) 221 def test_restart(self): 222 self.assertEqual(self.managerservant.restart(),RTC.RTC_OK) 230 if __name__ ==
'__main__':
def test_create_component(self)
def test_load_unload(self)
def test_get_profile(self)
def test_set_configuration(self)
def test_get_component_profiles(self)
def test_get_service(self)
def test_get_factory_profiles(self)
The Properties class represents a persistent set of properties.
def test_get_loadable_modules(self)
def test_get_master_managers(self)
def test_get_configuration(self)
def test_slave_managers(self)
def __init__(self, manager)
def TestCompInit(manager)
def test_get_components(self)
def test_get_loaded_modules(self)
def test_master_manager(self)
DataFlowComponentBase class.