00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys,time
00019 sys.path.insert(1,"../")
00020 sys.path.insert(1,"../RTM_IDL")
00021
00022 import RTC
00023 import OpenRTM
00024 import SDOPackage
00025 import OpenRTM_aist
00026 from omniORB import CORBA, PortableServer
00027 from omniORB import any
00028 import unittest
00029
# Flat key/value specification for the test component, in the form consumed
# by OpenRTM_aist.Properties(defaults_str=...): alternating key and value
# strings, terminated by a single empty string.
configsample_spec = [
    # Component profile entries.
    "implementation_id", "TestComp",
    "type_name",         "TestComp",
    "description",       "Test example component",
    "version",           "1.0",
    "vendor",            "Shinji Kurihara, AIST",
    "category",          "example",
    "activity_type",     "DataFlowComponent",
    "max_instance",      "10",
    "language",          "Python",
    "lang_type",         "script",

    # Default configuration-set parameters.
    "conf.default.int_param0",    "0",
    "conf.default.int_param1",    "1",
    "conf.default.double_param0", "0.11",
    "conf.default.double_param1", "9.9",
    "conf.default.str_param0",    "hoge",
    "conf.default.str_param1",    "dara",
    "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",

    # End-of-list marker.
    ""]

# Module-level handle to the component created by TestCompInit().
com = None
00051
class TestComp(OpenRTM_aist.RTObject_impl):
    """Minimal RT-component used as the fixture for the RTObject_impl tests.

    Every callback simply prints its own name and returns ``RTC.RTC_OK``,
    so a test run shows on stdout which callbacks were invoked.
    """

    def __init__(self, orb_, poa_):
        """Initialise the base RTObject_impl with the given ORB and POA."""
        OpenRTM_aist.RTObject_impl.__init__(self, orb=orb_, poa=poa_)

    def onInitialize(self):
        """Print the callback name and report success."""
        print("onInitialize")
        return RTC.RTC_OK

    def onFinalize(self):
        """Print the callback name and report success."""
        print("onFinalize")
        return RTC.RTC_OK

    def onStartup(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onStartup")
        return RTC.RTC_OK

    def onShutdown(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        # The original message was misspelt as "onSutdown"; every other
        # callback prints its exact name, so this one now does too.
        print("onShutdown")
        return RTC.RTC_OK

    def onActivated(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onActivated")
        return RTC.RTC_OK

    def onDeactivated(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onDeactivated")
        return RTC.RTC_OK

    def onExecute(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onExecute")
        return RTC.RTC_OK

    def onAborting(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onAborting")
        return RTC.RTC_OK

    def onReset(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onReset")
        return RTC.RTC_OK

    def onStateUpdate(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onStateUpdate")
        return RTC.RTC_OK

    def onRateChanged(self, ec_id):
        """Print the callback name and report success; ``ec_id`` is unused."""
        print("onRateChanged")
        return RTC.RTC_OK
00099
00100
def TestCompInit(manager):
    """Register the TestComp factory with *manager* and create one instance.

    The created component is stored in the module-level ``com`` variable.
    """
    global com
    print("TestCompInit")
    spec = OpenRTM_aist.Properties(defaults_str=configsample_spec)
    manager.registerFactory(spec, TestComp, OpenRTM_aist.Delete)
    com = manager.createComponent("TestComp")
00109
00110
class MySdoServiceProviderBase(OpenRTM_aist.SdoServiceProviderBase):
    """Do-nothing SDO service provider used by the add/remove provider test.

    It only remembers the RT object and the profile handed to ``init``.
    """

    def __init__(self):
        # Both are filled in later by init().
        self._rtobj = None
        self._profile = None

    def __del__(self):
        pass

    def init(self, rtobj, profile):
        """Store the owning RT object and the service profile."""
        self._profile = profile
        self._rtobj = rtobj

    def reinit(self, profile):
        """Accept a new profile; this stub ignores it."""
        pass

    def getProfile(self):
        """Return the profile stored by ``init`` (``None`` before that)."""
        return self._profile

    def finalize(self):
        """Nothing to release in this stub."""
        pass
00133
00134
00135 class TestRTObject_impl(unittest.TestCase):
00136 def setUp(self):
00137 self._orb = CORBA.ORB_init(sys.argv)
00138 self._poa = self._orb.resolve_initial_references("RootPOA")
00139 self._poa._get_the_POAManager().activate()
00140 return
00141
00142 def tearDown(self):
00143
00144
00145
00146 time.sleep(0.1)
00147 OpenRTM_aist.Manager.instance().shutdownManager()
00148
00149 return
00150
00151 def test_is_alive(self):
00152 rtobj = TestComp(self._orb, self._poa)
00153 ec = rtobj.getObjRef().get_context(0)
00154 self.assertEqual(ec,None)
00155 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00156 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00157 ec.bindComponent(rtobj)
00158 self.assertNotEqual(rtobj.getObjRef().get_owned_contexts(),[])
00159 self.assertEqual(rtobj.is_alive(ec.getObjRef()),True)
00160 ec.remove_component(rtobj.getObjRef())
00161 ec.stop()
00162 del ec
00163
00164 return
00165
00166 def test_get_owned_contexts(self):
00167 rtobj = TestComp(self._orb, self._poa)
00168 self.assertEqual(rtobj.getObjRef().get_owned_contexts(),[])
00169 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00170 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00171 ec.bindComponent(rtobj)
00172 self.assertNotEqual(rtobj.getObjRef().get_owned_contexts(),[])
00173 ec.remove_component(rtobj.getObjRef())
00174 ec.stop()
00175 del ec
00176
00177 return
00178
00179 def test_get_participating_contexts(self):
00180 rtobj = TestComp(self._orb, self._poa)
00181 self.assertEqual(rtobj.getObjRef().get_participating_contexts(),[])
00182 return
00183
00184 def test_get_context(self):
00185 rtobj = TestComp(self._orb, self._poa)
00186 print rtobj.getObjRef().get_context(0)
00187 return
00188
00189 def test_get_component_profile(self):
00190 rtobj = TestComp(self._orb, self._poa)
00191 rtobj.setInstanceName("TestComp0")
00192 prof = rtobj.getObjRef().get_component_profile()
00193 self.assertEqual(prof.instance_name, "TestComp0")
00194 return
00195
00196 def test_get_ports(self):
00197 rtobj = TestComp(self._orb, self._poa)
00198 self.assertEqual(rtobj.getObjRef().get_ports(), [])
00199 return
00200
00201
00202 def test_attach_context(self):
00203 rtobj = TestComp(self._orb, self._poa)
00204 ec = OpenRTM_aist.PeriodicExecutionContext(rtobj.getObjRef(), 10)
00205 id = rtobj.getObjRef().attach_context(ec.getObjRef())
00206 print "attach_context: ", id
00207 print rtobj.getObjRef().detach_context(id)
00208 poa = OpenRTM_aist.Manager.instance().getPOA()
00209 poa.deactivate_object(poa.servant_to_id(ec))
00210 return
00211
00212 def test_get_owned_organizations(self):
00213 rtobj = TestComp(self._orb, self._poa)
00214 self.assertEqual(rtobj.getObjRef().get_owned_organizations(),[])
00215 return
00216
00217 def test_get_sdo_id(self):
00218 rtobj = TestComp(self._orb, self._poa)
00219 rtobj.setInstanceName("TestComp0")
00220 self.assertEqual(rtobj.getObjRef().get_sdo_id(), "TestComp0")
00221 return
00222
00223 def test_get_sdo_type(self):
00224 rtobj = TestComp(self._orb, self._poa)
00225 prop = OpenRTM_aist.Properties(defaults_str=configsample_spec)
00226 rtobj.setProperties(prop)
00227 self.assertEqual(rtobj.getObjRef().get_sdo_type(), "Test example component")
00228 return
00229
00230 def test_get_device_profile(self):
00231 rtobj = TestComp(self._orb, self._poa)
00232 prof = rtobj.getObjRef().get_device_profile()
00233 self.assertEqual(prof.device_type, "")
00234 return
00235
00236 def test_get_service_profiles(self):
00237 rtobj = TestComp(self._orb, self._poa)
00238 self.assertEqual(rtobj.getObjRef().get_service_profiles(),[])
00239 return
00240
00241
00242 def test_get_service_profile(self):
00243
00244 return
00245
00246
00247 def test_get_sdo_service(self):
00248
00249 return
00250
00251 def test_get_configuration(self):
00252 rtobj = TestComp(self._orb, self._poa)
00253 print rtobj.getObjRef().get_configuration()
00254 return
00255
00256 def test_get_monitoring(self):
00257
00258 return
00259
00260 def test_get_organizations(self):
00261 rtobj = TestComp(self._orb, self._poa)
00262 self.assertEqual(rtobj.getObjRef().get_organizations(), [])
00263 return
00264
00265 def test_get_status_list(self):
00266 rtobj = TestComp(self._orb, self._poa)
00267 self.assertEqual(rtobj.getObjRef().get_status_list(), [])
00268 return
00269
00270 def test_get_status(self):
00271
00272 return
00273
00274 def test_getPropTestCase(self):
00275 rtobj = TestComp(self._orb, self._poa)
00276 self.assertEqual(rtobj.getInstanceName(), "")
00277 prop = OpenRTM_aist.Properties(defaults_str=configsample_spec)
00278 rtobj.setInstanceName("TestComp0")
00279 rtobj.setProperties(prop)
00280 self.assertEqual(rtobj.getInstanceName(), "TestComp0")
00281 self.assertEqual(rtobj.getTypeName(), "TestComp")
00282 self.assertEqual(rtobj.getDescription(), "Test example component")
00283 self.assertEqual(rtobj.getVersion(), "1.0")
00284 self.assertEqual(rtobj.getVendor(), "Shinji Kurihara, AIST")
00285 self.assertEqual(rtobj.getCategory(), "example")
00286 self.assertNotEqual(rtobj.getNamingNames(),["TestComp0.rtc"])
00287 return
00288
00289 def test_setObjRef(self):
00290 rtobj = TestComp(self._orb, self._poa)
00291 rtobj.setObjRef("test")
00292 self.assertEqual(rtobj.getObjRef(),"test")
00293 return
00294
00295 def test_bindParameter(self):
00296 rtobj = TestComp(self._orb, self._poa)
00297 conf_ = [123]
00298 self.assertEqual(rtobj.bindParameter("config", conf_, 0), True)
00299 rtobj.updateParameters("")
00300 return
00301
00302 def test_PortTestCase(self):
00303 rtobj = TestComp(self._orb, self._poa)
00304 ringbuf = OpenRTM_aist.RingBuffer(8)
00305 outp = OpenRTM_aist.OutPort("out", RTC.TimedLong(RTC.Time(0,0),0), ringbuf)
00306 rtobj.registerOutPort("out",outp)
00307
00308 ringbuf = OpenRTM_aist.RingBuffer(8)
00309 inp = OpenRTM_aist.InPort("in", RTC.TimedLong(RTC.Time(0,0),0), ringbuf)
00310 rtobj.registerInPort("in",inp)
00311
00312 rtobj.deletePort(outp)
00313 rtobj.deletePort(inp)
00314
00315 rtobj.finalizePorts()
00316 return
00317
00318
00319 def test_getExecutionContext(self):
00320 rtobj = TestComp(self._orb, self._poa)
00321 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00322 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00323 ec.bindComponent(rtobj)
00324 self.assertNotEqual(rtobj.getExecutionContext(0),None)
00325 return
00326
00327 def test_getExecutionRate(self):
00328 rtobj = TestComp(self._orb, self._poa)
00329 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00330 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00331 ec.bindComponent(rtobj)
00332 self.assertEqual(rtobj.getExecutionRate(0),1000.0)
00333 return
00334
00335 def test_setExecutionRate(self):
00336 rtobj = TestComp(self._orb, self._poa)
00337 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00338 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00339 ec.bindComponent(rtobj)
00340 self.assertEqual(rtobj.setExecutionRate(0,10000),RTC.RTC_OK)
00341 self.assertEqual(rtobj.getExecutionRate(0),10000.0)
00342 return
00343
00344 def test_isOwnExecutionContext(self):
00345 rtobj = TestComp(self._orb, self._poa)
00346 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00347 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00348 ec.bindComponent(rtobj)
00349 self.assertEqual(rtobj.isOwnExecutionContext(0),True)
00350 return
00351
00352 def test_activate_deactivate(self):
00353 rtobj = TestComp(self._orb, self._poa)
00354 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00355 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00356 ec.set_rate(1000.0)
00357 ec.bindComponent(rtobj)
00358 self.assertEqual(rtobj.activate(0),RTC.RTC_OK)
00359 ec.start()
00360 time.sleep(0.1)
00361 ret = rtobj.deactivate(0)
00362 time.sleep(0.1)
00363 self.assertEqual(ret,RTC.RTC_OK)
00364 ec.stop()
00365 return
00366
00367 def test_reset(self):
00368 rtobj = TestComp(self._orb, self._poa)
00369 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00370 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00371 ec.bindComponent(rtobj)
00372 self.assertEqual(rtobj.activate(0),RTC.RTC_OK)
00373 ec.start()
00374 ec._comps[0]._sm._sm.goTo(RTC.ERROR_STATE)
00375 time.sleep(0.1)
00376 self.assertEqual(rtobj.reset(0),RTC.RTC_OK)
00377 ec.stop()
00378 return
00379
00380 def test_addRemoveSdoServiceProvider(self):
00381 rtobj = TestComp(self._orb, self._poa)
00382 prof = SDOPackage.ServiceProfile("id","interface_type",
00383 OpenRTM_aist.NVUtil.newNV("test","any"),
00384 SDOPackage.SDOService._nil)
00385 prov = MySdoServiceProviderBase()
00386 prov.init(rtobj,prof)
00387 self.assertEqual(rtobj.addSdoServiceProvider(prof, prov),True)
00388 self.assertEqual(rtobj.removeSdoServiceProvider("id"),True)
00389 return
00390
00391 def test_addRemoveSdoServiceConsumer(self):
00392 import MySdoServiceConsumer
00393 OpenRTM_aist.Manager.instance().load("MySdoServiceConsumer.py",
00394 "MySdoServiceConsumerInit")
00395 rtobj = TestComp(self._orb, self._poa)
00396 prof = SDOPackage.ServiceProfile(OpenRTM_aist.toTypename(OpenRTM.ComponentObserver),OpenRTM_aist.toTypename(OpenRTM.ComponentObserver),
00397 [OpenRTM_aist.NVUtil.newNV("test","any")],
00398 SDOPackage.SDOService._nil)
00399 self.assertEqual(rtobj.addSdoServiceConsumer(prof),True)
00400 self.assertEqual(rtobj.removeSdoServiceConsumer(OpenRTM_aist.toTypename(OpenRTM.ComponentObserver)),True)
00401 return
00402
00403 def prelistenerFunc(self, id):
00404 print "prelistenerFunc called !!!!"
00405 return
00406
00407 def test_addRemovePreComponentActionListener(self):
00408 rtobj = TestComp(self._orb, self._poa)
00409
00410 rtobj.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE,
00411 self.prelistenerFunc)
00412
00413 rtobj.removePreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE,
00414 self.prelistenerFunc)
00415
00416 rtobj.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE,
00417 self.prelistenerFunc)
00418
00419 rtobj.removePreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE,
00420 self.prelistenerFunc)
00421 return
00422
00423 def postlistenerFunc(self, id, ret):
00424 print "postlistenerFunc called !!!!"
00425 return
00426
00427 def test_addRemovePostComponentActionListener(self):
00428 rtobj = TestComp(self._orb, self._poa)
00429
00430 rtobj.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE,
00431 self.postlistenerFunc)
00432
00433 rtobj.removePostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE,
00434 self.postlistenerFunc)
00435 return
00436
00437 def test_addRemovePortActionListener(self):
00438 rtobj = TestComp(self._orb, self._poa)
00439
00440 rtobj.addPortActionListener(OpenRTM_aist.PortActionListenerType.ADD_PORT,
00441 self.prelistenerFunc)
00442
00443 rtobj.removePortActionListener(OpenRTM_aist.PortActionListenerType.ADD_PORT,
00444 self.prelistenerFunc)
00445 return
00446
00447 def test_addRemoveExecutionContextActionListener(self):
00448 rtobj = TestComp(self._orb, self._poa)
00449
00450 rtobj.addExecutionContextActionListener(OpenRTM_aist.ExecutionContextActionListenerType.EC_ATTACHED,
00451 self.prelistenerFunc)
00452
00453 rtobj.removeExecutionContextActionListener(OpenRTM_aist.ExecutionContextActionListenerType.EC_ATTACHED,
00454 self.prelistenerFunc)
00455 return
00456
00457 def test_addRemovePortConnectListener(self):
00458 rtobj = TestComp(self._orb, self._poa)
00459
00460 rtobj.addPortConnectListener(OpenRTM_aist.PortConnectListenerType.ON_NOTIFY_CONNECT,
00461 self.postlistenerFunc)
00462
00463 rtobj.removePortConnectListener(OpenRTM_aist.PortConnectListenerType.ON_NOTIFY_CONNECT,
00464 self.postlistenerFunc)
00465 return
00466
00467 def portconretlistenerFunc(self, pname, cprof, ret):
00468 print "portconretlistenerFunc called !!!!"
00469 return
00470
00471 def test_addRemovePortConnectRetListener(self):
00472 rtobj = TestComp(self._orb, self._poa)
00473
00474 rtobj.addPortConnectRetListener(OpenRTM_aist.PortConnectRetListenerType.ON_CONNECTED,
00475 self.portconretlistenerFunc)
00476
00477 rtobj.removePortConnectRetListener(OpenRTM_aist.PortConnectRetListenerType.ON_CONNECTED,
00478 self.portconretlistenerFunc)
00479 return
00480
00481 def configparamlistenerFunc(self, pname, cprof, ret):
00482 print "configparamlistenerFunc called !!!!"
00483 return
00484
00485 def test_addRemoveConfigurationParamListener(self):
00486 rtobj = TestComp(self._orb, self._poa)
00487
00488 rtobj.addConfigurationParamListener(OpenRTM_aist.ConfigurationParamListenerType.ON_UPDATE_CONFIG_PARAM,
00489 self.configparamlistenerFunc)
00490
00491 rtobj.removeConfigurationParamListener(OpenRTM_aist.ConfigurationParamListenerType.ON_UPDATE_CONFIG_PARAM,
00492 self.configparamlistenerFunc)
00493 return
00494
00495 def test_addRemoveConfigurationSetListener(self):
00496 rtobj = TestComp(self._orb, self._poa)
00497
00498 rtobj.addConfigurationSetListener(OpenRTM_aist.ConfigurationSetListenerType.ON_SET_CONFIG_SET,
00499 self.prelistenerFunc)
00500
00501 rtobj.removeConfigurationSetListener(OpenRTM_aist.ConfigurationSetListenerType.ON_SET_CONFIG_SET,
00502 self.prelistenerFunc)
00503 return
00504
00505 def test_addRemoveConfigurationSetNameListener(self):
00506 rtobj = TestComp(self._orb, self._poa)
00507
00508 rtobj.addConfigurationSetNameListener(OpenRTM_aist.ConfigurationSetNameListenerType.ON_UPDATE_CONFIG_SET,
00509 self.prelistenerFunc)
00510
00511 rtobj.removeConfigurationSetNameListener(OpenRTM_aist.ConfigurationSetNameListenerType.ON_UPDATE_CONFIG_SET,
00512 self.prelistenerFunc)
00513 return
00514
00515 def test_shutdown(self):
00516 return
00517
00518 def test_preOnInitialize(self):
00519 rtobj = TestComp(self._orb, self._poa)
00520 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00521 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00522 ec.bindComponent(rtobj)
00523 print "preOnInitialize()"
00524 rtobj.preOnInitialize(0)
00525 return
00526
00527 def test_preOnFinalize(self):
00528 rtobj = TestComp(self._orb, self._poa)
00529 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00530 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00531 ec.bindComponent(rtobj)
00532 rtobj.preOnFinalize(0)
00533 return
00534
00535 def test_preOnStartup(self):
00536 rtobj = TestComp(self._orb, self._poa)
00537 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00538 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00539 ec.bindComponent(rtobj)
00540 rtobj.preOnStartup(0)
00541 return
00542
00543 def test_preOnShutdown(self):
00544 rtobj = TestComp(self._orb, self._poa)
00545 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00546 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00547 ec.bindComponent(rtobj)
00548 rtobj.preOnShutdown(0)
00549 return
00550
00551 def test_preOnActivated(self):
00552 rtobj = TestComp(self._orb, self._poa)
00553 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00554 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00555 ec.bindComponent(rtobj)
00556 rtobj.preOnActivated(0)
00557 return
00558
00559 def test_preOnDeactivated(self):
00560 rtobj = TestComp(self._orb, self._poa)
00561 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00562 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00563 ec.bindComponent(rtobj)
00564 rtobj.preOnDeactivated(0)
00565 return
00566
00567 def test_preOnAborting(self):
00568 rtobj = TestComp(self._orb, self._poa)
00569 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00570 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00571 ec.bindComponent(rtobj)
00572 rtobj.preOnAborting(0)
00573 return
00574
00575 def test_preOnError(self):
00576 rtobj = TestComp(self._orb, self._poa)
00577 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00578 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00579 ec.bindComponent(rtobj)
00580 rtobj.preOnError(0)
00581 return
00582
00583 def test_preOnReset(self):
00584 rtobj = TestComp(self._orb, self._poa)
00585 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00586 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00587 ec.bindComponent(rtobj)
00588 rtobj.preOnReset(0)
00589 return
00590
00591 def test_preOnExecute(self):
00592 rtobj = TestComp(self._orb, self._poa)
00593 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00594 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00595 ec.bindComponent(rtobj)
00596 rtobj.preOnExecute(0)
00597 return
00598
00599 def test_preOnStateUpdate(self):
00600 rtobj = TestComp(self._orb, self._poa)
00601 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00602 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00603 ec.bindComponent(rtobj)
00604 rtobj.preOnStateUpdate(0)
00605 return
00606
00607 def test_preOnRateChanged(self):
00608 rtobj = TestComp(self._orb, self._poa)
00609 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00610 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00611 ec.bindComponent(rtobj)
00612 rtobj.preOnRateChanged(0)
00613 return
00614
00615 def test_postOnInitialize(self):
00616 rtobj = TestComp(self._orb, self._poa)
00617 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00618 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00619 ec.bindComponent(rtobj)
00620 rtobj.postOnInitialize(0,True)
00621 return
00622
00623 def test_postOnFinalize(self):
00624 rtobj = TestComp(self._orb, self._poa)
00625 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00626 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00627 ec.bindComponent(rtobj)
00628 rtobj.postOnFinalize(0,True)
00629 return
00630
00631 def test_postOnStartup(self):
00632 rtobj = TestComp(self._orb, self._poa)
00633 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00634 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00635 ec.bindComponent(rtobj)
00636 rtobj.postOnStartup(0,True)
00637 return
00638
00639 def test_postOnShutdown(self):
00640 rtobj = TestComp(self._orb, self._poa)
00641 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00642 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00643 ec.bindComponent(rtobj)
00644 rtobj.postOnShutdown(0,True)
00645 return
00646
00647 def test_postOnActivated(self):
00648 rtobj = TestComp(self._orb, self._poa)
00649 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00650 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00651 ec.bindComponent(rtobj)
00652 rtobj.postOnActivated(0,True)
00653 return
00654
00655 def test_postOnDeactivated(self):
00656 rtobj = TestComp(self._orb, self._poa)
00657 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00658 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00659 ec.bindComponent(rtobj)
00660 rtobj.postOnDeactivated(0,True)
00661 return
00662
00663 def test_postOnAborting(self):
00664 rtobj = TestComp(self._orb, self._poa)
00665 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00666 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00667 ec.bindComponent(rtobj)
00668 rtobj.postOnAborting(0,True)
00669 return
00670
00671 def test_postOnError(self):
00672 rtobj = TestComp(self._orb, self._poa)
00673 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00674 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00675 ec.bindComponent(rtobj)
00676 rtobj.postOnError(0,True)
00677 return
00678
00679 def test_postOnReset(self):
00680 rtobj = TestComp(self._orb, self._poa)
00681 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00682 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00683 ec.bindComponent(rtobj)
00684 rtobj.postOnReset(0,True)
00685 return
00686
00687 def test_postOnExecute(self):
00688 rtobj = TestComp(self._orb, self._poa)
00689 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00690 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00691 ec.bindComponent(rtobj)
00692 rtobj.postOnExecute(0,True)
00693 return
00694
00695 def test_postOnStateUpdate(self):
00696 rtobj = TestComp(self._orb, self._poa)
00697 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00698 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00699 ec.bindComponent(rtobj)
00700 rtobj.postOnStateUpdate(0,True)
00701 return
00702
00703 def test_postOnRateChanged(self):
00704 rtobj = TestComp(self._orb, self._poa)
00705 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00706 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00707 ec.bindComponent(rtobj)
00708 rtobj.postOnRateChanged(0,True)
00709 return
00710
00711 def test_onAddPort(self):
00712 rtobj = TestComp(self._orb, self._poa)
00713 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00714 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00715 ec.bindComponent(rtobj)
00716 rtobj.onAddPort(0)
00717 return
00718
00719 def test_onRemovePort(self):
00720 rtobj = TestComp(self._orb, self._poa)
00721 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00722 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00723 ec.bindComponent(rtobj)
00724 rtobj.onRemovePort(0)
00725 return
00726
00727 def test_onAttachExecutionContext(self):
00728 rtobj = TestComp(self._orb, self._poa)
00729 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00730 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00731 ec.bindComponent(rtobj)
00732 rtobj.onAttachExecutionContext(0)
00733 return
00734
00735 def test_onDetachExecutionContext(self):
00736 rtobj = TestComp(self._orb, self._poa)
00737 ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
00738 ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
00739 ec.bindComponent(rtobj)
00740 rtobj.onDetachExecutionContext(0)
00741 return
00742
00743
00744
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()