test_RTObject.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 
5 ## \file test_RTObject.py
6 ## \brief test for RT component base class
7 ## \date $Date: $
8 ## \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 #
17 
18 import sys,time
19 sys.path.insert(1,"../")
20 sys.path.insert(1,"../RTM_IDL")
21 
22 import RTC
23 import OpenRTM
24 import SDOPackage
25 import OpenRTM_aist
26 from omniORB import CORBA, PortableServer
27 from omniORB import any
28 import unittest
29 
# Flat list of alternating keys and values describing the test component,
# closed by an empty-string entry.  TestCompInit() and several tests pass it
# to OpenRTM_aist.Properties(defaults_str=...).
configsample_spec = [
    # Component profile
    "implementation_id", "TestComp",
    "type_name", "TestComp",
    "description", "Test example component",
    "version", "1.0",
    "vendor", "Shinji Kurihara, AIST",
    "category", "example",
    "activity_type", "DataFlowComponent",
    "max_instance", "10",
    "language", "Python",
    "lang_type", "script",
    # Configuration variables
    "conf.default.int_param0", "0",
    "conf.default.int_param1", "1",
    "conf.default.double_param0", "0.11",
    "conf.default.double_param1", "9.9",
    "conf.default.str_param0", "hoge",
    "conf.default.str_param1", "dara",
    "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",
    "",
]

# Component created by TestCompInit(); stays None until that function runs.
com = None
51 
def __init__(self, orb_, poa_):
    """Initialise the ``RTObject_impl`` base with the given ORB and POA.

    orb_ -- forwarded to the base initialiser as ``orb``
    poa_ -- forwarded to the base initialiser as ``poa``
    """
    base_init = OpenRTM_aist.RTObject_impl.__init__
    base_init(self, orb=orb_, poa=poa_)
55 
def onInitialize(self):
    """Print ``onInitialize`` and return ``RTC.RTC_OK``."""
    print("onInitialize")
    status = RTC.RTC_OK
    return status
59 
def onFinalize(self):
    """Print ``onFinalize`` and return ``RTC.RTC_OK``."""
    print("onFinalize")
    status = RTC.RTC_OK
    return status
63 
def onStartup(self, ec_id):
    """Print ``onStartup`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onStartup")
    status = RTC.RTC_OK
    return status
67 
def onShutdown(self, ec_id):
    """Print ``onShutdown`` and return ``RTC.RTC_OK``; *ec_id* is not used.

    The marker text previously read ``onSutdown``; it now matches the
    callback name, as every sibling callback's marker does.
    """
    print("onShutdown")
    return RTC.RTC_OK
71 
def onActivated(self, ec_id):
    """Print ``onActivated`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onActivated")
    status = RTC.RTC_OK
    return status
75 
def onDeactivated(self, ec_id):
    """Print ``onDeactivated`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onDeactivated")
    status = RTC.RTC_OK
    return status
79 
def onExecute(self, ec_id):
    """Print ``onExecute`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onExecute")
    status = RTC.RTC_OK
    return status
83 
def onAborting(self, ec_id):
    """Print ``onAborting`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onAborting")
    status = RTC.RTC_OK
    return status
87 
def onReset(self, ec_id):
    """Print ``onReset`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onReset")
    status = RTC.RTC_OK
    return status
91 
def onStateUpdate(self, ec_id):
    """Print ``onStateUpdate`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onStateUpdate")
    status = RTC.RTC_OK
    return status
95 
def onRateChanged(self, ec_id):
    """Print ``onRateChanged`` and return ``RTC.RTC_OK``; *ec_id* is not used."""
    print("onRateChanged")
    status = RTC.RTC_OK
    return status
99 
100 
def TestCompInit(manager):
    """Register the ``TestComp`` factory with *manager* and create a component.

    A ``Properties`` object is built from ``configsample_spec`` and registered
    together with ``TestComp`` and ``OpenRTM_aist.Delete``.  The result of
    ``manager.createComponent("TestComp")`` is stored in the module-level
    ``com``.
    """
    global com
    print("TestCompInit")
    factory_profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
    manager.registerFactory(factory_profile, TestComp, OpenRTM_aist.Delete)
    com = manager.createComponent("TestComp")
109 
110 
def __init__(self):
    """Start with no profile and no component; ``init`` fills both in."""
    self._profile = None
    self._rtobj = None
116 
def __del__(self):
    """No-op finaliser: nothing is released here."""
    return None
119 
def init(self, rtobj, profile):
    """Remember the component and the service profile.

    rtobj   -- stored in ``self._rtobj``
    profile -- stored in ``self._profile`` (handed back by ``getProfile``)
    """
    self._rtobj, self._profile = rtobj, profile
124 
def reinit(self, profile):
    """Accept a new profile and ignore it; nothing is stored."""
    return None
127 
def getProfile(self):
    """Return the profile currently held in ``self._profile``."""
    current_profile = self._profile
    return current_profile
130 
def finalize(self):
    """No-op: this provider has nothing to clean up."""
    return None
133 
134 
135 class TestRTObject_impl(unittest.TestCase):
def setUp(self):
    """Initialise the ORB, resolve ``RootPOA`` and activate its POA manager.

    The ORB and POA are kept on ``self._orb`` / ``self._poa`` for the tests.
    """
    orb = CORBA.ORB_init(sys.argv)
    self._orb = orb
    root_poa = orb.resolve_initial_references("RootPOA")
    self._poa = root_poa
    poa_manager = root_poa._get_the_POAManager()
    poa_manager.activate()
141 
def tearDown(self):
    """Sleep 0.1 s, then shut down the manager from ``Manager.instance()``."""
    # NOTE(review): the pause presumably gives execution-context threads time
    # to settle before the manager goes away -- confirm.
    settle_seconds = 0.1
    time.sleep(settle_seconds)
    manager = OpenRTM_aist.Manager.instance()
    manager.shutdownManager()
150 
def test_is_alive(self):
    # Expects get_context(0) to be None on a fresh component; after binding
    # to a periodic EC the component must own a context and report itself
    # alive in that EC.
    component = TestComp(self._orb, self._poa)
    initial_context = component.getObjRef().get_context(0)
    self.assertEqual(initial_context, None)

    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)

    owned_contexts = component.getObjRef().get_owned_contexts()
    self.assertNotEqual(owned_contexts, [])
    alive = component.is_alive(periodic_ec.getObjRef())
    self.assertEqual(alive, True)

    # Detach and stop the EC again before the test ends.
    periodic_ec.remove_component(component.getObjRef())
    periodic_ec.stop()
    del periodic_ec
165 
167  rtobj = TestComp(self._orb, self._poa)
168  self.assertEqual(rtobj.getObjRef().get_owned_contexts(),[])
169  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
170  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
171  ec.bindComponent(rtobj)
172  self.assertNotEqual(rtobj.getObjRef().get_owned_contexts(),[])
173  ec.remove_component(rtobj.getObjRef())
174  ec.stop()
175  del ec
176 
177  return
178 
180  rtobj = TestComp(self._orb, self._poa)
181  self.assertEqual(rtobj.getObjRef().get_participating_contexts(),[])
182  return
183 
def test_get_context(self):
    # Smoke test: prints whatever get_context(0) returns; nothing is asserted.
    component = TestComp(self._orb, self._poa)
    context = component.getObjRef().get_context(0)
    print(context)
188 
190  rtobj = TestComp(self._orb, self._poa)
191  rtobj.setInstanceName("TestComp0")
192  prof = rtobj.getObjRef().get_component_profile()
193  self.assertEqual(prof.instance_name, "TestComp0")
194  return
195 
def test_get_ports(self):
    # A freshly constructed component is expected to expose no ports.
    component = TestComp(self._orb, self._poa)
    ports = component.getObjRef().get_ports()
    self.assertEqual(ports, [])
200 
201 
203  rtobj = TestComp(self._orb, self._poa)
204  ec = OpenRTM_aist.PeriodicExecutionContext(rtobj.getObjRef(), 10)
205  id = rtobj.getObjRef().attach_context(ec.getObjRef())
206  print "attach_context: ", id
207  print rtobj.getObjRef().detach_context(id)
208  poa = OpenRTM_aist.Manager.instance().getPOA()
209  poa.deactivate_object(poa.servant_to_id(ec))
210  return
211 
213  rtobj = TestComp(self._orb, self._poa)
214  self.assertEqual(rtobj.getObjRef().get_owned_organizations(),[])
215  return
216 
def test_get_sdo_id(self):
    # The SDO id is expected to equal the instance name that was set.
    instance_name = "TestComp0"
    component = TestComp(self._orb, self._poa)
    component.setInstanceName(instance_name)
    sdo_id = component.getObjRef().get_sdo_id()
    self.assertEqual(sdo_id, instance_name)
222 
def test_get_sdo_type(self):
    # After loading configsample_spec as properties, get_sdo_type() is
    # expected to return the "description" value.
    component = TestComp(self._orb, self._poa)
    properties = OpenRTM_aist.Properties(defaults_str=configsample_spec)
    component.setProperties(properties)
    sdo_type = component.getObjRef().get_sdo_type()
    self.assertEqual(sdo_type, "Test example component")
229 
231  rtobj = TestComp(self._orb, self._poa)
232  prof = rtobj.getObjRef().get_device_profile()
233  self.assertEqual(prof.device_type, "")
234  return
235 
237  rtobj = TestComp(self._orb, self._poa)
238  self.assertEqual(rtobj.getObjRef().get_service_profiles(),[])
239  return
240 
241 
243  #rtobj.getObjRef().get_service_profile("TestComp")
244  return
245 
246 
248  #rtobj.getObjRef().get_sdo_service(None)
249  return
250 
252  rtobj = TestComp(self._orb, self._poa)
253  print rtobj.getObjRef().get_configuration()
254  return
255 
257  #rtobj.getObjRef().get_monitoring()
258  return
259 
261  rtobj = TestComp(self._orb, self._poa)
262  self.assertEqual(rtobj.getObjRef().get_organizations(), [])
263  return
264 
266  rtobj = TestComp(self._orb, self._poa)
267  self.assertEqual(rtobj.getObjRef().get_status_list(), [])
268  return
269 
270  def test_get_status(self):
271  #rtobj.getObjRef().get_status("status")
272  return
273 
275  rtobj = TestComp(self._orb, self._poa)
276  self.assertEqual(rtobj.getInstanceName(), "")
277  prop = OpenRTM_aist.Properties(defaults_str=configsample_spec)
278  rtobj.setInstanceName("TestComp0")
279  rtobj.setProperties(prop)
280  self.assertEqual(rtobj.getInstanceName(), "TestComp0")
281  self.assertEqual(rtobj.getTypeName(), "TestComp")
282  self.assertEqual(rtobj.getDescription(), "Test example component")
283  self.assertEqual(rtobj.getVersion(), "1.0")
284  self.assertEqual(rtobj.getVendor(), "Shinji Kurihara, AIST")
285  self.assertEqual(rtobj.getCategory(), "example")
286  self.assertNotEqual(rtobj.getNamingNames(),["TestComp0.rtc"])
287  return
288 
def test_setObjRef(self):
    # Whatever is stored with setObjRef() must come back from getObjRef().
    component = TestComp(self._orb, self._poa)
    marker = "test"
    component.setObjRef(marker)
    self.assertEqual(component.getObjRef(), marker)
294 
296  rtobj = TestComp(self._orb, self._poa)
297  conf_ = [123]
298  self.assertEqual(rtobj.bindParameter("config", conf_, 0), True)
299  rtobj.updateParameters("")
300  return
301 
def test_PortTestCase(self):
    # Smoke test: register one OutPort and one InPort (each with its own
    # 8-slot RingBuffer), delete both, then finalize the ports.  Nothing is
    # asserted; the test only checks that none of the calls raise.
    component = TestComp(self._orb, self._poa)

    out_buffer = OpenRTM_aist.RingBuffer(8)
    out_port = OpenRTM_aist.OutPort(
        "out", RTC.TimedLong(RTC.Time(0, 0), 0), out_buffer)
    component.registerOutPort("out", out_port)

    in_buffer = OpenRTM_aist.RingBuffer(8)
    in_port = OpenRTM_aist.InPort(
        "in", RTC.TimedLong(RTC.Time(0, 0), 0), in_buffer)
    component.registerInPort("in", in_port)

    for port in (out_port, in_port):
        component.deletePort(port)

    component.finalizePorts()
317 
318  # since 1.1.0
320  rtobj = TestComp(self._orb, self._poa)
321  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
322  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
323  ec.bindComponent(rtobj)
324  self.assertNotEqual(rtobj.getExecutionContext(0),None)
325  return
326 
328  rtobj = TestComp(self._orb, self._poa)
329  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
330  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
331  ec.bindComponent(rtobj)
332  self.assertEqual(rtobj.getExecutionRate(0),1000.0)
333  return
334 
336  rtobj = TestComp(self._orb, self._poa)
337  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
338  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
339  ec.bindComponent(rtobj)
340  self.assertEqual(rtobj.setExecutionRate(0,10000),RTC.RTC_OK)
341  self.assertEqual(rtobj.getExecutionRate(0),10000.0)
342  return
343 
345  rtobj = TestComp(self._orb, self._poa)
346  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
347  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
348  ec.bindComponent(rtobj)
349  self.assertEqual(rtobj.isOwnExecutionContext(0),True)
350  return
351 
353  rtobj = TestComp(self._orb, self._poa)
354  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
355  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
356  ec.set_rate(1000.0)
357  ec.bindComponent(rtobj)
358  self.assertEqual(rtobj.activate(0),RTC.RTC_OK)
359  ec.start()
360  time.sleep(0.1)
361  ret = rtobj.deactivate(0)
362  time.sleep(0.1)
363  self.assertEqual(ret,RTC.RTC_OK)
364  ec.stop()
365  return
366 
def test_reset(self):
    # Activate the component in a running periodic EC, force it into
    # ERROR_STATE and expect reset(0) to report RTC_OK.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    self.assertEqual(component.activate(0), RTC.RTC_OK)
    periodic_ec.start()

    # Reaches into the EC's private state machine to fake an error.
    state_machine = periodic_ec._comps[0]._sm._sm
    state_machine.goTo(RTC.ERROR_STATE)
    time.sleep(0.1)

    self.assertEqual(component.reset(0), RTC.RTC_OK)
    periodic_ec.stop()
379 
381  rtobj = TestComp(self._orb, self._poa)
382  prof = SDOPackage.ServiceProfile("id","interface_type",
383  OpenRTM_aist.NVUtil.newNV("test","any"),
384  SDOPackage.SDOService._nil)
385  prov = MySdoServiceProviderBase()
386  prov.init(rtobj,prof)
387  self.assertEqual(rtobj.addSdoServiceProvider(prof, prov),True)
388  self.assertEqual(rtobj.removeSdoServiceProvider("id"),True)
389  return
390 
392  import MySdoServiceConsumer
393  OpenRTM_aist.Manager.instance().load("MySdoServiceConsumer.py",
394  "MySdoServiceConsumerInit")
395  rtobj = TestComp(self._orb, self._poa)
396  prof = SDOPackage.ServiceProfile(OpenRTM_aist.toTypename(OpenRTM.ComponentObserver),OpenRTM_aist.toTypename(OpenRTM.ComponentObserver),
397  [OpenRTM_aist.NVUtil.newNV("test","any")],
398  SDOPackage.SDOService._nil)
399  self.assertEqual(rtobj.addSdoServiceConsumer(prof),True)
400  self.assertEqual(rtobj.removeSdoServiceConsumer(OpenRTM_aist.toTypename(OpenRTM.ComponentObserver)),True)
401  return
402 
def prelistenerFunc(self, id):
    """Listener callback handed to the add/remove listener tests.

    Only prints a marker line; *id* is ignored and nothing is returned.
    """
    print("prelistenerFunc called !!!!")
406 
408  rtobj = TestComp(self._orb, self._poa)
409 
410  rtobj.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE,
411  self.prelistenerFunc)
412 
413  rtobj.removePreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_INITIALIZE,
414  self.prelistenerFunc)
415 
416  rtobj.addPreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE,
417  self.prelistenerFunc)
418 
419  rtobj.removePreComponentActionListener(OpenRTM_aist.PreComponentActionListenerType.PRE_ON_FINALIZE,
420  self.prelistenerFunc)
421  return
422 
def postlistenerFunc(self, id, ret):
    """Listener callback handed to the add/remove listener tests.

    Only prints a marker line; *id* and *ret* are ignored and nothing is
    returned.
    """
    print("postlistenerFunc called !!!!")
426 
428  rtobj = TestComp(self._orb, self._poa)
429 
430  rtobj.addPostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE,
431  self.postlistenerFunc)
432 
433  rtobj.removePostComponentActionListener(OpenRTM_aist.PostComponentActionListenerType.POST_ON_FINALIZE,
434  self.postlistenerFunc)
435  return
436 
438  rtobj = TestComp(self._orb, self._poa)
439 
440  rtobj.addPortActionListener(OpenRTM_aist.PortActionListenerType.ADD_PORT,
441  self.prelistenerFunc)
442 
443  rtobj.removePortActionListener(OpenRTM_aist.PortActionListenerType.ADD_PORT,
444  self.prelistenerFunc)
445  return
446 
448  rtobj = TestComp(self._orb, self._poa)
449 
450  rtobj.addExecutionContextActionListener(OpenRTM_aist.ExecutionContextActionListenerType.EC_ATTACHED,
451  self.prelistenerFunc)
452 
453  rtobj.removeExecutionContextActionListener(OpenRTM_aist.ExecutionContextActionListenerType.EC_ATTACHED,
454  self.prelistenerFunc)
455  return
456 
458  rtobj = TestComp(self._orb, self._poa)
459 
460  rtobj.addPortConnectListener(OpenRTM_aist.PortConnectListenerType.ON_NOTIFY_CONNECT,
461  self.postlistenerFunc)
462 
463  rtobj.removePortConnectListener(OpenRTM_aist.PortConnectListenerType.ON_NOTIFY_CONNECT,
464  self.postlistenerFunc)
465  return
466 
def portconretlistenerFunc(self, pname, cprof, ret):
    """Three-argument listener callback that only prints a marker line.

    *pname*, *cprof* and *ret* are ignored and nothing is returned.
    """
    print("portconretlistenerFunc called !!!!")
470 
472  rtobj = TestComp(self._orb, self._poa)
473 
474  rtobj.addPortConnectRetListener(OpenRTM_aist.PortConnectRetListenerType.ON_CONNECTED,
476 
477  rtobj.removePortConnectRetListener(OpenRTM_aist.PortConnectRetListenerType.ON_CONNECTED,
479  return
480 
def configparamlistenerFunc(self, pname, cprof, ret):
    """Three-argument listener callback that only prints a marker line.

    *pname*, *cprof* and *ret* are ignored and nothing is returned.
    """
    print("configparamlistenerFunc called !!!!")
484 
486  rtobj = TestComp(self._orb, self._poa)
487 
488  rtobj.addConfigurationParamListener(OpenRTM_aist.ConfigurationParamListenerType.ON_UPDATE_CONFIG_PARAM,
490 
491  rtobj.removeConfigurationParamListener(OpenRTM_aist.ConfigurationParamListenerType.ON_UPDATE_CONFIG_PARAM,
493  return
494 
496  rtobj = TestComp(self._orb, self._poa)
497 
498  rtobj.addConfigurationSetListener(OpenRTM_aist.ConfigurationSetListenerType.ON_SET_CONFIG_SET,
499  self.prelistenerFunc)
500 
501  rtobj.removeConfigurationSetListener(OpenRTM_aist.ConfigurationSetListenerType.ON_SET_CONFIG_SET,
502  self.prelistenerFunc)
503  return
504 
506  rtobj = TestComp(self._orb, self._poa)
507 
508  rtobj.addConfigurationSetNameListener(OpenRTM_aist.ConfigurationSetNameListenerType.ON_UPDATE_CONFIG_SET,
509  self.prelistenerFunc)
510 
511  rtobj.removeConfigurationSetNameListener(OpenRTM_aist.ConfigurationSetNameListenerType.ON_UPDATE_CONFIG_SET,
512  self.prelistenerFunc)
513  return
514 
515  def test_shutdown(self):
516  return
517 
519  rtobj = TestComp(self._orb, self._poa)
520  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
521  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
522  ec.bindComponent(rtobj)
523  print "preOnInitialize()"
524  rtobj.preOnInitialize(0)
525  return
526 
528  rtobj = TestComp(self._orb, self._poa)
529  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
530  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
531  ec.bindComponent(rtobj)
532  rtobj.preOnFinalize(0)
533  return
534 
def test_preOnStartup(self):
    # Smoke test: preOnStartup(0) on a component bound to a periodic EC must
    # not raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.preOnStartup(0)
542 
544  rtobj = TestComp(self._orb, self._poa)
545  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
546  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
547  ec.bindComponent(rtobj)
548  rtobj.preOnShutdown(0)
549  return
550 
552  rtobj = TestComp(self._orb, self._poa)
553  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
554  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
555  ec.bindComponent(rtobj)
556  rtobj.preOnActivated(0)
557  return
558 
560  rtobj = TestComp(self._orb, self._poa)
561  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
562  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
563  ec.bindComponent(rtobj)
564  rtobj.preOnDeactivated(0)
565  return
566 
568  rtobj = TestComp(self._orb, self._poa)
569  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
570  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
571  ec.bindComponent(rtobj)
572  rtobj.preOnAborting(0)
573  return
574 
def test_preOnError(self):
    # Smoke test: preOnError(0) on a component bound to a periodic EC must
    # not raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.preOnError(0)
582 
def test_preOnReset(self):
    # Smoke test: preOnReset(0) on a component bound to a periodic EC must
    # not raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.preOnReset(0)
590 
def test_preOnExecute(self):
    # Smoke test: preOnExecute(0) on a component bound to a periodic EC must
    # not raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.preOnExecute(0)
598 
600  rtobj = TestComp(self._orb, self._poa)
601  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
602  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
603  ec.bindComponent(rtobj)
604  rtobj.preOnStateUpdate(0)
605  return
606 
608  rtobj = TestComp(self._orb, self._poa)
609  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
610  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
611  ec.bindComponent(rtobj)
612  rtobj.preOnRateChanged(0)
613  return
614 
616  rtobj = TestComp(self._orb, self._poa)
617  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
618  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
619  ec.bindComponent(rtobj)
620  rtobj.postOnInitialize(0,True)
621  return
622 
624  rtobj = TestComp(self._orb, self._poa)
625  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
626  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
627  ec.bindComponent(rtobj)
628  rtobj.postOnFinalize(0,True)
629  return
630 
632  rtobj = TestComp(self._orb, self._poa)
633  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
634  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
635  ec.bindComponent(rtobj)
636  rtobj.postOnStartup(0,True)
637  return
638 
640  rtobj = TestComp(self._orb, self._poa)
641  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
642  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
643  ec.bindComponent(rtobj)
644  rtobj.postOnShutdown(0,True)
645  return
646 
648  rtobj = TestComp(self._orb, self._poa)
649  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
650  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
651  ec.bindComponent(rtobj)
652  rtobj.postOnActivated(0,True)
653  return
654 
656  rtobj = TestComp(self._orb, self._poa)
657  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
658  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
659  ec.bindComponent(rtobj)
660  rtobj.postOnDeactivated(0,True)
661  return
662 
664  rtobj = TestComp(self._orb, self._poa)
665  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
666  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
667  ec.bindComponent(rtobj)
668  rtobj.postOnAborting(0,True)
669  return
670 
def test_postOnError(self):
    # Smoke test: postOnError(0, True) on a component bound to a periodic EC
    # must not raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.postOnError(0, True)
678 
def test_postOnReset(self):
    # Smoke test: postOnReset(0, True) on a component bound to a periodic EC
    # must not raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.postOnReset(0, True)
686 
688  rtobj = TestComp(self._orb, self._poa)
689  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
690  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
691  ec.bindComponent(rtobj)
692  rtobj.postOnExecute(0,True)
693  return
694 
696  rtobj = TestComp(self._orb, self._poa)
697  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
698  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
699  ec.bindComponent(rtobj)
700  rtobj.postOnStateUpdate(0,True)
701  return
702 
704  rtobj = TestComp(self._orb, self._poa)
705  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
706  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
707  ec.bindComponent(rtobj)
708  rtobj.postOnRateChanged(0,True)
709  return
710 
def test_onAddPort(self):
    # Smoke test: onAddPort(0) on a component bound to a periodic EC must not
    # raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.onAddPort(0)
718 
def test_onRemovePort(self):
    # Smoke test: onRemovePort(0) on a component bound to a periodic EC must
    # not raise; its return value is not checked.
    component = TestComp(self._orb, self._poa)
    manager = OpenRTM_aist.Manager.instance()
    periodic_ec = manager.createContext("PeriodicExecutionContext?rate=1000")
    periodic_ec.bindComponent(component)
    component.onRemovePort(0)
726 
728  rtobj = TestComp(self._orb, self._poa)
729  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
730  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
731  ec.bindComponent(rtobj)
732  rtobj.onAttachExecutionContext(0)
733  return
734 
736  rtobj = TestComp(self._orb, self._poa)
737  ec_args = "PeriodicExecutionContext"+"?" + "rate=1000"
738  ec=OpenRTM_aist.Manager.instance().createContext(ec_args)
739  ec.bindComponent(rtobj)
740  rtobj.onDetachExecutionContext(0)
741  return
742 
743 
744 ############### test #################
745 if __name__ == '__main__':
746  unittest.main()
def onExecute(self, ec_id)
def onActivated(self, ec_id)
def test_addRemovePreComponentActionListener(self)
def onAborting(self, ec_id)
def onShutdown(self, ec_id)
def TestCompInit(manager)
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def init(self, rtobj, profile)
def test_addRemoveConfigurationSetNameListener(self)
def onDeactivated(self, ec_id)
def test_addRemovePostComponentActionListener(self)
def onRateChanged(self, ec_id)
def postlistenerFunc(self, id, ret)
def __init__(self, orb_, poa_)
def test_addRemoveConfigurationSetListener(self)
InPort template class.
Definition: InPort.py:58
def onStateUpdate(self, ec_id)
def test_addRemoveConfigurationParamListener(self)
def portconretlistenerFunc(self, pname, cprof, ret)
def onStartup(self, ec_id)
def test_addRemoveExecutionContextActionListener(self)
def newNV(name, value)
Create NameValue.
Definition: NVUtil.py:50
def configparamlistenerFunc(self, pname, cprof, ret)
def onReset(self, ec_id)


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Jun 6 2019 19:11:35