test_ComponentObserverConsumer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
17 
18 import sys
19 sys.path.insert(1,"../")
20 
21 import unittest
22 import time
23 
24 from omniORB import CORBA, PortableServer
25 import RTC
26 import OpenRTM, OpenRTM__POA
27 import SDOPackage
28 import OpenRTM_aist
29 
30 from ComponentObserverConsumer import *
31 
32 class ComponentObserver_i(OpenRTM__POA.ComponentObserver):
33  def __init__(self):
34  pass
35 
36  def update_status(self, status_kind, hint):
37  print "update_status: ", status_kind, ", ", hint
38  return
39 
40 
42  def __init__(self):
43  self._orb = CORBA.ORB_init()
44  self._poa = self._orb.resolve_initial_references("RootPOA")
45  OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
46  pass
47 
48 
53 class TestComponentObserverConsumer(unittest.TestCase):
54  """
55  """
56 
57  def setUp(self):
58  self._orb = CORBA.ORB_init(sys.argv)
59  self._poa = self._orb.resolve_initial_references("RootPOA")
60  self._poa._get_the_POAManager().activate()
62  servant_ = ComponentObserver_i()
63  mgr_ = OpenRTM_aist.Manager.instance()
64  oid_ = mgr_.getPOA().servant_to_id(servant_)
65  self._provider = mgr_.getPOA().id_to_reference(oid_)
66  self._mock = MockRTC()
67  self._properties = [OpenRTM_aist.NVUtil.newNV("heartbeat.enable","YES"),
68  OpenRTM_aist.NVUtil.newNV("heartbeat.interval","0.1"),
69  OpenRTM_aist.NVUtil.newNV("observed_status","ALL")]
70  self._sprof = SDOPackage.ServiceProfile("test_id", "interface_type",
71  self._properties, self._provider)
72  self.coc.init(self._mock, self._sprof)
73  return
74 
75  def tearDown(self):
76  self.coc.finalize()
77  del self.coc
78  self._mock.exit()
79  time.sleep(0.1)
80  OpenRTM_aist.Manager.instance().shutdownManager()
81  return
82 
83 
84  def test_reinit(self):
85  self.assertEqual(self.coc.reinit(self._sprof), True)
86  return
87 
88 
89  def test_getProfile(self):
90  self.coc.getProfile()
91  return
92 
93 
94  def test_finalize(self):
95  self.coc.finalize()
96  return
97 
98 
99  def test_updateStatus(self):
100  self.coc.updateStatus(OpenRTM.COMPONENT_PROFILE, "update Component profile")
101  return
102 
103 
104  def test_toString(self):
105  self.assertEqual("COMPONENT_PROFILE",self.coc.toString(OpenRTM.COMPONENT_PROFILE))
106  self.assertEqual("RTC_STATUS",self.coc.toString(OpenRTM.RTC_STATUS))
107  self.assertEqual("EC_STATUS",self.coc.toString(OpenRTM.EC_STATUS))
108  self.assertEqual("PORT_PROFILE",self.coc.toString(OpenRTM.PORT_PROFILE))
109  self.assertEqual("CONFIGURATION",self.coc.toString(OpenRTM.CONFIGURATION))
110  self.assertEqual("HEARTBEAT",self.coc.toString(OpenRTM.HEARTBEAT))
111  return
112 
113 
114  def test_setListeners(self):
115  prop = OpenRTM_aist.Properties()
116  prop.setProperty("observed_status",
117  "component_profile, rtc_status, port_profile, \
118  ec_status, port_profile , configuration")
119  self.coc.setListeners(prop)
120  return
121 
122 
123  def setfunc(self):
124  print "setfunc"
125  return
126 
127  def unsetfunc(self):
128  print "unsetfunc"
129  return
130 
132  self.coc.switchListeners(True, [True], 0, self.setfunc, self. unsetfunc)
133  self.coc.switchListeners(True, [False], 0, self.setfunc, self. unsetfunc)
134  self.coc.switchListeners(False, [True], 0, self.setfunc, self. unsetfunc)
135  self.coc.switchListeners(False, [False], 0, self.setfunc, self. unsetfunc)
136  return
137 
138 
139  def test_heartbeat(self):
140  self.coc.heartbeat()
141  return
142 
143 
144  def test_setHeartbeat(self):
145  prop = OpenRTM_aist.Properties()
146  prop.setProperty("heartbeat.enable","NO")
147  prop.setProperty("heartbeat.interval","1.0")
148  self.coc.setHeartbeat(prop)
149  prop = OpenRTM_aist.Properties()
150  prop.setProperty("heartbeat.enable","YES")
151  prop.setProperty("heartbeat.interval","0.01")
152  self.coc.setHeartbeat(prop)
153  prop = OpenRTM_aist.Properties()
154  prop.setProperty("heartbeat.enable","YES")
155  self.coc.setHeartbeat(prop)
156  return
157 
158 
160  self.coc.unsetHeartbeat()
161  return
162 
163 
165  self.coc.setComponentStatusListeners()
166  return
167 
168 
170  self.coc.unsetComponentStatusListeners()
171  return
172 
173 
175  self.coc.setPortProfileListeners()
176  return
177 
178 
180  self.coc.unsetPortProfileListeners()
181  return
182 
183 
185  self.coc.setExecutionContextListeners()
186  return
187 
188 
190  self.coc.unsetExecutionContextListeners()
191  return
192 
193 
195  self.coc.setConfigurationListeners()
196  return
197 
198 
200  self.coc.unsetConfigurationListeners()
201  return
202 
203 
204 
205 if __name__ == '__main__':
206  unittest.main()
The Properties class represents a persistent set of properties.
Definition: Properties.py:83
def toString(nv, name=None)
Get string value in NVList specified by name.
Definition: NVUtil.py:311
def newNV(name, value)
Create NameVale.
Definition: NVUtil.py:50


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06