test_ManagerServant.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: euc-jp -*-
00003 
00004 #
00005 # \file test_ManagerServant.py
00006 # \brief test for ManagerServant class
00007 # \date $Date: $
00008 # \author Shinji Kurihara
00009 #
00010 # Copyright (C) 2006
00011 #     Task-intelligence Research Group,
00012 #     Intelligent Systems Research Institute,
00013 #     National Institute of
00014 #         Advanced Industrial Science and Technology (AIST), Japan
00015 #     All rights reserved.
00016 #
00017 
00018 import sys,time
00019 sys.path.insert(1,"../RTM_IDL")
00020 sys.path.insert(1,"../")
00021 
00022 import unittest
00023 from omniORB import CORBA
00024 
00025 #from Manager import *
00026 import OpenRTM_aist
00027 import RTC, RTC__POA
00028 import RTM, RTM__POA
00029 
# Component specification passed to OpenRTM_aist.Properties(defaults_str=...)
# by the init functions below: a flat list of alternating keys and values.
configsample_spec = [
  # Component profile
  "implementation_id", "TestComp",
  "type_name", "TestComp",
  "description", "Test example component",
  "version", "1.0",
  "vendor", "Shinji Kurihara, AIST",
  "category", "example",
  "activity_type", "DataFlowComponent",
  "max_instance", "10",
  "language", "Python",
  "lang_type", "compile",
  # Configuration variables
  "conf.default.int_param0", "0",
  "conf.default.int_param1", "1",
  "conf.default.double_param0", "0.11",
  "conf.default.double_param1", "9.9",
  "conf.default.str_param0", "hoge",
  "conf.default.str_param1", "dara",
  "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",
  # Trailing empty string (presumably the end-of-list marker expected by
  # Properties -- confirm against OpenRTM_aist.Properties).
  "",
]

# Component created by TestCompInit(); None until that function has run.
com = None
00051 
class TestComp(OpenRTM_aist.DataFlowComponentBase):
  """Bare data-flow component registered as the factory product in this file."""

  def __init__(self, manager):
    """Initialize the component through DataFlowComponentBase.

    manager -- passed through unchanged to DataFlowComponentBase.__init__.
    """
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00055 
00056     
def TestCompInit(manager):
  """Register the TestComp factory on `manager` and create one component.

  manager -- object providing registerFactory() and createComponent().

  The result of createComponent("TestComp") is stored in the module-level
  `com`.
  """
  global com
  # Single-argument parenthesised form prints the same text on Python 2 and 3.
  print("TestCompInit")
  spec = OpenRTM_aist.Properties(defaults_str=configsample_spec)
  manager.registerFactory(spec, TestComp, OpenRTM_aist.Delete)
  com = manager.createComponent("TestComp")
00065 
def TestEcInit(manager):
  """Register TestComp as the "Art" EC factory on `manager`, then create "TestEc".

  manager -- object providing registerECFactory() and createComponent().

  The value returned by registerECFactory() is printed.
  """
  # Single-argument parenthesised form prints the same text on Python 2 and 3.
  print(manager.registerECFactory("Art", TestComp, OpenRTM_aist.Delete))
  # NOTE(review): "TestEc" is not a name registered anywhere in this file --
  # confirm this is the intended component name.
  manager.createComponent("TestEc")
00072 
00073 
class TestManagerServant(unittest.TestCase):
  """Tests for OpenRTM_aist.ManagerServant.

  Every test runs against a fresh ManagerServant built in setUp(); tearDown()
  disposes of it and shuts down the manager returned by Manager.instance().
  """

  def setUp(self):
    """Create the ManagerServant under test."""
    self.managerservant = OpenRTM_aist.ManagerServant()

  def tearDown(self):
    """Dispose of the servant and shut the manager down."""
    # __del__ is called explicitly so the servant's cleanup runs here, before
    # the manager is shut down, instead of whenever the object is collected.
    self.managerservant.__del__()
    del self.managerservant
    #OpenRTM_aist.Manager.instance().shutdownManager()
    OpenRTM_aist.Manager.instance().shutdown()
    # Presumably lets the shutdown settle before the next test -- confirm.
    time.sleep(0.1)

  # Disabled tests, kept for reference:
  #
  # def test_terminate(self):
  #   self.managerservant.terminate()
  #
  # def test_shutdown(self):
  #   self.managerservant.runManager(True)
  #   import time
  #   time.sleep(0.1)
  #   self.managerservant.shutdown()
  #   #self.managerservant.runManager()

  def _ensure_ins_manager(self):
    """Check createINSManager() from whichever state the servant starts in.

    If the first call fails, the object registered under the "manager.name"
    property is deactivated on the "omniINSPOA" POA and a second call must
    succeed. If the first call succeeds, a second call must return False.
    """
    if not self.managerservant.createINSManager():
      mgr = OpenRTM_aist.Manager.instance()
      poa = mgr.getORB().resolve_initial_references("omniINSPOA")
      poa.deactivate_object(mgr.getConfig().getProperty("manager.name"))
      self.assertEqual(self.managerservant.createINSManager(), True)
    else:
      self.assertEqual(self.managerservant.createINSManager(), False)

  def test_load_unload(self):
    """load_module() leaves non-empty loaded and loadable module lists."""
    self.managerservant.load_module("hoge", "echo")
    self.assertNotEqual(len(self.managerservant.get_loaded_modules()), 0)
    self.assertNotEqual(len(self.managerservant.get_loadable_modules()), 0)
    self.managerservant.unload_module("hoge")

  def test_get_loaded_modules(self):
    """A fresh servant reports no loaded modules."""
    self.assertEqual(self.managerservant.get_loaded_modules(), [])

  def test_get_loadable_modules(self):
    """A fresh servant reports at least one loadable module."""
    self.assertNotEqual(self.managerservant.get_loadable_modules(), [])

  def test_get_factory_profiles(self):
    """get_factory_profiles() can be called without raising."""
    self.managerservant.get_factory_profiles()

  def test_create_component(self):
    """create_component("TestComp") returns a non-None result."""
    mgr = OpenRTM_aist.Manager.init(sys.argv)
    mgr.activateManager()
    # Loading this module with TestCompInit as the init function registers
    # the TestComp factory on the manager.
    self.managerservant.load_module("test_ManagerServant", "TestCompInit")
    # Named `comp` so the module-level `com` is not shadowed.
    comp = self.managerservant.create_component("TestComp")
    self.managerservant.unload_module("test_ManagerServant")
    self.assertNotEqual(comp, None)
    self.managerservant.delete_component("TestComp")
    mgr.shutdownComponents()

  def test_get_components(self):
    """A fresh servant reports no components."""
    self.assertEqual(self.managerservant.get_components(), [])

  def test_get_component_profiles(self):
    """A fresh servant reports no component profiles."""
    self.assertEqual(self.managerservant.get_component_profiles(), [])

  def test_get_profile(self):
    """get_profile() can be called without raising."""
    self.managerservant.get_profile()

  def test_get_configuration(self):
    """get_configuration() can be called without raising."""
    self.managerservant.get_configuration()

  def test_set_configuration(self):
    """set_configuration() returns RTC.RTC_OK for an arbitrary name/value."""
    result = self.managerservant.set_configuration("test_name", "test_value")
    self.assertEqual(result, RTC.RTC_OK)

  def test_get_service(self):
    """get_service("test") returns the nil object reference."""
    self.assertEqual(self.managerservant.get_service("test"),
                     CORBA.Object._nil)

  def test_getObjRef(self):
    """getObjRef() is non-nil once the INS manager has been created."""
    self._ensure_ins_manager()
    self.assertNotEqual(self.managerservant.getObjRef(), CORBA.Object._nil)

  def test_is_master(self):
    """A fresh servant is not a master."""
    self.assertEqual(self.managerservant.is_master(), False)

  def test_get_master_managers(self):
    """A fresh servant has no master managers."""
    self.assertEqual(len(self.managerservant.get_master_managers()), 0)

  def test_master_manager(self):
    """add/remove_master_manager() grow and shrink the master list by one."""
    self._ensure_ins_manager()
    self.assertEqual(len(self.managerservant.get_master_managers()), 0)
    host_port = "localhost:2810"
    owner = self.managerservant.findManager(host_port)
    self.assertEqual(self.managerservant.add_master_manager(owner),
                     RTC.RTC_OK)
    self.assertEqual(len(self.managerservant.get_master_managers()), 1)
    self.assertEqual(self.managerservant.remove_master_manager(owner),
                     RTC.RTC_OK)
    self.assertEqual(len(self.managerservant.get_master_managers()), 0)

  def test_slave_managers(self):
    """add/remove_slave_manager() grow and shrink the slave list by one."""
    self._ensure_ins_manager()
    self.assertEqual(len(self.managerservant.get_slave_managers()), 0)
    host_port = "localhost:2810"
    owner = self.managerservant.findManager(host_port)
    self.assertEqual(self.managerservant.add_slave_manager(owner),
                     RTC.RTC_OK)
    self.assertEqual(len(self.managerservant.get_slave_managers()), 1)
    self.assertEqual(self.managerservant.remove_slave_manager(owner),
                     RTC.RTC_OK)
    self.assertEqual(len(self.managerservant.get_slave_managers()), 0)
00210 
00211     
# Disabled TestManagerServant tests, kept for reference. The two-space
# indentation matches the class's method level.
#
#   def test_fork(self):
#     self.assertEqual(self.managerservant.fork(),RTC.RTC_OK)
#     return
#
#   def test_shutdown(self):
#     #self.assertEqual(self.managerservant.shutdown(),RTC.RTC_OK)
#     return
#
#   def test_restart(self):
#     self.assertEqual(self.managerservant.restart(),RTC.RTC_OK)
#     return
00226 
00227 
00228 
############### test #################
# Run every TestCase in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28