test_PeriodicExecutionContext.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 # \file test_PeriodicExecutionContext.py
00006 # \brief test for PeriodicExecutionContext class
00007 # \date $Date: 2007/08/28$
00008 # \author Shinji Kurihara
00009 #
00010 # Copyright (C) 2006
00011 #     Task-intelligence Research Group,
00012 #     Intelligent Systems Research Institute,
00013 #     National Institute of
00014 #         Advanced Industrial Science and Technology (AIST), Japan
00015 #     All rights reserved.
00016 #
00017 
00018 import sys
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 import OpenRTM_aist
00023 from PeriodicExecutionContext import *
00024 
00025 import RTC
00026 import RTC__POA
00027 import SDOPackage
00028 from omniORB import CORBA, PortableServer
00029 
00030 
00031 #class DFP(RTC__POA.DataFlowComponent):
class DFP(OpenRTM_aist.RTObject_impl):
  """Stub component used as a fixture by the tests in this file.

  It overrides the component operations with trivial bodies: the ``on_*``
  callbacks print a trace line and return ``RTC.RTC_OK``, and most query
  operations return ``None`` or a nil reference. The only stateful behaviour
  is the one-shot error flag armed by :meth:`gotoError`.

  Trace output uses the parenthesised single-argument form of ``print`` so
  the class is valid syntax on both Python 2 and Python 3 while emitting the
  same text.
  """

  def __init__(self):
    """Obtain the ORB and root POA, initialise the base class and stub state."""
    self._orb = CORBA.ORB_init()
    self._poa = self._orb.resolve_initial_references("RootPOA")
    OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
    # When True, the next on_execute() call reports RTC.RTC_ERROR once.
    self._error = False
    # Object reference obtained once from _this() and handed out by getRef().
    self._ref = self._this()
    # Execution contexts registered via set_execution_context_service().
    self._eclist = []

  def getRef(self):
    """Return the object reference created in ``__init__``."""
    return self._ref

  def gotoError(self):
    """Arm the error flag so the next ``on_execute`` returns ``RTC.RTC_ERROR``."""
    self._error = True

  # --- Life-cycle operations: unconditional success -------------------------

  def initialize(self):
    """Return ``RTC.RTC_OK`` without doing anything."""
    return RTC.RTC_OK

  def finalize(self):
    """Return ``RTC.RTC_OK`` without doing anything."""
    return RTC.RTC_OK

  def exit(self):
    """Return ``RTC.RTC_OK`` without doing anything."""
    return RTC.RTC_OK

  def is_alive(self):
    """Always report the component as alive."""
    return True

  # --- Execution-context bookkeeping ----------------------------------------

  def get_contexts(self):
    """Return the list of execution contexts registered so far."""
    return self._eclist

  def set_execution_context_service(self, ec):
    """Record ``ec`` and return its index in the internal list."""
    self._eclist.append(ec)
    return len(self._eclist) - 1

  # --- Introspection stubs: no data available --------------------------------

  def get_component_profile(self):
    """Stub: return ``None``."""
    return None

  def get_ports(self):
    """Stub: return ``None``."""
    return None

  def get_execution_context_services(self):
    """Stub: return ``None``."""
    return None

  # --- Callbacks: print a trace line and succeed -----------------------------
  # NOTE: the traces printed by on_shutdown/on_activated/on_deactivated omit
  # the "on_" prefix; the strings are kept exactly as they were.

  def on_initialize(self):
    """Print a trace line and return ``RTC.RTC_OK``."""
    print("on_initialize()")
    return RTC.RTC_OK

  def on_finalize(self):
    """Print a trace line and return ``RTC.RTC_OK``."""
    print("on_finalize()")
    return RTC.RTC_OK

  def on_startup(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("on_startup()")
    return RTC.RTC_OK

  def on_shutdown(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("shutdown()")
    return RTC.RTC_OK

  def on_activated(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("activated()")
    return RTC.RTC_OK

  def on_deactivated(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("deactivated()")
    return RTC.RTC_OK

  def on_aborting(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("on_aborting()")
    return RTC.RTC_OK

  def on_error(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("on_error()")
    return RTC.RTC_OK

  def on_reset(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("on_reset()")
    return RTC.RTC_OK

  def on_execute(self, id):
    """Print a trace line and report the armed error, if any.

    Returns ``RTC.RTC_ERROR`` exactly once after :meth:`gotoError` has been
    called (the flag is cleared before returning), otherwise ``RTC.RTC_OK``.
    ``id`` is ignored.
    """
    print("on_execute()")
    if self._error:
      # One-shot: clear the flag so later calls succeed again.
      self._error = False
      return RTC.RTC_ERROR
    return RTC.RTC_OK

  def on_state_update(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("on_state_update()")
    return RTC.RTC_OK

  def on_rate_changed(self, id):
    """Print a trace line and return ``RTC.RTC_OK``; ``id`` is ignored."""
    print("on_rate_changed()")
    return RTC.RTC_OK

  # --- SDO-style query stubs --------------------------------------------------

  def get_owned_organizations(self):
    """Stub: return ``None``."""
    return None

  def get_sdo_id(self):
    """Stub: return ``None``."""
    return None

  def get_sdo_type(self):
    """Stub: return ``None``."""
    return None

  def get_device_profile(self):
    """Stub: return ``None``."""
    return None

  def get_service_profiles(self):
    """Stub: return ``None``."""
    return None

  def get_service_profile(self, id):
    """Stub: return ``None``; ``id`` is ignored."""
    return None

  def get_sdo_service(self):
    """Stub: return a nil ``SDOPackage.SDOService`` reference."""
    return SDOPackage.SDOService._nil()

  def get_configuration(self):
    """Stub: return a nil ``SDOPackage.Configuration`` reference."""
    return SDOPackage.Configuration._nil()

  def get_monitoring(self):
    """Stub: return a nil ``SDOPackage.Monitoring`` reference."""
    return SDOPackage.Monitoring._nil()

  def get_organizations(self):
    """Stub: return ``None``."""
    return None

  def get_status_list(self):
    """Stub: return ``None``."""
    return None

  def get_status(self, name):
    """Stub: return ``None``; ``name`` is ignored."""
    return None
00161   
00162 
class TestPeriodicExecutionContext(unittest.TestCase):
  """Smoke tests for ``PeriodicExecutionContext`` driven by a ``DFP`` stub.

  The tests exercise the execution-context operations and print the values
  they observe; they make no assertions, so a test fails only if a call
  raises. Output goes through :meth:`_report`, which is valid syntax on both
  Python 2 and Python 3.
  """

  # (code, printable name) pairs consulted in order by getState().
  _STATE_NAMES = (
    (0, "INACTIVE_STATE"),
    (1, "ACTIVE_STATE"),
    (2, "ERROR_STATE"),
    (3, "UNKNOWN_STATE"),
  )

  def _report(self, label, value):
    """Print ``label`` and ``value`` separated by a single space.

    Produces the same text as the Python 2 statement ``print label, value``
    without relying on statement syntax that Python 3 rejects.
    """
    print("%s %s" % (label, value))

  def setUp(self):
    """Create the stub component and a started execution context owning it."""
    self._dfp = DFP()
    self._dfp._poa._get_the_POAManager().activate()

    # Second argument is presumably the execution rate -- TODO confirm
    # against the PeriodicExecutionContext constructor.
    self._pec = PeriodicExecutionContext(self._dfp._ref, 10)
    self._pec.add_component(self._dfp.getRef())
    self._pec.start()

  def tearDown(self):
    """Stop and discard the execution context, then shut the manager down."""
    self._pec.stop()
    # The finaliser is invoked explicitly instead of being left to the
    # garbage collector.
    self._pec.__del__()
    self._pec = None
    OpenRTM_aist.Manager.instance().shutdownManager()

  def getState(self, state):
    """Translate an integer state code into its printable name.

    Codes 0-3 map to the names in ``_STATE_NAMES``; any other value yields
    ``"INVALID_STATE"``.
    """
    for code, name in self._STATE_NAMES:
      if state == code:
        return name
    return "INVALID_STATE"

  def test_case(self):
    """Print rate and kind, activate the component and let it run for 1 s."""
    self._report("rate: ", self._pec.get_rate())
    self._report("kind: ", self._pec.get_kind())
    self._pec.activate_component(self._dfp.getRef())
    #for i in range(3):
    # state = self._pec.get_component_state(self._dfp.getRef())
    # print self.getState(state)
    #self._pec.stop()
    #print "is_running : ", self._pec.is_running()
    import time
    # Pause for one second before tearDown stops the context.
    time.sleep(1)

  def test_set_rate(self):
    """Set the rate to 1000 and print the value read back."""
    self._pec.set_rate(1000)
    self._report("get rate: ", self._pec.get_rate())

  def test_activate_component(self):
    """Placeholder: the activate/deactivate round trip is currently disabled."""
    #self._pec.activate_component(self._dfp.getRef())
    #self._pec.deactivate_component(self._dfp.getRef())
    pass

  def test_reset_component(self):
    """Reset, then activate, the component, printing its state after each."""
    self._pec.reset_component(self._dfp.getRef())
    self._report("reset state: ",
                 self._pec.get_component_state(self._dfp.getRef()))
    self._pec.activate_component(self._dfp.getRef())
    self._report("activate state: ",
                 self._pec.get_component_state(self._dfp.getRef()))

  def test_remove(self):
    """Remove the component, then try to activate it and print its state."""
    self._pec.remove_component(self._dfp.getRef())
    self._pec.activate_component(self._dfp.getRef())
    self._report("activate state: ",
                 self._pec.get_component_state(self._dfp.getRef()))

  def test_get_profile(self):
    """Print the ``kind`` field of the execution-context profile."""
    self._report("get_profile.kind: ", self._pec.get_profile().kind)
00232 
00233   
00234 
00235 ############### test #################
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28