test_ManagerServant.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 #
5 # \file test_ManagerServant.py
6 # \brief test for ManagerServant class
7 # \date $Date: $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Task-intelligence Research Group,
12 # Intelligent Systems Research Institute,
13 # National Institute of
14 # Advanced Industrial Science and Technology (AIST), Japan
15 # All rights reserved.
16 #
17 
18 import sys,time
19 sys.path.insert(1,"../RTM_IDL")
20 sys.path.insert(1,"../")
21 
22 import unittest
23 from omniORB import CORBA
24 
25 #from Manager import *
26 import OpenRTM_aist
27 import RTC, RTC__POA
28 import RTM, RTM__POA
29 
# Profile used when registering the TestComp factory: a flat list of
# alternating keys and values, closed by a single empty-string entry.
configsample_spec = [
    "implementation_id", "TestComp",
    "type_name", "TestComp",
    "description", "Test example component",
    "version", "1.0",
    "vendor", "Shinji Kurihara, AIST",
    "category", "example",
    "activity_type", "DataFlowComponent",
    "max_instance", "10",
    "language", "Python",
    "lang_type", "compile",
    # Values of the "default" configuration set
    "conf.default.int_param0", "0",
    "conf.default.int_param1", "1",
    "conf.default.double_param0", "0.11",
    "conf.default.double_param1", "9.9",
    "conf.default.str_param0", "hoge",
    "conf.default.str_param1", "dara",
    "conf.default.vector_param0", "0.0,1.0,2.0,3.0,4.0",
    "",
]

# Component handle assigned by TestCompInit(); None until that has run.
com = None
51 
def __init__(self, manager):
    """Initialise the component by delegating to DataFlowComponentBase.

    The name must be spelled ``__init__`` for Python to run it on
    construction. With a single trailing underscore the method is an
    ordinary, name-mangled private method that nothing invokes.

    manager -- forwarded unchanged to the base-class initialiser.
    """
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
55 
56 
def TestCompInit(manager):
    """Register the TestComp factory with *manager* and create one instance.

    The created component is stored in the module-level ``com``. This
    function's name is passed as the init-function argument of a
    ``load_module`` call elsewhere in this file.

    manager -- object providing registerFactory() and createComponent().
    """
    global com
    print("TestCompInit")
    factory_profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
    manager.registerFactory(factory_profile, TestComp, OpenRTM_aist.Delete)
    com = manager.createComponent("TestComp")
65 
def TestEcInit(manager):
    """Register an EC factory named "Art" and request a "TestEc" component.

    The result of registerECFactory() is printed; the result of
    createComponent() is discarded.

    manager -- object providing registerECFactory() and createComponent().
    """
    # Constructed exactly as in TestCompInit, but nothing below reads it.
    unused_profile = OpenRTM_aist.Properties(defaults_str=configsample_spec)
    # NOTE(review): the component class TestComp is handed over as the EC
    # class here -- confirm that this is intended.
    registration_result = manager.registerECFactory(
        "Art", TestComp, OpenRTM_aist.Delete)
    print(registration_result)
    manager.createComponent("TestEc")
72 
73 
74 class TestManagerServant(unittest.TestCase):
75 
76  def setUp(self):
78 
def tearDown(self):
    """Dispose of the servant under test, then shut the manager down."""
    # Finalise explicitly, then drop the attribute. The servant is not
    # bound to a local name so that no extra reference outlives the `del`.
    self.managerservant.__del__()
    del self.managerservant

    # Manager.shutdownManager() was used here at some point; shutdown()
    # is the call that actually runs.
    manager = OpenRTM_aist.Manager.instance()
    manager.shutdown()

    # Short pause after shutdown before the next test starts.
    time.sleep(0.1)
86 
87  """
88  def test_terminate(self):
89  self.managerservant.terminate()
90 
91  def test_shutdown(self):
92  self.managerservant.runManager(True)
93  import time
94  time.sleep(0.1)
95  self.managerservant.shutdown()
96  #self.managerservant.runManager()
97  """
98 
def test_load_unload(self):
    """Load module "hoge", check both module lists are non-empty, unload it."""
    servant = self.managerservant
    servant.load_module("hoge", "echo")

    loaded = servant.get_loaded_modules()
    self.assertNotEqual(len(loaded), 0)

    loadable = servant.get_loadable_modules()
    self.assertNotEqual(len(loadable), 0)

    servant.unload_module("hoge")
105 
107  self.assertEqual(self.managerservant.get_loaded_modules(),[])
108  return
109 
111  self.assertNotEqual(self.managerservant.get_loadable_modules(),[])
112  return
113 
114 
116  self.managerservant.get_factory_profiles()
117  return
118 
120  mgr=OpenRTM_aist.Manager.init(sys.argv)
121  mgr.activateManager()
122  self.managerservant.load_module("test_ManagerServant", "TestCompInit")
123  com = self.managerservant.create_component("TestComp")
124  self.managerservant.unload_module("test_ManagerServant")
125  self.assertNotEqual(com,None)
126  self.managerservant.delete_component("TestComp")
127  mgr.shutdownComponents()
128  return
129 
130 
132  self.assertEqual(self.managerservant.get_components(),[])
133  return
134 
135 
137  self.assertEqual(self.managerservant.get_component_profiles(),[])
138  return
139 
140 
def test_get_profile(self):
    """Smoke test: get_profile() must be callable; its result is ignored."""
    servant = self.managerservant
    servant.get_profile()
144 
146  self.managerservant.get_configuration()
147  return
148 
150  self.assertEqual(self.managerservant.set_configuration("test_name", "test_value"),RTC.RTC_OK)
151  return
152 
153 
def test_get_service(self):
    """get_service("test") is expected to equal the nil CORBA object."""
    service_ref = self.managerservant.get_service("test")
    self.assertEqual(service_ref, CORBA.Object._nil)
157 
158 
def test_getObjRef(self):
    """Check createINSManager() on repeated calls, then getObjRef().

    If the first createINSManager() call returns a true value, a second
    call must return False. Otherwise the object registered under the
    configured "manager.name" is deactivated on the "omniINSPOA" POA and
    a second call must return True. In both cases getObjRef() must not
    equal the nil CORBA object afterwards.
    """
    servant = self.managerservant
    if servant.createINSManager():
        self.assertEqual(servant.createINSManager(), False)
    else:
        orb = OpenRTM_aist.Manager.instance().getORB()
        ins_poa = orb.resolve_initial_references("omniINSPOA")
        manager_name = (OpenRTM_aist.Manager.instance()
                        .getConfig().getProperty("manager.name"))
        ins_poa.deactivate_object(manager_name)
        self.assertEqual(servant.createINSManager(), True)

    object_ref = servant.getObjRef()
    self.assertNotEqual(object_ref, CORBA.Object._nil)
168 
def test_is_master(self):
    """The servant under test is expected to report is_master() == False."""
    master_flag = self.managerservant.is_master()
    self.assertEqual(master_flag, False)
172 
174  self.assertEqual(len(self.managerservant.get_master_managers()),0)
175  return
176 
178  if not self.managerservant.createINSManager():
179  poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references("omniINSPOA")
180  poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty("manager.name"))
181  self.assertEqual(self.managerservant.createINSManager(),True)
182  else:
183  self.assertEqual(self.managerservant.createINSManager(),False)
184  self.assertEqual(len(self.managerservant.get_master_managers()),0)
185  host_port = "localhost:2810"
186  owner = self.managerservant.findManager(host_port)
187  self.assertEqual(self.managerservant.add_master_manager(owner),RTC.RTC_OK)
188  self.assertEqual(len(self.managerservant.get_master_managers()),1)
189  self.assertEqual(self.managerservant.remove_master_manager(owner),RTC.RTC_OK)
190  self.assertEqual(len(self.managerservant.get_master_managers()),0)
191 
192  return
193 
194 
196  if not self.managerservant.createINSManager():
197  poa=OpenRTM_aist.Manager.instance().getORB().resolve_initial_references("omniINSPOA")
198  poa.deactivate_object(OpenRTM_aist.Manager.instance().getConfig().getProperty("manager.name"))
199  self.assertEqual(self.managerservant.createINSManager(),True)
200  else:
201  self.assertEqual(self.managerservant.createINSManager(),False)
202  self.assertEqual(len(self.managerservant.get_slave_managers()),0)
203  host_port = "localhost:2810"
204  owner = self.managerservant.findManager(host_port)
205  self.assertEqual(self.managerservant.add_slave_manager(owner),RTC.RTC_OK)
206  self.assertEqual(len(self.managerservant.get_slave_managers()),1)
207  self.assertEqual(self.managerservant.remove_slave_manager(owner),RTC.RTC_OK)
208  self.assertEqual(len(self.managerservant.get_slave_managers()),0)
209  return
210 
211 
212 """
213  def test_forck(self):
214  self.assertEqual(self.managerservant.fork(),RTC.RTC_OK)
215  return
216 
217  def test_shutdown(self):
218  #self.assertEqual(self.managerservant.shutdown(),RTC.RTC_OK)
219  return
220 
221  def test_restart(self):
222  self.assertEqual(self.managerservant.restart(),RTC.RTC_OK)
223  return
224 
225 """
226 
227 
228 
229 
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
The Properties class represents a persistent set of properties.
Definition: Properties.py:83


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07