test_ComponentObserverConsumer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 import time
23 
24 from omniORB import CORBA, PortableServer
25 import RTC
26 import OpenRTM, OpenRTM__POA
27 import SDOPackage
28 import OpenRTM_aist
29 
30 from ComponentObserverConsumer import *
31 
32 class ComponentObserver_i(OpenRTM__POA.ComponentObserver):
33  def __init__(self):
34  pass
35 
36  def update_status(self, status_kind, hint):
37  print("update_status: ", status_kind, ", ", hint)
38  return
39 
40 
42  def __init__(self):
43  self._orb = CORBA.ORB_init()
44  self._poa = self._orb.resolve_initial_references("RootPOA")
45  OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
46  pass
47 
48 
53 class TestComponentObserverConsumer(unittest.TestCase):
54  """
55  """
56 
57  def setUp(self):
58  self._orb = CORBA.ORB_init(sys.argv)
59  self._poa = self._orb.resolve_initial_references("RootPOA")
60  self._poa._get_the_POAManager().activate()
62  servant_ = ComponentObserver_i()
63  mgr_ = OpenRTM_aist.Manager.instance()
64  oid_ = mgr_.getPOA().servant_to_id(servant_)
65  self._provider = mgr_.getPOA().id_to_reference(oid_)
66  self._mock = MockRTC()
67  self._properties = [OpenRTM_aist.NVUtil.newNV("heartbeat.enable","YES"),
68  OpenRTM_aist.NVUtil.newNV("heartbeat.interval","0.1"),
69  OpenRTM_aist.NVUtil.newNV("observed_status","ALL")]
70  self._sprof = SDOPackage.ServiceProfile("test_id", "interface_type",
71  self._properties, self._provider)
72  self.coc.init(self._mock, self._sprof)
73  return
74 
75  def tearDown(self):
76  self.coc.finalize()
77  del self.coc
78  self._mock.exit()
79  time.sleep(0.1)
80  OpenRTM_aist.Manager.instance().shutdownManager()
81  return
82 
83 
84  def test_reinit(self):
85  self.assertEqual(self.coc.reinit(self._sprof), True)
86  return
87 
88 
89  def test_getProfile(self):
90  self.coc.getProfile()
91  return
92 
93 
94  def test_finalize(self):
95  self.coc.finalize()
96  return
97 
98 
99  def test_updateStatus(self):
100  self.coc.updateStatus(OpenRTM.COMPONENT_PROFILE, "update Component profile")
101  return
102 
103 
104  def test_toString(self):
105  self.assertEqual("COMPONENT_PROFILE",self.coc.toString(OpenRTM.COMPONENT_PROFILE))
106  self.assertEqual("RTC_STATUS",self.coc.toString(OpenRTM.RTC_STATUS))
107  self.assertEqual("EC_STATUS",self.coc.toString(OpenRTM.EC_STATUS))
108  self.assertEqual("PORT_PROFILE",self.coc.toString(OpenRTM.PORT_PROFILE))
109  self.assertEqual("CONFIGURATION",self.coc.toString(OpenRTM.CONFIGURATION))
110  self.assertEqual("HEARTBEAT",self.coc.toString(OpenRTM.HEARTBEAT))
111  return
112 
113 
114  def test_setListeners(self):
115  prop = OpenRTM_aist.Properties()
116  prop.setProperty("observed_status",
117  "component_profile, rtc_status, port_profile, \
118  ec_status, port_profile , configuration")
119  self.coc.setListeners(prop)
120  return
121 
122 
123  def setfunc(self):
124  print("setfunc")
125  return
126 
127  def unsetfunc(self):
128  print("unsetfunc")
129  return
130 
132  self.coc.switchListeners(True, [True], 0, self.setfunc, self. unsetfunc)
133  self.coc.switchListeners(True, [False], 0, self.setfunc, self. unsetfunc)
134  self.coc.switchListeners(False, [True], 0, self.setfunc, self. unsetfunc)
135  self.coc.switchListeners(False, [False], 0, self.setfunc, self. unsetfunc)
136  return
137 
138 
139  def test_heartbeat(self):
140  self.coc.heartbeat()
141  return
142 
143 
144  def test_setHeartbeat(self):
145  prop = OpenRTM_aist.Properties()
146  prop.setProperty("heartbeat.enable","NO")
147  prop.setProperty("heartbeat.interval","1.0")
148  self.coc.setHeartbeat(prop)
149  prop = OpenRTM_aist.Properties()
150  prop.setProperty("heartbeat.enable","YES")
151  prop.setProperty("heartbeat.interval","0.01")
152  self.coc.setHeartbeat(prop)
153  prop = OpenRTM_aist.Properties()
154  prop.setProperty("heartbeat.enable","YES")
155  self.coc.setHeartbeat(prop)
156  return
157 
158 
160  self.coc.unsetHeartbeat()
161  return
162 
163 
165  self.coc.setComponentStatusListeners()
166  return
167 
168 
170  self.coc.unsetComponentStatusListeners()
171  return
172 
173 
175  self.coc.setPortProfileListeners()
176  return
177 
178 
180  self.coc.unsetPortProfileListeners()
181  return
182 
183 
185  self.coc.setExecutionContextListeners()
186  return
187 
188 
190  self.coc.unsetExecutionContextListeners()
191  return
192 
193 
195  self.coc.setConfigurationListeners()
196  return
197 
198 
200  self.coc.unsetConfigurationListeners()
201  return
202 
203 
204 
205 if __name__ == '__main__':
206  unittest.main()
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_setPortProfileListeners
def test_setPortProfileListeners(self)
Definition: test_ComponentObserverConsumer.py:174
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.setUp
def setUp(self)
Definition: test_ComponentObserverConsumer.py:57
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_toString
def test_toString(self)
Definition: test_ComponentObserverConsumer.py:104
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.tearDown
def tearDown(self)
Definition: test_ComponentObserverConsumer.py:75
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_updateStatus
def test_updateStatus(self)
Definition: test_ComponentObserverConsumer.py:99
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.MockRTC.__init__
def __init__(self)
Definition: test_ComponentObserverConsumer.py:42
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer._mock
_mock
Definition: test_ComponentObserverConsumer.py:66
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_setExecutionContextListeners
def test_setExecutionContextListeners(self)
Definition: test_ComponentObserverConsumer.py:184
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer._poa
_poa
Definition: test_ComponentObserverConsumer.py:59
OpenRTM_aist.NVUtil.newNV
def newNV(name, value)
Create NameVale.
Definition: NVUtil.py:50
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.MockRTC._orb
_orb
Definition: test_ComponentObserverConsumer.py:43
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_unsetHeartbeat
def test_unsetHeartbeat(self)
Definition: test_ComponentObserverConsumer.py:159
OpenRTM_aist.ext.sdo.observer.ComponentObserverConsumer.ComponentObserverConsumer
Definition: ComponentObserverConsumer.py:30
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_unsetExecutionContextListeners
def test_unsetExecutionContextListeners(self)
Definition: test_ComponentObserverConsumer.py:189
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_setListeners
def test_setListeners(self)
Definition: test_ComponentObserverConsumer.py:114
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer._provider
_provider
Definition: test_ComponentObserverConsumer.py:65
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_setComponentStatusListeners
def test_setComponentStatusListeners(self)
Definition: test_ComponentObserverConsumer.py:164
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer._orb
_orb
Definition: test_ComponentObserverConsumer.py:58
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.MockRTC
Definition: test_ComponentObserverConsumer.py:41
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer
Definition: test_ComponentObserverConsumer.py:53
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.ComponentObserver_i
Definition: test_ComponentObserverConsumer.py:32
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_heartbeat
def test_heartbeat(self)
Definition: test_ComponentObserverConsumer.py:139
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.unsetfunc
def unsetfunc(self)
Definition: test_ComponentObserverConsumer.py:127
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_finalize
def test_finalize(self)
Definition: test_ComponentObserverConsumer.py:94
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.MockRTC._poa
_poa
Definition: test_ComponentObserverConsumer.py:44
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer._sprof
_sprof
Definition: test_ComponentObserverConsumer.py:70
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_switchListeners
def test_switchListeners(self)
Definition: test_ComponentObserverConsumer.py:131
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_setHeartbeat
def test_setHeartbeat(self)
Definition: test_ComponentObserverConsumer.py:144
OpenRTM_aist.Properties.Properties
Definition: Properties.py:83
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.coc
coc
Definition: test_ComponentObserverConsumer.py:61
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_getProfile
def test_getProfile(self)
Definition: test_ComponentObserverConsumer.py:89
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.ComponentObserver_i.__init__
def __init__(self)
Definition: test_ComponentObserverConsumer.py:33
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_unsetPortProfileListeners
def test_unsetPortProfileListeners(self)
Definition: test_ComponentObserverConsumer.py:179
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer._properties
_properties
Definition: test_ComponentObserverConsumer.py:67
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.setfunc
def setfunc(self)
Definition: test_ComponentObserverConsumer.py:123
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_setConfigurationListeners
def test_setConfigurationListeners(self)
Definition: test_ComponentObserverConsumer.py:194
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.ComponentObserver_i.update_status
def update_status(self, status_kind, hint)
Definition: test_ComponentObserverConsumer.py:36
OpenRTM_aist.NVUtil.toString
def toString(nv, name=None)
Get string value in NVList specified by name.
Definition: NVUtil.py:311
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_unsetConfigurationListeners
def test_unsetConfigurationListeners(self)
Definition: test_ComponentObserverConsumer.py:199
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_unsetComponentStatusListeners
def test_unsetComponentStatusListeners(self)
Definition: test_ComponentObserverConsumer.py:169
OpenRTM_aist.RTObject.RTObject_impl
Definition: RTObject.py:68
OpenRTM_aist.ext.sdo.observer.test.test_ComponentObserverConsumer.TestComponentObserverConsumer.test_reinit
def test_reinit(self)
Definition: test_ComponentObserverConsumer.py:84


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06