test_ComponentObserverConsumer.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: euc-jp -*-
00003 
00004 ##
00005 # @file test_ComponentObserverConsumer.py
00006 # @brief test for ComponentObserverConsumer
00007 # @date $Date$
00008 # @author Shinji Kurihara
00009 #
00010 # Copyright (C) 2011
00011 #     Noriaki Ando
00012 #     Intelligent Systems Research Institute,
00013 #     National Institute of
00014 #         Advanced Industrial Science and Technology (AIST), Japan
00015 #     All rights reserved.
00016 #
00017 
00018 import sys
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 import time
00023 
00024 from omniORB import CORBA, PortableServer
00025 import RTC
00026 import OpenRTM, OpenRTM__POA
00027 import SDOPackage
00028 import OpenRTM_aist
00029 
00030 from ComponentObserverConsumer import *
00031 
class ComponentObserver_i(OpenRTM__POA.ComponentObserver):
  """ComponentObserver servant used by the tests.

  It keeps no state; every update_status() call is simply echoed to
  standard output.
  """

  def __init__(self):
    # Nothing to set up. Note that the skeleton base class initialiser is
    # not called here.
    pass

  def update_status(self, status_kind, hint):
    """Print the received status kind and hint on one line.

    status_kind -- status kind value passed by the caller
    hint        -- hint passed along with the status kind
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    # The four pieces are joined with single spaces, which reproduces the
    # output of the former statement
    #   print "update_status: ", status_kind, ", ", hint
    print("%s %s %s %s" % ("update_status: ", status_kind, ", ", hint))
00039 
00040 
class MockRTC(OpenRTM_aist.RTObject_impl):
  """RTObject_impl subclass that supplies its own ORB and root POA.

  Lets the tests create an RT-Component object without passing a manager:
  the ORB and the "RootPOA" reference are obtained here and handed to the
  base class initialiser.
  """

  def __init__(self):
    orb = CORBA.ORB_init()
    # Both attributes are assigned before the base initialiser runs,
    # exactly as the base class is then given the same two objects.
    self._orb = orb
    self._poa = orb.resolve_initial_references("RootPOA")
    OpenRTM_aist.RTObject_impl.__init__(self, orb=self._orb, poa=self._poa)
00047 
00048 ##
00049 # @if jp
00050 # @else
00051 # @endif
00052 #
class TestComponentObserverConsumer(unittest.TestCase):
  """Tests for ComponentObserverConsumer.

  setUp() builds a fresh consumer for every test, initialised with a
  MockRTC and a service profile whose service reference points at a
  ComponentObserver_i servant; tearDown() finalizes it again.  Except for
  test_reinit and test_toString the tests contain no assertions: they
  pass when the call under test returns without raising.
  """

  # Create the consumer under test together with everything init() needs.
  def setUp(self):
    self._orb = CORBA.ORB_init(sys.argv)
    self._poa = self._orb.resolve_initial_references("RootPOA")
    self._poa._get_the_POAManager().activate()
    self.coc = ComponentObserverConsumer()

    # The observer's object reference is obtained through the Manager's
    # POA (not through self._poa).
    observer = ComponentObserver_i()
    manager = OpenRTM_aist.Manager.instance()
    object_id = manager.getPOA().servant_to_id(observer)
    self._provider = manager.getPOA().id_to_reference(object_id)

    self._mock = MockRTC()

    # Service profile properties, in the order they are handed to init().
    settings = (("heartbeat.enable", "YES"),
                ("heartbeat.interval", "0.1"),
                ("observed_status", "ALL"))
    self._properties = [OpenRTM_aist.NVUtil.newNV(key, value)
                        for key, value in settings]
    self._sprof = SDOPackage.ServiceProfile("test_id", "interface_type",
                                            self._properties, self._provider)
    self.coc.init(self._mock, self._sprof)

  # Finalize the consumer, exit the mock RTC and shut the manager down.
  def tearDown(self):
    self.coc.finalize()
    del self.coc
    self._mock.exit()
    # NOTE(review): presumably gives exit() time to complete before the
    # manager is shut down -- confirm.
    time.sleep(0.1)
    OpenRTM_aist.Manager.instance().shutdownManager()

  # reinit() with the profile already used by init() must return True.
  def test_reinit(self):
    self.assertEqual(self.coc.reinit(self._sprof), True)

  # Smoke test: getProfile() must not raise.
  def test_getProfile(self):
    self.coc.getProfile()

  # Smoke test: finalize() must not raise.  tearDown() calls finalize()
  # a second time afterwards.
  def test_finalize(self):
    self.coc.finalize()

  # Smoke test: updateStatus() with a COMPONENT_PROFILE status.
  def test_updateStatus(self):
    self.coc.updateStatus(OpenRTM.COMPONENT_PROFILE, "update Component profile")

  # toString() must return the name of each OpenRTM status kind.
  def test_toString(self):
    kind_names = ("COMPONENT_PROFILE", "RTC_STATUS", "EC_STATUS",
                  "PORT_PROFILE", "CONFIGURATION", "HEARTBEAT")
    for kind_name in kind_names:
      self.assertEqual(kind_name,
                       self.coc.toString(getattr(OpenRTM, kind_name)))

  # Smoke test: setListeners() with an explicit observed_status list.
  def test_setListeners(self):
    prop = OpenRTM_aist.Properties()
    # The backslash continuation sits inside the string literal, so the
    # leading spaces of the following line are part of the property value.
    # The list also names port_profile twice.
    prop.setProperty("observed_status",
                     "component_profile, rtc_status, port_profile, \
                      ec_status, port_profile , configuration")
    self.coc.setListeners(prop)

  # Callback handed to switchListeners() as its fourth argument.
  def setfunc(self):
    # Single-argument print(...) behaves identically on Python 2 and is
    # also valid Python 3 syntax.
    print("setfunc")

  # Callback handed to switchListeners() as its fifth argument.
  def unsetfunc(self):
    print("unsetfunc")

  # Smoke test: call switchListeners() with every True/False combination
  # of its first argument and of the flag stored in the list argument.
  def test_switchListeners(self):
    combinations = ((True, True), (True, False),
                    (False, True), (False, False))
    for first_arg, listed_flag in combinations:
      # A new one-element list is built for every call.
      self.coc.switchListeners(first_arg, [listed_flag], 0,
                               self.setfunc, self.unsetfunc)

  # Smoke test: heartbeat() must not raise.
  def test_heartbeat(self):
    self.coc.heartbeat()

  # Smoke test: setHeartbeat() with heartbeat disabled, enabled with an
  # interval, and enabled without an interval, in that order.
  def test_setHeartbeat(self):
    cases = ((("heartbeat.enable", "NO"), ("heartbeat.interval", "1.0")),
             (("heartbeat.enable", "YES"), ("heartbeat.interval", "0.01")),
             (("heartbeat.enable", "YES"),))
    for settings in cases:
      prop = OpenRTM_aist.Properties()
      for key, value in settings:
        prop.setProperty(key, value)
      self.coc.setHeartbeat(prop)

  # Smoke test: unsetHeartbeat() must not raise.
  def test_unsetHeartbeat(self):
    self.coc.unsetHeartbeat()

  # Smoke test: setComponentStatusListeners() must not raise.
  def test_setComponentStatusListeners(self):
    self.coc.setComponentStatusListeners()

  # Smoke test: unsetComponentStatusListeners() must not raise.
  def test_unsetComponentStatusListeners(self):
    self.coc.unsetComponentStatusListeners()

  # Smoke test: setPortProfileListeners() must not raise.
  def test_setPortProfileListeners(self):
    self.coc.setPortProfileListeners()

  # Smoke test: unsetPortProfileListeners() must not raise.
  def test_unsetPortProfileListeners(self):
    self.coc.unsetPortProfileListeners()

  # Smoke test: setExecutionContextListeners() must not raise.
  def test_setExecutionContextListeners(self):
    self.coc.setExecutionContextListeners()

  # Smoke test: unsetExecutionContextListeners() must not raise.
  def test_unsetExecutionContextListeners(self):
    self.coc.unsetExecutionContextListeners()

  # Smoke test: setConfigurationListeners() must not raise.
  def test_setConfigurationListeners(self):
    self.coc.setConfigurationListeners()

  # Smoke test: unsetConfigurationListeners() must not raise.
  def test_unsetConfigurationListeners(self):
    self.coc.unsetConfigurationListeners()
00202 
00203 
00204 ############### test #################
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28