test_PeriodicExecutionContext.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_PeriodicExecutionContext.py
6 # \brief test for PeriodicExecutionContext class
7 # \date $Date: 2007/08/28$
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 #
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 import OpenRTM_aist
23 from PeriodicExecutionContext import *
24 
25 import RTC
26 import RTC__POA
27 import SDOPackage
28 from omniORB import CORBA, PortableServer
29 
30 
31 #class DFP(RTC__POA.DataFlowComponent):
33  def __init__(self):
34  self._orb = CORBA.ORB_init()
35  self._poa = self._orb.resolve_initial_references("RootPOA")
36  OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
37  self._error = False
38  self._ref = self._this()
39  self._eclist = []
40 
41  def getRef(self):
42  return self._ref
43 
44  def gotoError(self):
45  self._error = True
46 
47  def initialize(self):
48  return RTC.RTC_OK
49 
50  def finalize(self):
51  return RTC.RTC_OK
52 
53  def exit(self):
54  return RTC.RTC_OK
55 
56  def is_alive(self):
57  return True
58 
59  def get_contexts(self):
60  return self._eclist
61 
63  self._eclist.append(ec)
64  return len(self._eclist) -1
65 
67  return None
68 
69  def get_ports(self):
70  return None
71 
73  return None
74 
75  def on_initialize(self):
76  print("on_initialize()")
77  return RTC.RTC_OK
78 
79  def on_finalize(self):
80  print("on_finalize()")
81  return RTC.RTC_OK
82 
83  def on_startup(self, id):
84  print("on_startup()")
85  return RTC.RTC_OK
86 
87  def on_shutdown(self, id):
88  print("shutdown()")
89  return RTC.RTC_OK
90 
91  def on_activated(self, id):
92  print("activated()")
93  return RTC.RTC_OK
94 
95  def on_deactivated(self, id):
96  print("deactivated()")
97  return RTC.RTC_OK
98 
99  def on_aborting(self, id):
100  print("on_aborting()")
101  return RTC.RTC_OK
102 
103  def on_error(self, id):
104  print("on_error()")
105  return RTC.RTC_OK
106 
107  def on_reset(self, id):
108  print("on_reset()")
109  return RTC.RTC_OK
110 
111  def on_execute(self, id):
112  print("on_execute()")
113  if self._error:
114  self._error = False
115  return RTC.RTC_ERROR
116  return RTC.RTC_OK
117 
118  def on_state_update(self, id):
119  print("on_state_update()")
120  return RTC.RTC_OK
121 
122  def on_rate_changed(self, id):
123  print("on_rate_changed()")
124  return RTC.RTC_OK
125 
127  return None
128 
129  def get_sdo_id(self):
130  return None
131 
132  def get_sdo_type(self):
133  return None
134 
136  return None
137 
139  return None
140 
141  def get_service_profile(self, id):
142  return None
143 
144  def get_sdo_service(self):
145  return SDOPackage.SDOService._nil()
146 
147  def get_configuration(self):
148  return SDOPackage.Configuration._nil()
149 
150  def get_monitoring(self):
151  return SDOPackage.Monitoring._nil()
152 
153  def get_organizations(self):
154  return None
155 
156  def get_status_list(self):
157  return None
158 
159  def get_status(self, name):
160  return None
161 
162 
163 class TestPeriodicExecutionContext(unittest.TestCase):
164  def setUp(self):
165  self._dfp = DFP()
166  self._dfp._poa._get_the_POAManager().activate()
167 
168  self._pec = PeriodicExecutionContext(self._dfp._ref, 10)
169  self._pec.add_component(self._dfp.getRef())
170  self._pec.start()
171 
172  def tearDown(self):
173  self._pec.stop()
174  # import gc
175  self._pec.__del__()
176  self._pec = None
177  OpenRTM_aist.Manager.instance().shutdownManager()
178 
179  def getState(self, state):
180  if state == 0:
181  return "INACTIVE_STATE"
182  elif state == 1:
183  return "ACTIVE_STATE"
184  elif state == 2:
185  return "ERROR_STATE"
186  elif state == 3:
187  return "UNKNOWN_STATE"
188 
189  return "INVALID_STATE"
190 
191 
192  def test_case(self):
193  print("rate: ", self._pec.get_rate())
194  print("kind: ", self._pec.get_kind())
195  self._pec.activate_component(self._dfp.getRef())
196  #for i in range(3):
197  # state = self._pec.get_component_state(self._dfp.getRef())
198  # print self.getState(state)
199  #self._pec.stop()
200  #print "is_running : ", self._pec.is_running()
201  import time
202  time.sleep(1)
203 
204 
205  def test_set_rate(self):
206  self._pec.set_rate(1000)
207  print("get rate: ", self._pec.get_rate())
208 
209 
211  #self._pec.activate_component(self._dfp.getRef())
212  #self._pec.deactivate_component(self._dfp.getRef())
213  pass
214 
215 
216 
218  self._pec.reset_component(self._dfp.getRef())
219  print("reset state: ", self._pec.get_component_state(self._dfp.getRef()))
220  self._pec.activate_component(self._dfp.getRef())
221  print("activate state: ", self._pec.get_component_state(self._dfp.getRef()))
222 
223 
224  def test_remove(self):
225  self._pec.remove_component(self._dfp.getRef())
226  self._pec.activate_component(self._dfp.getRef())
227  print("activate state: ", self._pec.get_component_state(self._dfp.getRef()))
228 
229 
230  def test_get_profile(self):
231  print("get_profile.kind: ", self._pec.get_profile().kind)
232 
233 
234 
235 
236 if __name__ == '__main__':
237  unittest.main()
OpenRTM_aist.RTObject.RTObject_impl._eclist
_eclist
Definition: RTObject.py:124
test_PeriodicExecutionContext.DFP.on_startup
def on_startup(self, id)
[ComponentAction CORBA interface] StartUp RTC
Definition: test_PeriodicExecutionContext.py:83
test_PeriodicExecutionContext.TestPeriodicExecutionContext.getState
def getState(self, state)
Definition: test_PeriodicExecutionContext.py:179
test_PeriodicExecutionContext.TestPeriodicExecutionContext.test_case
def test_case(self)
Definition: test_PeriodicExecutionContext.py:192
test_PeriodicExecutionContext.TestPeriodicExecutionContext._pec
_pec
Definition: test_PeriodicExecutionContext.py:168
test_PeriodicExecutionContext.DFP
Definition: test_PeriodicExecutionContext.py:32
test_PeriodicExecutionContext.DFP.get_owned_organizations
def get_owned_organizations(self)
[SDO interface] Getting Organizations
Definition: test_PeriodicExecutionContext.py:126
test_PeriodicExecutionContext.DFP.get_sdo_service
def get_sdo_service(self)
Definition: test_PeriodicExecutionContext.py:144
test_PeriodicExecutionContext.DFP.on_execute
def on_execute(self, id)
[DataFlowComponentAction CORBA interface] Primary Periodic Operation of RTC
Definition: test_PeriodicExecutionContext.py:111
test_PeriodicExecutionContext.DFP.initialize
def initialize(self)
Initialize the RTC that realizes this interface.
Definition: test_PeriodicExecutionContext.py:47
test_PeriodicExecutionContext.TestPeriodicExecutionContext.tearDown
def tearDown(self)
Definition: test_PeriodicExecutionContext.py:172
test_PeriodicExecutionContext.DFP.finalize
def finalize(self)
Finalize the RTC for preparing it for destruction.
Definition: test_PeriodicExecutionContext.py:50
OpenRTM_aist.RTObject.RTObject_impl._orb
_orb
Definition: RTObject.py:94
test_PeriodicExecutionContext.DFP.on_finalize
def on_finalize(self)
[ComponentAction CORBA interface] Finalize RTC
Definition: test_PeriodicExecutionContext.py:79
test_PeriodicExecutionContext.TestPeriodicExecutionContext.test_get_profile
def test_get_profile(self)
Definition: test_PeriodicExecutionContext.py:230
test_PeriodicExecutionContext.TestPeriodicExecutionContext.setUp
def setUp(self)
Definition: test_PeriodicExecutionContext.py:164
test_PeriodicExecutionContext.DFP.get_configuration
def get_configuration(self)
[SDO interface] Getting Configuration object
Definition: test_PeriodicExecutionContext.py:147
test_PeriodicExecutionContext.DFP.get_service_profile
def get_service_profile(self, id)
[SDO interface] Getting Organizations
Definition: test_PeriodicExecutionContext.py:141
test_PeriodicExecutionContext.DFP.on_deactivated
def on_deactivated(self, id)
[ComponentAction CORBA interface] Deactivate RTC
Definition: test_PeriodicExecutionContext.py:95
test_PeriodicExecutionContext.DFP.get_ports
def get_ports(self)
[RTObject CORBA interface] Get Ports
Definition: test_PeriodicExecutionContext.py:69
test_PeriodicExecutionContext.DFP._error
_error
Definition: test_PeriodicExecutionContext.py:37
test_PeriodicExecutionContext.TestPeriodicExecutionContext.test_remove
def test_remove(self)
Definition: test_PeriodicExecutionContext.py:224
test_PeriodicExecutionContext.DFP.on_shutdown
def on_shutdown(self, id)
[ComponentAction CORBA interface] ShutDown RTC
Definition: test_PeriodicExecutionContext.py:87
test_PeriodicExecutionContext.DFP.on_error
def on_error(self, id)
[ComponentAction CORBA interface] Error Processing of RTC
Definition: test_PeriodicExecutionContext.py:103
test_PeriodicExecutionContext.TestPeriodicExecutionContext.test_set_rate
def test_set_rate(self)
Definition: test_PeriodicExecutionContext.py:205
test_PeriodicExecutionContext.DFP.__init__
def __init__(self)
Definition: test_PeriodicExecutionContext.py:33
test_PeriodicExecutionContext.DFP._ref
_ref
Definition: test_PeriodicExecutionContext.py:38
test_PeriodicExecutionContext.DFP.get_organizations
def get_organizations(self)
[SDO interface] Getting Organizations
Definition: test_PeriodicExecutionContext.py:153
test_PeriodicExecutionContext.DFP.set_execution_context_service
def set_execution_context_service(self, ec)
Definition: test_PeriodicExecutionContext.py:62
test_PeriodicExecutionContext.DFP.get_contexts
def get_contexts(self)
Definition: test_PeriodicExecutionContext.py:59
test_PeriodicExecutionContext.TestPeriodicExecutionContext.test_activate_component
def test_activate_component(self)
Definition: test_PeriodicExecutionContext.py:210
test_PeriodicExecutionContext.TestPeriodicExecutionContext.test_reset_component
def test_reset_component(self)
Definition: test_PeriodicExecutionContext.py:217
test_PeriodicExecutionContext.DFP.gotoError
def gotoError(self)
Definition: test_PeriodicExecutionContext.py:44
PeriodicExecutionContext
test_PeriodicExecutionContext.DFP.on_activated
def on_activated(self, id)
[ComponentAction CORBA interface] Activate RTC
Definition: test_PeriodicExecutionContext.py:91
test_PeriodicExecutionContext.DFP.get_monitoring
def get_monitoring(self)
[SDO interface] Get Monitoring object
Definition: test_PeriodicExecutionContext.py:150
test_PeriodicExecutionContext.TestPeriodicExecutionContext._dfp
_dfp
Definition: test_PeriodicExecutionContext.py:165
test_PeriodicExecutionContext.DFP.get_status
def get_status(self, name)
[SDO interface] Get SDO Status
Definition: test_PeriodicExecutionContext.py:159
test_PeriodicExecutionContext.DFP.get_device_profile
def get_device_profile(self)
[SDO interface] Getting SDO DeviceProfile
Definition: test_PeriodicExecutionContext.py:135
test_PeriodicExecutionContext.DFP.get_sdo_type
def get_sdo_type(self)
[SDO interface] Getting SDO type
Definition: test_PeriodicExecutionContext.py:132
OpenRTM_aist.RTObject.RTObject_impl._poa
_poa
Definition: RTObject.py:95
test_PeriodicExecutionContext.DFP.on_rate_changed
def on_rate_changed(self, id)
[DataFlowComponentAction CORBA interface] Notify rate chenged
Definition: test_PeriodicExecutionContext.py:122
test_PeriodicExecutionContext.DFP.get_sdo_id
def get_sdo_id(self)
[SDO interface] Getting SDO ID
Definition: test_PeriodicExecutionContext.py:129
test_PeriodicExecutionContext.DFP.on_state_update
def on_state_update(self, id)
[DataFlowComponentAction CORBA interface] Secondary Periodic Operation of RTC
Definition: test_PeriodicExecutionContext.py:118
test_PeriodicExecutionContext.DFP.getRef
def getRef(self)
Definition: test_PeriodicExecutionContext.py:41
test_PeriodicExecutionContext.DFP.on_initialize
def on_initialize(self)
[ComponentAction CORBA interface] Initialize RTC
Definition: test_PeriodicExecutionContext.py:75
test_PeriodicExecutionContext.DFP.on_reset
def on_reset(self, id)
[ComponentAction CORBA interface] Resetting RTC
Definition: test_PeriodicExecutionContext.py:107
test_PeriodicExecutionContext.DFP.exit
def exit(self)
Stop the RTC's execution context(s) and finalize it along with its contents.
Definition: test_PeriodicExecutionContext.py:53
OpenRTM_aist.RTObject.RTObject_impl
Definition: RTObject.py:68
test_PeriodicExecutionContext.DFP.get_execution_context_services
def get_execution_context_services(self)
Definition: test_PeriodicExecutionContext.py:72
test_PeriodicExecutionContext.DFP.on_aborting
def on_aborting(self, id)
[ComponentAction CORBA interface] Transition Error State
Definition: test_PeriodicExecutionContext.py:99
test_PeriodicExecutionContext.TestPeriodicExecutionContext
Definition: test_PeriodicExecutionContext.py:163
test_PeriodicExecutionContext.DFP.get_service_profiles
def get_service_profiles(self)
[SDO interface] Getting SDO ServiceProfile
Definition: test_PeriodicExecutionContext.py:138
test_PeriodicExecutionContext.DFP.get_status_list
def get_status_list(self)
[SDO interface] Get SDO Status
Definition: test_PeriodicExecutionContext.py:156
test_PeriodicExecutionContext.DFP.is_alive
def is_alive(self)
Definition: test_PeriodicExecutionContext.py:56
OpenRTM_aist.NVUtil.append
def append(dest, src)
Definition: NVUtil.py:386
test_PeriodicExecutionContext.DFP.get_component_profile
def get_component_profile(self)
[RTObject CORBA interface] Get RTC's profile
Definition: test_PeriodicExecutionContext.py:66


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:07