00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022 import time
00023
00024 from omniORB import CORBA, PortableServer
00025 import RTC
00026 import OpenRTM, OpenRTM__POA
00027 import SDOPackage
00028 import OpenRTM_aist
00029
00030 from ComponentObserverConsumer import *
00031
class ComponentObserver_i(OpenRTM__POA.ComponentObserver):
    """Minimal ComponentObserver servant that echoes each notification.

    An instance of this class is activated in the test fixture and its
    object reference is used as the provider of the service profile that
    is handed to the consumer under test.
    """

    def __init__(self):
        """Create the servant; it keeps no state of its own."""

    def update_status(self, status_kind, hint):
        """Print the received notification to standard output.

        status_kind -- the status kind passed by the caller
        hint        -- the hint value passed by the caller
        """
        # One pre-formatted argument instead of the comma-separated print
        # statement: the text written is the same (items separated by a
        # single space), and the call parses on both Python 2 and Python 3.
        print("update_status:  %s ,  %s" % (status_kind, hint))
        return
00039
00040
class MockRTC(OpenRTM_aist.RTObject_impl):
    """RTObject_impl subclass that supplies its own ORB and RootPOA."""

    def __init__(self):
        """Initialise an ORB, resolve its RootPOA and set up the base class."""
        orb = CORBA.ORB_init()
        self._orb = orb
        root_poa = orb.resolve_initial_references("RootPOA")
        self._poa = root_poa
        # The base class receives the same ORB/POA pair stored above.
        OpenRTM_aist.RTObject_impl.__init__(self, orb=orb, poa=root_poa)
00047
00048
00049
00050
00051
00052
class TestComponentObserverConsumer(unittest.TestCase):
    """Smoke tests for ComponentObserverConsumer.

    Each test runs against a consumer that setUp() has initialised with a
    MockRTC and a service profile whose provider is a reference to a
    ComponentObserver_i servant.  Apart from test_reinit and test_toString,
    the tests only verify that the call completes without raising.
    """

    def setUp(self):
        """Build the ORB/POA, the provider reference and the consumer."""
        self._orb = CORBA.ORB_init(sys.argv)
        self._poa = self._orb.resolve_initial_references("RootPOA")
        self._poa._get_the_POAManager().activate()
        self.coc = ComponentObserverConsumer()

        # Obtain an object reference for the observer servant through the
        # manager's POA; that reference becomes the profile's provider.
        servant = ComponentObserver_i()
        manager = OpenRTM_aist.Manager.instance()
        object_id = manager.getPOA().servant_to_id(servant)
        self._provider = manager.getPOA().id_to_reference(object_id)

        self._mock = MockRTC()
        self._properties = [
            OpenRTM_aist.NVUtil.newNV("heartbeat.enable", "YES"),
            OpenRTM_aist.NVUtil.newNV("heartbeat.interval", "0.1"),
            OpenRTM_aist.NVUtil.newNV("observed_status", "ALL"),
        ]
        self._sprof = SDOPackage.ServiceProfile(
            "test_id", "interface_type", self._properties, self._provider)
        self.coc.init(self._mock, self._sprof)

    def tearDown(self):
        """Finalize the consumer, exit the mock RTC and shut the manager down."""
        self.coc.finalize()
        del self.coc
        self._mock.exit()
        time.sleep(0.1)
        OpenRTM_aist.Manager.instance().shutdownManager()

    def test_reinit(self):
        """reinit() with the profile used for init() must return True."""
        self.assertEqual(self.coc.reinit(self._sprof), True)

    def test_getProfile(self):
        """getProfile() must be callable on an initialised consumer."""
        self.coc.getProfile()

    def test_finalize(self):
        """finalize() must be callable; tearDown() calls it a second time."""
        self.coc.finalize()

    def test_updateStatus(self):
        """updateStatus() must accept a status kind and a hint string."""
        self.coc.updateStatus(OpenRTM.COMPONENT_PROFILE,
                              "update Component profile")

    def test_toString(self):
        """toString() must return the name of every checked status kind."""
        kind_names = (
            "COMPONENT_PROFILE",
            "RTC_STATUS",
            "EC_STATUS",
            "PORT_PROFILE",
            "CONFIGURATION",
            "HEARTBEAT",
        )
        for kind_name in kind_names:
            # The expected text is the attribute name of the kind on OpenRTM.
            self.assertEqual(kind_name,
                             self.coc.toString(getattr(OpenRTM, kind_name)))

    def test_setListeners(self):
        """setListeners() must accept a comma-separated observed_status list."""
        prop = OpenRTM_aist.Properties()
        # "port_profile" appears twice and one item has a space before its
        # comma; both are kept exactly as in the original value.
        observed = ("component_profile, rtc_status, port_profile, "
                    "ec_status, port_profile , configuration")
        prop.setProperty("observed_status", observed)
        self.coc.setListeners(prop)

    def setfunc(self):
        """Callback passed to switchListeners() as the 'set' function."""
        # Parenthesised single-argument form: same output on Python 2 and
        # valid syntax on Python 3.
        print("setfunc")

    def unsetfunc(self):
        """Callback passed to switchListeners() as the 'unset' function."""
        print("unsetfunc")

    def test_switchListeners(self):
        """switchListeners() must accept every True/False flag combination."""
        # Same order as before: (True, [True]), (True, [False]),
        # (False, [True]), (False, [False]); a fresh list for every call.
        for first_flag in (True, False):
            for listed_flag in (True, False):
                self.coc.switchListeners(first_flag, [listed_flag], 0,
                                         self.setfunc, self.unsetfunc)

    def test_heartbeat(self):
        """heartbeat() must be callable on an initialised consumer."""
        self.coc.heartbeat()

    def test_setHeartbeat(self):
        """setHeartbeat() must accept disabled, enabled and interval-less sets."""
        scenarios = (
            (("heartbeat.enable", "NO"), ("heartbeat.interval", "1.0")),
            (("heartbeat.enable", "YES"), ("heartbeat.interval", "0.01")),
            # Enabled with no interval property at all.
            (("heartbeat.enable", "YES"),),
        )
        for settings in scenarios:
            prop = OpenRTM_aist.Properties()
            for key, value in settings:
                prop.setProperty(key, value)
            self.coc.setHeartbeat(prop)

    def test_unsetHeartbeat(self):
        """unsetHeartbeat() must be callable on an initialised consumer."""
        self.coc.unsetHeartbeat()

    def test_setComponentStatusListeners(self):
        """setComponentStatusListeners() must be callable."""
        self.coc.setComponentStatusListeners()

    def test_unsetComponentStatusListeners(self):
        """unsetComponentStatusListeners() must be callable."""
        self.coc.unsetComponentStatusListeners()

    def test_setPortProfileListeners(self):
        """setPortProfileListeners() must be callable."""
        self.coc.setPortProfileListeners()

    def test_unsetPortProfileListeners(self):
        """unsetPortProfileListeners() must be callable."""
        self.coc.unsetPortProfileListeners()

    def test_setExecutionContextListeners(self):
        """setExecutionContextListeners() must be callable."""
        self.coc.setExecutionContextListeners()

    def test_unsetExecutionContextListeners(self):
        """unsetExecutionContextListeners() must be callable."""
        self.coc.unsetExecutionContextListeners()

    def test_setConfigurationListeners(self):
        """setConfigurationListeners() must be callable."""
        self.coc.setConfigurationListeners()

    def test_unsetConfigurationListeners(self):
        """unsetConfigurationListeners() must be callable."""
        self.coc.unsetConfigurationListeners()
00202
00203
00204
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()