test_PeriodicExecutionContext.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_PeriodicExecutionContext.py
6 # \brief test for PeriodicExecutionContext class
7 # \date $Date: 2007/08/28$
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 #
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 import OpenRTM_aist
23 from PeriodicExecutionContext import *
24 
25 import RTC
26 import RTC__POA
27 import SDOPackage
28 from omniORB import CORBA, PortableServer
29 
30 
31 #class DFP(RTC__POA.DataFlowComponent):
def __init__(self):
    """Initialise the ORB, the root POA and the RTObject_impl base.

    Stores the result of ``self._this()`` in ``_ref`` (returned by
    getRef()), clears the ``_error`` flag read by on_execute() and creates
    the empty list returned by get_contexts().
    """
    self._orb = CORBA.ORB_init()
    self._poa = self._orb.resolve_initial_references("RootPOA")
    OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
    self._error = False
    self._ref = self._this()
    self._eclist = []
40 
def getRef(self):
    """Return the reference stored in ``_ref`` by __init__()."""
    return self._ref
43 
def gotoError(self):
    """Set the ``_error`` flag so the next on_execute() call reports an error."""
    self._error = True
46 
def initialize(self):
    """Do nothing and return RTC.RTC_OK."""
    return RTC.RTC_OK
49 
def finalize(self):
    """Do nothing and return RTC.RTC_OK."""
    return RTC.RTC_OK
52 
def exit(self):
    """Do nothing and return RTC.RTC_OK."""
    return RTC.RTC_OK
55 
def is_alive(self):
    """Always return True."""
    return True
58 
def get_contexts(self):
    """Return the ``_eclist`` list itself (not a copy)."""
    return self._eclist
61 
63  self._eclist.append(ec)
64  return len(self._eclist) -1
65 
67  return None
68 
def get_ports(self):
    """Placeholder implementation: always returns None."""
    return None
71 
73  return None
74 
def on_initialize(self):
    """Print a trace line and return RTC.RTC_OK."""
    print("on_initialize()")
    return RTC.RTC_OK
78 
def on_finalize(self):
    """Print a trace line and return RTC.RTC_OK."""
    print("on_finalize()")
    return RTC.RTC_OK
82 
def on_startup(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("on_startup()")
    return RTC.RTC_OK
86 
def on_shutdown(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("shutdown()")
    return RTC.RTC_OK
90 
def on_activated(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("activated()")
    return RTC.RTC_OK
94 
def on_deactivated(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("deactivated()")
    return RTC.RTC_OK
98 
def on_aborting(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("on_aborting()")
    return RTC.RTC_OK
102 
def on_error(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("on_error()")
    return RTC.RTC_OK
106 
def on_reset(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("on_reset()")
    return RTC.RTC_OK
110 
def on_execute(self, id):
    """Print a trace line and report success unless an error was requested.

    If ``_error`` is set (see gotoError()), the flag is cleared and
    RTC.RTC_ERROR is returned, so a single request fails exactly one call.
    Otherwise RTC.RTC_OK is returned. ``id`` is not used.
    """
    print("on_execute()")
    if not self._error:
        return RTC.RTC_OK
    # Consume the request so later calls succeed again.
    self._error = False
    return RTC.RTC_ERROR
117 
def on_state_update(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("on_state_update()")
    return RTC.RTC_OK
121 
def on_rate_changed(self, id):
    """Print a trace line and return RTC.RTC_OK; ``id`` is not used."""
    print("on_rate_changed()")
    return RTC.RTC_OK
125 
127  return None
128 
def get_sdo_id(self):
    """Placeholder implementation: always returns None."""
    return None
131 
def get_sdo_type(self):
    """Placeholder implementation: always returns None."""
    return None
134 
136  return None
137 
139  return None
140 
def get_service_profile(self, id):
    """Placeholder implementation: ignores ``id`` and always returns None."""
    return None
143 
def get_sdo_service(self):
    """Return the result of SDOPackage.SDOService._nil()."""
    return SDOPackage.SDOService._nil()
146 
def get_configuration(self):
    """Return the result of SDOPackage.Configuration._nil()."""
    return SDOPackage.Configuration._nil()
149 
def get_monitoring(self):
    """Return the result of SDOPackage.Monitoring._nil()."""
    return SDOPackage.Monitoring._nil()
152 
def get_organizations(self):
    """Placeholder implementation: always returns None."""
    return None
155 
def get_status_list(self):
    """Placeholder implementation: always returns None."""
    return None
158 
def get_status(self, name):
    """Placeholder implementation: ignores ``name`` and always returns None."""
    return None
161 
162 
163 class TestPeriodicExecutionContext(unittest.TestCase):
def setUp(self):
    """Create a DFP component and a started execution context that holds it.

    The component's POA manager is activated first; the context is then
    built from the component reference, the component is added to it and
    the context is started.
    """
    self._dfp = DFP()
    self._dfp._poa._get_the_POAManager().activate()

    # NOTE(review): the second argument (10) is presumably the execution
    # rate -- confirm against the PeriodicExecutionContext constructor.
    self._pec = PeriodicExecutionContext(self._dfp._ref, 10)
    self._pec.add_component(self._dfp.getRef())
    self._pec.start()
171 
def tearDown(self):
    """Stop and discard the execution context, then shut the manager down.

    The cleanup in ``finally`` runs even when stopping the context raises,
    so a failing test does not skip the manager shutdown; the original
    exception still propagates afterwards.
    """
    try:
        self._pec.stop()
        # NOTE(review): __del__ is called explicitly, as before; presumably
        # to force cleanup ahead of the manager shutdown -- confirm it
        # tolerates running again when the object is garbage collected.
        self._pec.__del__()
    finally:
        self._pec = None
        OpenRTM_aist.Manager.instance().shutdownManager()
178 
def getState(self, state):
    """Return the printable name for a numeric component state.

    Values equal to 0, 1, 2 and 3 map to "INACTIVE_STATE", "ACTIVE_STATE",
    "ERROR_STATE" and "UNKNOWN_STATE"; anything else yields
    "INVALID_STATE".
    """
    names = ("INACTIVE_STATE", "ACTIVE_STATE", "ERROR_STATE", "UNKNOWN_STATE")
    # Compare with == (not a dict lookup) so unhashable or otherwise odd
    # inputs still fall through to "INVALID_STATE" instead of raising.
    for code, name in enumerate(names):
        if state == code:
            return name
    return "INVALID_STATE"
190 
191 
def test_case(self):
    """Print the context's rate and kind, activate the component, wait 1 s.

    The test makes no assertions; it only exercises the calls.
    """
    # "%s %s" reproduces the single space the Python 2 print statement
    # inserted between its two items.
    print("%s %s" % ("rate: ", self._pec.get_rate()))
    print("%s %s" % ("kind: ", self._pec.get_kind()))
    self._pec.activate_component(self._dfp.getRef())
    import time
    # NOTE(review): presumably gives the started context time to run the
    # activated component -- confirm.
    time.sleep(1)
203 
204 
def test_set_rate(self):
    """Set the rate to 1000 and print what get_rate() reports afterwards.

    The test makes no assertion about the reported value.
    """
    self._pec.set_rate(1000)
    # "%s %s" reproduces the single space the Python 2 print statement
    # inserted between its two items.
    print("%s %s" % ("get rate: ", self._pec.get_rate()))
208 
209 
211  #self._pec.activate_component(self._dfp.getRef())
212  #self._pec.deactivate_component(self._dfp.getRef())
213  pass
214 
215 
216 
218  self._pec.reset_component(self._dfp.getRef())
219  print "reset state: ", self._pec.get_component_state(self._dfp.getRef())
220  self._pec.activate_component(self._dfp.getRef())
221  print "activate state: ", self._pec.get_component_state(self._dfp.getRef())
222 
223 
def test_remove(self):
    """Remove the component, try to activate it, and print its state.

    The test makes no assertion about the outcome of either call.
    """
    component = self._dfp.getRef()
    self._pec.remove_component(component)
    self._pec.activate_component(self._dfp.getRef())
    # "%s %s" reproduces the single space the Python 2 print statement
    # inserted between its two items.
    print("%s %s" % ("activate state: ",
                     self._pec.get_component_state(self._dfp.getRef())))
228 
229 
def test_get_profile(self):
    """Print the ``kind`` field of the profile returned by get_profile()."""
    # "%s %s" reproduces the single space the Python 2 print statement
    # inserted between its two items.
    print("%s %s" % ("get_profile.kind: ", self._pec.get_profile().kind))
232 
233 
234 
235 ############### test #################
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Jun 6 2019 19:11:35