test_RTObject.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 
00005 ## \file test_RTObject.py
00006 ## \brief test for RT component base class
00007 ## \date $Date: $
00008 ## \author Shinji Kurihara
00009 #
00010 # Copyright (C) 2006
00011 #     Task-intelligence Research Group,
00012 #     Intelligent Systems Research Institute,
00013 #     National Institute of
00014 #         Advanced Industrial Science and Technology (AIST), Japan
00015 #     All rights reserved.
00016 #
00017 
00018 import sys,time
00019 sys.path.insert(1,"../")
00020 sys.path.insert(1,"../RTM_IDL")
00021 
00022 import RTC
00023 import OpenRTM
00024 import SDOPackage
00025 import OpenRTM_aist
00026 from omniORB import CORBA, PortableServer
00027 from omniORB import any
00028 import unittest
00029 
# Flat key/value specification used to build the component's Properties
# (passed as ``defaults_str`` to OpenRTM_aist.Properties).
configsample_spec = [
  # Component profile
  "implementation_id", "TestComp",
  "type_name", "TestComp",
  "description", "Test example component",
  "version", "1.0",
  "vendor", "Shinji Kurihara, AIST",
  "category", "example",
  "activity_type", "DataFlowComponent",
  "max_instance", "10",
  "language", "Python",
  "lang_type", "script",
  # Configuration variables (default configuration set)
  "conf.default.int_param0", "0",
  "conf.default.int_param1", "1",
  "conf.default.double_param0", "0.11",
  "conf.default.double_param1", "9.9",
  "conf.default.str_param0", "hoge",
  "conf.default.str_param1", "dara",
  "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",
  # Trailing empty entry kept as-is (presumably the list terminator
  # expected by Properties -- TODO confirm).
  "",
]

# Assigned by TestCompInit() with the component the manager creates.
com = None
00051 
class TestComp(OpenRTM_aist.RTObject_impl):
  """Minimal RT component used as the object under test.

  Every lifecycle callback prints its own name and returns RTC.RTC_OK,
  so the tests can observe which callbacks were invoked.
  """

  def __init__(self, orb_, poa_):
    # Forward the ORB and POA to the RTObject_impl constructor.
    OpenRTM_aist.RTObject_impl.__init__(self, orb=orb_, poa=poa_)

  def onInitialize(self):
    print("onInitialize")
    return RTC.RTC_OK

  def onFinalize(self):
    print("onFinalize")
    return RTC.RTC_OK

  def onStartup(self, ec_id):
    print("onStartup")
    return RTC.RTC_OK

  def onShutdown(self, ec_id):
    # Fixed trace text: this previously printed the misspelled "onSutdown".
    print("onShutdown")
    return RTC.RTC_OK

  def onActivated(self, ec_id):
    print("onActivated")
    return RTC.RTC_OK

  def onDeactivated(self, ec_id):
    print("onDeactivated")
    return RTC.RTC_OK

  def onExecute(self, ec_id):
    print("onExecute")
    return RTC.RTC_OK

  def onAborting(self, ec_id):
    print("onAborting")
    return RTC.RTC_OK

  def onReset(self, ec_id):
    print("onReset")
    return RTC.RTC_OK

  def onStateUpdate(self, ec_id):
    print("onStateUpdate")
    return RTC.RTC_OK

  def onRateChanged(self, ec_id):
    print("onRateChanged")
    return RTC.RTC_OK
00099 
00100     
def TestCompInit(manager):
  """Register the TestComp factory with ``manager`` and create one instance.

  The factory is registered with a Properties object built from
  ``configsample_spec``; the component returned by
  ``manager.createComponent("TestComp")`` is stored in the module-level
  ``com`` variable.
  """
  global com
  print("TestCompInit")
  spec = OpenRTM_aist.Properties(defaults_str=configsample_spec)
  manager.registerFactory(spec, TestComp, OpenRTM_aist.Delete)
  com = manager.createComponent("TestComp")
00109 
00110 
class MySdoServiceProviderBase(OpenRTM_aist.SdoServiceProviderBase):
  """Stub SDO service provider that only remembers what init() received."""

  def __init__(self):
    # Nothing is known until init() is called; the base-class constructor
    # is not invoked here.
    self._rtobj = None
    self._profile = None

  def __del__(self):
    pass

  def init(self, rtobj, profile):
    """Store the owning RT object and the service profile."""
    self._rtobj, self._profile = rtobj, profile

  def reinit(self, profile):
    """No-op: the given profile is ignored."""
    pass

  def getProfile(self):
    """Return the profile stored by init() (None before init())."""
    return self._profile

  def finalize(self):
    """No-op."""
    pass
00133 
00134 
class TestRTObject_impl(unittest.TestCase):
  """Unit tests for OpenRTM_aist.RTObject_impl, exercised through TestComp.

  Each test builds a fresh TestComp on the ORB/POA prepared in setUp();
  tearDown() shuts the OpenRTM manager down again.  Tests that need an
  execution context obtain one through the private helpers below instead
  of repeating the create/bind sequence.
  """

  # Execution-context specification shared by every test that needs one.
  _EC_ARGS = "PeriodicExecutionContext?rate=1000"

  def setUp(self):
    """Initialise the ORB and activate the root POA manager."""
    self._orb = CORBA.ORB_init(sys.argv)
    self._poa = self._orb.resolve_initial_references("RootPOA")
    self._poa._get_the_POAManager().activate()

  def tearDown(self):
    """Shut the OpenRTM manager down after every test."""
    # NOTE(review): the short pause presumably gives execution-context
    # threads time to settle before shutdown -- confirm.
    time.sleep(0.1)
    OpenRTM_aist.Manager.instance().shutdownManager()

  # ------------------------------------------------------------------
  # helpers
  # ------------------------------------------------------------------
  def _create_ec(self):
    """Create an execution context from _EC_ARGS via the Manager singleton."""
    return OpenRTM_aist.Manager.instance().createContext(self._EC_ARGS)

  def _bind_new_ec(self, rtobj):
    """Create an execution context, bind ``rtobj`` to it and return it."""
    ec = self._create_ec()
    ec.bindComponent(rtobj)
    return ec

  def _create_bound_component(self):
    """Return ``(rtobj, ec)``: a new TestComp bound to a new context."""
    rtobj = TestComp(self._orb, self._poa)
    return rtobj, self._bind_new_ec(rtobj)

  # ------------------------------------------------------------------
  # RTC / SDO interface through the object reference
  # ------------------------------------------------------------------
  def test_is_alive(self):
    rtobj = TestComp(self._orb, self._poa)
    # A fresh component has no context at index 0.
    self.assertEqual(rtobj.getObjRef().get_context(0), None)
    ec = self._bind_new_ec(rtobj)
    self.assertNotEqual(rtobj.getObjRef().get_owned_contexts(), [])
    self.assertEqual(rtobj.is_alive(ec.getObjRef()), True)
    ec.remove_component(rtobj.getObjRef())
    ec.stop()
    del ec

  def test_get_owned_contexts(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getObjRef().get_owned_contexts(), [])
    ec = self._bind_new_ec(rtobj)
    self.assertNotEqual(rtobj.getObjRef().get_owned_contexts(), [])
    ec.remove_component(rtobj.getObjRef())
    ec.stop()
    del ec

  def test_get_participating_contexts(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getObjRef().get_participating_contexts(), [])

  def test_get_context(self):
    rtobj = TestComp(self._orb, self._poa)
    # Same expectation test_is_alive relies on for an unbound component;
    # previously the value was only printed, never checked.
    self.assertEqual(rtobj.getObjRef().get_context(0), None)

  def test_get_component_profile(self):
    rtobj = TestComp(self._orb, self._poa)
    rtobj.setInstanceName("TestComp0")
    prof = rtobj.getObjRef().get_component_profile()
    self.assertEqual(prof.instance_name, "TestComp0")

  def test_get_ports(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getObjRef().get_ports(), [])

  def test_attach_context(self):
    rtobj = TestComp(self._orb, self._poa)
    ec = OpenRTM_aist.PeriodicExecutionContext(rtobj.getObjRef(), 10)
    # Named ec_id (not ``id``) so the builtin is not shadowed.
    ec_id = rtobj.getObjRef().attach_context(ec.getObjRef())
    print("attach_context:  %s" % (ec_id,))
    print(rtobj.getObjRef().detach_context(ec_id))
    # The context was created directly, so deactivate its servant by hand.
    poa = OpenRTM_aist.Manager.instance().getPOA()
    poa.deactivate_object(poa.servant_to_id(ec))

  def test_get_owned_organizations(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getObjRef().get_owned_organizations(), [])

  def test_get_sdo_id(self):
    rtobj = TestComp(self._orb, self._poa)
    rtobj.setInstanceName("TestComp0")
    self.assertEqual(rtobj.getObjRef().get_sdo_id(), "TestComp0")

  def test_get_sdo_type(self):
    rtobj = TestComp(self._orb, self._poa)
    prop = OpenRTM_aist.Properties(defaults_str=configsample_spec)
    rtobj.setProperties(prop)
    self.assertEqual(rtobj.getObjRef().get_sdo_type(),
                     "Test example component")

  def test_get_device_profile(self):
    rtobj = TestComp(self._orb, self._poa)
    prof = rtobj.getObjRef().get_device_profile()
    self.assertEqual(prof.device_type, "")

  def test_get_service_profiles(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getObjRef().get_service_profiles(), [])

  def test_get_service_profile(self):
    # TODO: exercise rtobj.getObjRef().get_service_profile("TestComp")
    return

  def test_get_sdo_service(self):
    # TODO: exercise rtobj.getObjRef().get_sdo_service(None)
    return

  def test_get_configuration(self):
    rtobj = TestComp(self._orb, self._poa)
    print(rtobj.getObjRef().get_configuration())

  def test_get_monitoring(self):
    # TODO: exercise rtobj.getObjRef().get_monitoring()
    return

  def test_get_organizations(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getObjRef().get_organizations(), [])

  def test_get_status_list(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getObjRef().get_status_list(), [])

  def test_get_status(self):
    # TODO: exercise rtobj.getObjRef().get_status("status")
    return

  # ------------------------------------------------------------------
  # local (non-CORBA) accessors
  # ------------------------------------------------------------------
  def test_getPropTestCase(self):
    rtobj = TestComp(self._orb, self._poa)
    self.assertEqual(rtobj.getInstanceName(), "")
    prop = OpenRTM_aist.Properties(defaults_str=configsample_spec)
    rtobj.setInstanceName("TestComp0")
    rtobj.setProperties(prop)
    self.assertEqual(rtobj.getInstanceName(), "TestComp0")
    self.assertEqual(rtobj.getTypeName(), "TestComp")
    self.assertEqual(rtobj.getDescription(), "Test example component")
    self.assertEqual(rtobj.getVersion(), "1.0")
    self.assertEqual(rtobj.getVendor(), "Shinji Kurihara, AIST")
    self.assertEqual(rtobj.getCategory(), "example")
    self.assertNotEqual(rtobj.getNamingNames(), ["TestComp0.rtc"])

  def test_setObjRef(self):
    rtobj = TestComp(self._orb, self._poa)
    rtobj.setObjRef("test")
    self.assertEqual(rtobj.getObjRef(), "test")

  def test_bindParameter(self):
    rtobj = TestComp(self._orb, self._poa)
    conf_ = [123]
    self.assertEqual(rtobj.bindParameter("config", conf_, 0), True)
    rtobj.updateParameters("")

  def test_PortTestCase(self):
    rtobj = TestComp(self._orb, self._poa)

    out_buf = OpenRTM_aist.RingBuffer(8)
    outp = OpenRTM_aist.OutPort("out", RTC.TimedLong(RTC.Time(0, 0), 0),
                                out_buf)
    rtobj.registerOutPort("out", outp)

    in_buf = OpenRTM_aist.RingBuffer(8)
    inp = OpenRTM_aist.InPort("in", RTC.TimedLong(RTC.Time(0, 0), 0),
                              in_buf)
    rtobj.registerInPort("in", inp)

    rtobj.deletePort(outp)
    rtobj.deletePort(inp)

    rtobj.finalizePorts()

  # ------------------------------------------------------------------
  # since 1.1.0
  # ------------------------------------------------------------------
  def test_getExecutionContext(self):
    rtobj, _ec = self._create_bound_component()
    self.assertNotEqual(rtobj.getExecutionContext(0), None)

  def test_getExecutionRate(self):
    rtobj, _ec = self._create_bound_component()
    self.assertEqual(rtobj.getExecutionRate(0), 1000.0)

  def test_setExecutionRate(self):
    rtobj, _ec = self._create_bound_component()
    self.assertEqual(rtobj.setExecutionRate(0, 10000), RTC.RTC_OK)
    self.assertEqual(rtobj.getExecutionRate(0), 10000.0)

  def test_isOwnExecutionContext(self):
    rtobj, _ec = self._create_bound_component()
    self.assertEqual(rtobj.isOwnExecutionContext(0), True)

  def test_activate_deactivate(self):
    rtobj = TestComp(self._orb, self._poa)
    ec = self._create_ec()
    ec.set_rate(1000.0)
    ec.bindComponent(rtobj)
    self.assertEqual(rtobj.activate(0), RTC.RTC_OK)
    ec.start()
    try:
      time.sleep(0.1)
      ret = rtobj.deactivate(0)
      time.sleep(0.1)
      self.assertEqual(ret, RTC.RTC_OK)
    finally:
      # Stop the context even when an assertion above fails.
      ec.stop()

  def test_reset(self):
    rtobj, ec = self._create_bound_component()
    self.assertEqual(rtobj.activate(0), RTC.RTC_OK)
    ec.start()
    try:
      # Force the component's state machine into ERROR_STATE so that
      # reset() has something to recover from.
      ec._comps[0]._sm._sm.goTo(RTC.ERROR_STATE)
      time.sleep(0.1)
      self.assertEqual(rtobj.reset(0), RTC.RTC_OK)
    finally:
      # Stop the context even when an assertion above fails.
      ec.stop()

  def test_addRemoveSdoServiceProvider(self):
    rtobj = TestComp(self._orb, self._poa)
    # ``properties`` is given as a list of NameValue, matching
    # test_addRemoveSdoServiceConsumer (it used to be a bare NameValue).
    prof = SDOPackage.ServiceProfile("id", "interface_type",
                                     [OpenRTM_aist.NVUtil.newNV("test", "any")],
                                     SDOPackage.SDOService._nil)
    prov = MySdoServiceProviderBase()
    prov.init(rtobj, prof)
    self.assertEqual(rtobj.addSdoServiceProvider(prof, prov), True)
    self.assertEqual(rtobj.removeSdoServiceProvider("id"), True)

  def test_addRemoveSdoServiceConsumer(self):
    import MySdoServiceConsumer
    OpenRTM_aist.Manager.instance().load("MySdoServiceConsumer.py",
                                         "MySdoServiceConsumerInit")
    rtobj = TestComp(self._orb, self._poa)
    # The observer type name serves as both profile id and interface type.
    observer_type = OpenRTM_aist.toTypename(OpenRTM.ComponentObserver)
    prof = SDOPackage.ServiceProfile(observer_type, observer_type,
                                     [OpenRTM_aist.NVUtil.newNV("test", "any")],
                                     SDOPackage.SDOService._nil)
    self.assertEqual(rtobj.addSdoServiceConsumer(prof), True)
    self.assertEqual(rtobj.removeSdoServiceConsumer(observer_type), True)

  # ------------------------------------------------------------------
  # listener registration / removal
  # ------------------------------------------------------------------
  def prelistenerFunc(self, id):
    print("prelistenerFunc called !!!!")
    return

  def test_addRemovePreComponentActionListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_types = OpenRTM_aist.PreComponentActionListenerType

    rtobj.addPreComponentActionListener(listener_types.PRE_ON_INITIALIZE,
                                        self.prelistenerFunc)
    rtobj.removePreComponentActionListener(listener_types.PRE_ON_INITIALIZE,
                                           self.prelistenerFunc)

    rtobj.addPreComponentActionListener(listener_types.PRE_ON_FINALIZE,
                                        self.prelistenerFunc)
    rtobj.removePreComponentActionListener(listener_types.PRE_ON_FINALIZE,
                                           self.prelistenerFunc)

  def postlistenerFunc(self, id, ret):
    print("postlistenerFunc called !!!!")
    return

  def test_addRemovePostComponentActionListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE

    rtobj.addPostComponentActionListener(listener_type,
                                         self.postlistenerFunc)
    rtobj.removePostComponentActionListener(listener_type,
                                            self.postlistenerFunc)

  def test_addRemovePortActionListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.PortActionListenerType.ADD_PORT

    rtobj.addPortActionListener(listener_type, self.prelistenerFunc)
    rtobj.removePortActionListener(listener_type, self.prelistenerFunc)

  def test_addRemoveExecutionContextActionListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.ExecutionContextActionListenerType.EC_ATTACHED

    rtobj.addExecutionContextActionListener(listener_type,
                                            self.prelistenerFunc)
    rtobj.removeExecutionContextActionListener(listener_type,
                                               self.prelistenerFunc)

  def test_addRemovePortConnectListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.PortConnectListenerType.ON_NOTIFY_CONNECT

    rtobj.addPortConnectListener(listener_type, self.postlistenerFunc)
    rtobj.removePortConnectListener(listener_type, self.postlistenerFunc)

  def portconretlistenerFunc(self, pname, cprof, ret):
    print("portconretlistenerFunc called !!!!")
    return

  def test_addRemovePortConnectRetListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.PortConnectRetListenerType.ON_CONNECTED

    rtobj.addPortConnectRetListener(listener_type,
                                    self.portconretlistenerFunc)
    rtobj.removePortConnectRetListener(listener_type,
                                       self.portconretlistenerFunc)

  def configparamlistenerFunc(self, pname, cprof, ret):
    print("configparamlistenerFunc called !!!!")
    return

  def test_addRemoveConfigurationParamListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.ConfigurationParamListenerType.ON_UPDATE_CONFIG_PARAM

    rtobj.addConfigurationParamListener(listener_type,
                                        self.configparamlistenerFunc)
    rtobj.removeConfigurationParamListener(listener_type,
                                           self.configparamlistenerFunc)

  def test_addRemoveConfigurationSetListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.ConfigurationSetListenerType.ON_SET_CONFIG_SET

    rtobj.addConfigurationSetListener(listener_type, self.prelistenerFunc)
    rtobj.removeConfigurationSetListener(listener_type, self.prelistenerFunc)

  def test_addRemoveConfigurationSetNameListener(self):
    rtobj = TestComp(self._orb, self._poa)
    listener_type = OpenRTM_aist.ConfigurationSetNameListenerType.ON_UPDATE_CONFIG_SET

    rtobj.addConfigurationSetNameListener(listener_type,
                                          self.prelistenerFunc)
    rtobj.removeConfigurationSetNameListener(listener_type,
                                             self.prelistenerFunc)

  def test_shutdown(self):
    return

  # ------------------------------------------------------------------
  # preOn* hooks (smoke tests: each call must simply not raise)
  # ------------------------------------------------------------------
  def test_preOnInitialize(self):
    rtobj, _ec = self._create_bound_component()
    print("preOnInitialize()")
    rtobj.preOnInitialize(0)

  def test_preOnFinalize(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnFinalize(0)

  def test_preOnStartup(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnStartup(0)

  def test_preOnShutdown(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnShutdown(0)

  def test_preOnActivated(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnActivated(0)

  def test_preOnDeactivated(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnDeactivated(0)

  def test_preOnAborting(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnAborting(0)

  def test_preOnError(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnError(0)

  def test_preOnReset(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnReset(0)

  def test_preOnExecute(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnExecute(0)

  def test_preOnStateUpdate(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnStateUpdate(0)

  def test_preOnRateChanged(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.preOnRateChanged(0)

  # ------------------------------------------------------------------
  # postOn* hooks (smoke tests)
  # ------------------------------------------------------------------
  def test_postOnInitialize(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnInitialize(0, True)

  def test_postOnFinalize(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnFinalize(0, True)

  def test_postOnStartup(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnStartup(0, True)

  def test_postOnShutdown(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnShutdown(0, True)

  def test_postOnActivated(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnActivated(0, True)

  def test_postOnDeactivated(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnDeactivated(0, True)

  def test_postOnAborting(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnAborting(0, True)

  def test_postOnError(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnError(0, True)

  def test_postOnReset(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnReset(0, True)

  def test_postOnExecute(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnExecute(0, True)

  def test_postOnStateUpdate(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnStateUpdate(0, True)

  def test_postOnRateChanged(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.postOnRateChanged(0, True)

  # ------------------------------------------------------------------
  # port / execution-context notification hooks (smoke tests)
  # ------------------------------------------------------------------
  def test_onAddPort(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.onAddPort(0)

  def test_onRemovePort(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.onRemovePort(0)

  def test_onAttachExecutionContext(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.onAttachExecutionContext(0)

  def test_onDetachExecutionContext(self):
    rtobj, _ec = self._create_bound_component()
    rtobj.onDetachExecutionContext(0)
00742 
00743 
# Script entry point: run every test case defined in this module.
if __name__ == "__main__":
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28