Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022 import OpenRTM_aist
00023 from PeriodicExecutionContext import *
00024
00025 import RTC
00026 import RTC__POA
00027 import SDOPackage
00028 from omniORB import CORBA, PortableServer
00029
00030
00031
class DFP(OpenRTM_aist.RTObject_impl):
    """Stub RT-component used as the target of the execution-context tests.

    Every ``on_*`` callback prints a trace line and returns ``RTC.RTC_OK``.
    The single exception is ``on_execute``, which returns ``RTC.RTC_ERROR``
    exactly once after ``gotoError()`` has been called.  The remaining query
    methods are placeholders returning ``None`` or a nil object reference.

    The trace output uses the single-argument ``print(...)`` form so that it
    produces the same text under Python 2 and also parses under Python 3.
    """

    def __init__(self):
        """Initialise the ORB, resolve the root POA and build the servant."""
        self._orb = CORBA.ORB_init()
        self._poa = self._orb.resolve_initial_references("RootPOA")
        OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
        # When True, the next on_execute() call reports RTC_ERROR.
        self._error = False
        # Object reference for this servant, as returned by _this().
        self._ref = self._this()
        # Execution contexts added via set_execution_context_service().
        self._eclist = []

    def getRef(self):
        """Return the object reference obtained in ``__init__``."""
        return self._ref

    def gotoError(self):
        """Arm the error flag so the next ``on_execute`` returns RTC_ERROR."""
        self._error = True

    def initialize(self):
        """Placeholder; always returns ``RTC.RTC_OK``."""
        return RTC.RTC_OK

    def finalize(self):
        """Placeholder; always returns ``RTC.RTC_OK``."""
        return RTC.RTC_OK

    def exit(self):
        """Placeholder; always returns ``RTC.RTC_OK``."""
        return RTC.RTC_OK

    def is_alive(self):
        """Always report the component as alive."""
        return True

    def get_contexts(self):
        """Return the list of registered execution contexts (not a copy)."""
        return self._eclist

    def set_execution_context_service(self, ec):
        """Register ``ec`` and return its index in the internal list."""
        self._eclist.append(ec)
        return len(self._eclist) - 1

    def get_component_profile(self):
        """Placeholder; returns ``None``."""
        return None

    def get_ports(self):
        """Placeholder; returns ``None``."""
        return None

    def get_execution_context_services(self):
        """Placeholder; returns ``None``."""
        return None

    # ------------------------------------------------------------------
    # Lifecycle callbacks.  The ``id`` parameter is never read here; its
    # name is kept unchanged because it is part of the callback signature.
    # ------------------------------------------------------------------

    def on_initialize(self):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_initialize()")
        return RTC.RTC_OK

    def on_finalize(self):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_finalize()")
        return RTC.RTC_OK

    def on_startup(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_startup()")
        return RTC.RTC_OK

    def on_shutdown(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        # Trace text is kept exactly as in the original (no "on_" prefix).
        print("shutdown()")
        return RTC.RTC_OK

    def on_activated(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        # Trace text is kept exactly as in the original (no "on_" prefix).
        print("activated()")
        return RTC.RTC_OK

    def on_deactivated(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        # Trace text is kept exactly as in the original (no "on_" prefix).
        print("deactivated()")
        return RTC.RTC_OK

    def on_aborting(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_aborting()")
        return RTC.RTC_OK

    def on_error(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_error()")
        return RTC.RTC_OK

    def on_reset(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_reset()")
        return RTC.RTC_OK

    def on_execute(self, id):
        """Trace the call; return RTC_ERROR once if ``gotoError()`` was called.

        The error flag is cleared before returning, so only a single
        ``RTC.RTC_ERROR`` is produced per ``gotoError()`` call.
        """
        print("on_execute()")
        if self._error:
            self._error = False
            return RTC.RTC_ERROR
        return RTC.RTC_OK

    def on_state_update(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_state_update()")
        return RTC.RTC_OK

    def on_rate_changed(self, id):
        """Trace the call and return ``RTC.RTC_OK``."""
        print("on_rate_changed()")
        return RTC.RTC_OK

    # ------------------------------------------------------------------
    # SDO-style query placeholders.
    # ------------------------------------------------------------------

    def get_owned_organizations(self):
        """Placeholder; returns ``None``."""
        return None

    def get_sdo_id(self):
        """Placeholder; returns ``None``."""
        return None

    def get_sdo_type(self):
        """Placeholder; returns ``None``."""
        return None

    def get_device_profile(self):
        """Placeholder; returns ``None``."""
        return None

    def get_service_profiles(self):
        """Placeholder; returns ``None``."""
        return None

    def get_service_profile(self, id):
        """Placeholder; ignores ``id`` and returns ``None``."""
        return None

    def get_sdo_service(self):
        """Return a nil ``SDOPackage.SDOService`` reference."""
        return SDOPackage.SDOService._nil()

    def get_configuration(self):
        """Return a nil ``SDOPackage.Configuration`` reference."""
        return SDOPackage.Configuration._nil()

    def get_monitoring(self):
        """Return a nil ``SDOPackage.Monitoring`` reference."""
        return SDOPackage.Monitoring._nil()

    def get_organizations(self):
        """Placeholder; returns ``None``."""
        return None

    def get_status_list(self):
        """Placeholder; returns ``None``."""
        return None

    def get_status(self, name):
        """Placeholder; ignores ``name`` and returns ``None``."""
        return None
00161
00162
class TestPeriodicExecutionContext(unittest.TestCase):
    """Smoke tests that drive a PeriodicExecutionContext with a DFP stub.

    The tests only exercise the calls and print what the execution context
    reports; none of them asserts on the results.

    Test methods are documented with ``#`` comments rather than docstrings
    so that unittest's verbose description output stays unchanged.
    """

    @staticmethod
    def _show(label, value):
        """Print ``label`` and ``value`` separated by a single space.

        This reproduces the output of the Python 2 statement
        ``print label, value`` while remaining valid on Python 3.
        """
        print("%s %s" % (label, value))

    def setUp(self):
        """Create the stub component and a started execution context."""
        self._dfp = DFP()
        self._dfp._poa._get_the_POAManager().activate()

        # NOTE(review): the second argument (10) is presumably the execution
        # rate -- confirm against PeriodicExecutionContext's constructor.
        self._pec = PeriodicExecutionContext(self._dfp._ref, 10)
        self._pec.add_component(self._dfp.getRef())
        self._pec.start()

    def tearDown(self):
        """Stop and dispose of the execution context, then shut the manager down."""
        self._pec.stop()

        # The original test invokes the finaliser explicitly before dropping
        # the reference; that order is preserved here.
        self._pec.__del__()
        self._pec = None
        OpenRTM_aist.Manager.instance().shutdownManager()

    def getState(self, state):
        """Map a numeric state code (0-3) to its name.

        Returns ``"INVALID_STATE"`` for any value that compares equal to none
        of 0, 1, 2 or 3.
        """
        names = (
            "INACTIVE_STATE",
            "ACTIVE_STATE",
            "ERROR_STATE",
            "UNKNOWN_STATE",
        )
        # Compare with ``==`` in ascending order, exactly like the original
        # if/elif ladder, so custom equality on ``state`` behaves the same.
        for code, label in enumerate(names):
            if state == code:
                return label

        return "INVALID_STATE"

    def test_case(self):
        # Print rate and kind, activate the component, then wait one second
        # while the execution context is running.
        import time

        self._show("rate: ", self._pec.get_rate())
        self._show("kind: ", self._pec.get_kind())
        self._pec.activate_component(self._dfp.getRef())

        time.sleep(1)

    def test_set_rate(self):
        # Change the rate and print the value read back.
        self._pec.set_rate(1000)
        self._show("get rate: ", self._pec.get_rate())

    def test_activate_component(self):
        # Placeholder: no checks are implemented in this test.
        pass

    def test_reset_component(self):
        # Reset, then activate, printing the component state after each call.
        component = self._dfp.getRef()
        self._pec.reset_component(component)
        self._show("reset state: ",
                   self._pec.get_component_state(self._dfp.getRef()))
        self._pec.activate_component(self._dfp.getRef())
        self._show("activate state: ",
                   self._pec.get_component_state(self._dfp.getRef()))

    def test_remove(self):
        # Remove the component, try to activate it anyway, and print the
        # state the execution context reports for it.
        self._pec.remove_component(self._dfp.getRef())
        self._pec.activate_component(self._dfp.getRef())
        self._show("activate state: ",
                   self._pec.get_component_state(self._dfp.getRef()))

    def test_get_profile(self):
        # Print the ``kind`` field of the execution context's profile.
        self._show("get_profile.kind: ", self._pec.get_profile().kind)
00233
00234
00235
# Run the whole test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()