00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022 import time
00023
00024
00025 import OpenRTM_aist
00026 import RTC, RTC__POA
00027
# Flat key/value specification for the TestComp component used by the tests
# below: even positions hold property keys, odd positions hold their values,
# and a single empty string closes the list.
testcomp_spec = [
    # Component profile.
    "implementation_id", "TestComp",
    "type_name", "TestComp",
    "description", "Test example component",
    "version", "1.0",
    "vendor", "Shinji Kurihara, AIST",
    "category", "example",
    "activity_type", "DataFlowComponent",
    "max_instance", "10",
    "language", "C++",
    "lang_type", "compile",

    # Values of the "default" configuration set.
    "conf.default.int_param0", "0",
    "conf.default.int_param1", "1",
    "conf.default.double_param0", "0.11",
    "conf.default.double_param1", "9.9",
    "conf.default.str_param0", "hoge",
    "conf.default.str_param1", "dara",
    "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",

    # Terminating entry.
    "",
]
00047
class TestComp(OpenRTM_aist.DataFlowComponentBase):
    """Bare data-flow component that adds nothing to its base class."""

    def __init__(self, manager):
        """Forward *manager* to the DataFlowComponentBase initializer."""
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00052
00053
def TestCompInit(manager):
    """Register the TestComp factory with *manager*.

    The factory profile is built from ``testcomp_spec``; ``TestComp`` is
    given as the creation callable and ``OpenRTM_aist.Delete`` as the
    deletion callable.
    """
    spec_props = OpenRTM_aist.Properties(defaults_str=testcomp_spec)
    manager.registerFactory(spec_props, TestComp, OpenRTM_aist.Delete)
00059
00060
def MyModuleInit(manager):
    """Register the TestComp factory, then create one "TestComp" component.

    The created component is not returned to the caller.
    """
    TestCompInit(manager)
    manager.createComponent("TestComp")
00064
def TestEcInit(manager):
    """Register TestComp as the "Art" EC factory and create "TestEc".

    NOTE(review): the name registered ("Art") differs from the name passed
    to createComponent ("TestEc") -- confirm this is intentional.
    """
    # Built but never handed to the manager; kept so the helper performs
    # exactly the same calls as before.
    unused_props = OpenRTM_aist.Properties(defaults_str=testcomp_spec)
    manager.registerECFactory("Art", TestComp, OpenRTM_aist.Delete)
    manager.createComponent("TestEc")
00071
00072
class TestManager(unittest.TestCase):
    """Tests that drive the public methods of OpenRTM_aist.Manager.

    setUp() obtains the manager through Manager.init(sys.argv) and
    tearDown() shuts it down again, so every test starts from a freshly
    initialized manager reference stored in ``self.manager``.
    """

    def setUp(self):
        """Initialize the manager from the process command-line arguments."""
        self.manager = OpenRTM_aist.Manager.init(sys.argv)

    def tearDown(self):
        """Shut the manager down after each test."""
        self.manager.shutdownManager()
        # Short pause after shutdown -- presumably to let it complete
        # before the next test re-initializes the manager; TODO confirm.
        time.sleep(0.1)

    def _create_testcomp(self):
        """Register the TestComp factory and return a new "TestComp".

        Shared by the tests that need a live component; callers activate
        the manager themselves before calling this helper.
        """
        profile = OpenRTM_aist.Properties(defaults_str=testcomp_spec)
        self.manager.registerFactory(profile, TestComp, OpenRTM_aist.Delete)
        return self.manager.createComponent("TestComp")

    def test_init(self):
        """No assertions: only setUp() and tearDown() are exercised."""

    def test_instance(self):
        """Manager.instance() equals the object returned by Manager.init()."""
        self.assertEqual(OpenRTM_aist.Manager.instance(), self.manager)

    def test_load_unload(self):
        """Load, unload and reload module "hoge"; one module stays loaded.

        NOTE(review): relies on a module named "hoge" being loadable with
        "echo" as the second load() argument -- verify the test fixture.
        """
        self.manager.load("hoge", "echo")
        self.manager.unload("hoge")
        self.manager.unloadAll()
        self.manager.load("hoge", "echo")
        self.assertEqual(len(self.manager.getLoadedModules()), 1)

    def test_getLoadedModules(self):
        """After activation, getLoadedModules() is expected to be empty."""
        self.manager.activateManager()
        self.assertEqual(self.manager.getLoadedModules(), [])

    def test_getLoadableModules(self):
        """Only activates the manager; nothing is asserted."""
        self.manager.activateManager()

    def test_registerFactory(self):
        """registerFactory() returns True for the TestComp profile."""
        profile = OpenRTM_aist.Properties(defaults_str=testcomp_spec)
        registered = self.manager.registerFactory(profile,
                                                  TestComp,
                                                  OpenRTM_aist.Delete)
        self.assertEqual(registered, True)

    def test_getFactoryProfiles(self):
        """Smoke test: getFactoryProfiles() can be called."""
        self.manager.getFactoryProfiles()

    def test_registerECFactory(self):
        """registerECFactory() returns True for the name "Art"."""
        registered = self.manager.registerECFactory("Art",
                                                    TestComp,
                                                    OpenRTM_aist.Delete)
        self.assertEqual(registered, True)

    def test_getModulesFactories(self):
        """The first reported factory is 'PeriodicECSharedComposite'."""
        self.assertEqual(self.manager.getModulesFactories()[0],
                         'PeriodicECSharedComposite')

    def test_createComponent(self):
        """createComponent("TestComp") does not yield None."""
        self.manager.activateManager()
        com = self._create_testcomp()
        self.assertNotEqual(com, None)

        self.manager.shutdownComponents()
        time.sleep(0.1)

    def test_getORB(self):
        """Smoke test: getORB() can be called."""
        self.manager.getORB()

    def test_getPOA(self):
        """Smoke test: getPOA() can be called."""
        self.manager.getPOA()

    def test_getPOAManager(self):
        """Smoke test: getPOAManager() can be called."""
        self.manager.getPOAManager()

    def test_registerComponent(self):
        """registerComponent() returns True for a freshly created TestComp."""
        self.manager.activateManager()
        com = self._create_testcomp()
        self.assertEqual(self.manager.registerComponent(com), True)
        self.manager.shutdownComponents()

    def test_createContext(self):
        """createContext() returns a context that can be stopped."""
        # NOTE(review): the "rate" value is empty here -- confirm that an
        # empty rate is what this test intends to pass.
        ec_args = "PeriodicExecutionContext?rate="
        self.manager.activateManager()
        ec = self.manager.createContext(ec_args)
        self.assertNotEqual(ec, None)
        ec.stop()
        # Deactivate the context servant in the POA before it is dropped.
        poa = self.manager.getPOA()
        poa.deactivate_object(poa.servant_to_id(ec))

    def test_createORBEndpointOption(self):
        """Smoke test: createORBEndpointOption("", []) can be called."""
        self.manager.activateManager()
        self.manager.createORBEndpointOption("", [])

    def test_cleanupComponent(self):
        """A registered component can be cleaned up and then exited."""
        self.manager.activateManager()
        com = self._create_testcomp()
        self.assertEqual(self.manager.registerComponent(com), True)
        self.manager.cleanupComponent(com)
        com.exit()
        self.manager.shutdownComponents()

    def test_notifyFinalized(self):
        """Create a TestComp and shut components down; nothing is asserted."""
        self.manager.activateManager()
        self._create_testcomp()
        self.manager.shutdownComponents()

    def test_initManager(self):
        """Smoke test: initManager(sys.argv) can be called again."""
        self.manager.initManager(sys.argv)

    def test_initLogger(self):
        """Smoke test: initLogger() can be called."""
        self.manager.initLogger()

    def test_initORB(self):
        """No assertions: only setUp() and tearDown() are exercised."""

    def test_initNaming(self):
        """initNaming() returns True."""
        self.assertEqual(self.manager.initNaming(), True)

    def test_initExecContext(self):
        """initExecContext() returns True."""
        self.assertEqual(self.manager.initExecContext(), True)

    def test_initComposite(self):
        """initComposite() returns True."""
        self.assertEqual(self.manager.initComposite(), True)

    def test_initFactories(self):
        """initFactories() returns True."""
        self.assertEqual(self.manager.initFactories(), True)

    def test_initManagerServant(self):
        """initManagerServant() returns True."""
        self.assertEqual(self.manager.initManagerServant(), True)

    def test_procComponentArgs(self):
        """procComponentArgs() splits the id from the '?'-separated options."""
        comp_id = OpenRTM_aist.Properties()
        comp_conf = OpenRTM_aist.Properties()
        comp_args = ("TestComp?instance_name=test"
                     "&exported_ports=ConsoleIn0.out,ConsoleOut0.in")

        self.assertEqual(
            self.manager.procComponentArgs(comp_args, comp_id, comp_conf),
            True)
        self.assertEqual(comp_id.getProperty("implementation_id"), "TestComp")
        self.assertEqual(comp_conf.getProperty("instance_name"), "test")
        self.assertEqual(comp_conf.getProperty("exported_ports"),
                         "ConsoleIn0.out,ConsoleOut0.in")

    def test_procContextArgs(self):
        """procContextArgs() fills in the context id and its "rate" option."""
        # The id is returned through the first element of this list.
        ec_id = [""]
        ec_conf = OpenRTM_aist.Properties()
        self.assertEqual(
            self.manager.procContextArgs("PeriodicExecutionContext?rate=1000",
                                         ec_id, ec_conf),
            True)
        self.assertEqual(ec_id[0], "PeriodicExecutionContext")
        self.assertEqual(ec_conf.getProperty("rate"), "1000")

    def test_configureComponent(self):
        """configureComponent() accepts a component and empty Properties."""
        self.manager.activateManager()
        com = self._create_testcomp()
        prop = OpenRTM_aist.Properties()
        self.manager.configureComponent(com, prop)
        self.manager.cleanupComponent(com)
        com.exit()
        self.manager.shutdownComponents()

    def test_formatString(self):
        """formatString() returns a string without specifiers unchanged."""
        profile = OpenRTM_aist.Properties(defaults_str=testcomp_spec)
        self.assertEqual(self.manager.formatString("rtc.log", profile),
                         "rtc.log")
        # The two results below are only printed for inspection, not checked.
        # A single parenthesized argument prints identically under the
        # Python 2 print statement and the Python 3 print function.
        print(self.manager.formatString("$(PWD)", profile))
        print(self.manager.formatString("%n.%t.%m.%v.%V.%c.%h.%M.%p", profile))

    def test_getLogbuf(self):
        """Smoke test: getLogbuf() can be called."""
        self.manager.getLogbuf()

    def test_getConfig(self):
        """Smoke test: getConfig() can be called."""
        self.manager.getConfig()
00354
00355
00356
00357
00358
# Run every test case in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()