# test_CorbaPort.py
# (Doxygen source-listing header: "Go to the documentation of this file.")
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_CorbaPort.py
6 # \brief test for CorbaPort class
7 # \date $Date: 2007/09/27 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Noriaki Ando
12 # Task-intelligence Research Group,
13 # Intelligent Systems Research Institute,
14 # National Institute of
15 # Advanced Industrial Science and Technology (AIST), Japan
16 # All rights reserved.
17 
18 from omniORB import any
19 import CORBA
20 
21 import OpenRTM_aist
22 import RTC, RTC__POA
23 
24 import sys
25 sys.path.insert(1,"../")
26 
27 import unittest
28 
29 from CorbaPort import *
30 
31 import _GlobalIDL, _GlobalIDL__POA
32 
33 
class MyService_impl(_GlobalIDL__POA.MyService):
    """Servant implementing the ``MyService`` interface for these tests.

    An instance is handed to ``CorbaPort.registerProvider`` by the test case
    in this file.
    """

    def __init__(self):
        # No per-instance state is set up; note that the base-class
        # initializer is not invoked here.
        pass

    def echo(self, msg):
        """Print *msg* to standard output and return it unchanged.

        Args:
            msg: The value to echo.

        Returns:
            The same object that was passed in.
        """
        # Function-call form prints the same text on Python 2 (parenthesised
        # single expression) and also parses on Python 3.
        print(msg)
        return msg
41 
42 
class TestCorbaPort(unittest.TestCase):
    """Exercise CorbaPort's registration, activation and connection calls.

    NOTE(review): the listing this class was recovered from had lost some
    lines: two fixture assignments in ``setUp``, two lines in ``test_init``
    and eight ``def`` headers.  Everything marked "reconstructed" below was
    inferred from the surrounding calls and must be confirmed against the
    upstream OpenRTM-aist-Python sources.
    """

    def setUp(self):
        """Activate the root POA manager and create the two ports under test."""
        orb = CORBA.ORB_init(sys.argv)
        root_poa = orb.resolve_initial_references("RootPOA")
        root_poa._get_the_POAManager().activate()

        # Reconstructed: the tests read self._mysvc (passed to
        # registerProvider) and self._mycon (passed to registerConsumer), but
        # the two lines assigning them were missing.  The concrete types are
        # an assumption -- TODO confirm.
        self._mysvc = MyService_impl()
        self._mycon = OpenRTM_aist.CorbaConsumer()
        # NOTE: the original keeps the explicit activation commented out:
        #self._mysvc._this()

        self._cpSvc = CorbaPort("MyService")
        self._cpCon = CorbaPort("MyService")

    def tearDown(self):
        """Shut the OpenRTM manager singleton down after each test."""
        OpenRTM_aist.Manager.instance().shutdownManager()

    def test_init(self):
        """Call init() twice, the first time with connection_limit set to "3"."""
        # Reconstructed: `prop` was used without a visible definition.
        prop = OpenRTM_aist.Properties()
        prop.setProperty("connection_limit", "3")
        self._cpSvc.init(prop)
        # Reconstructed: one line is missing between the two init() calls;
        # presumably it re-creates or changes `prop` -- TODO confirm.
        prop = OpenRTM_aist.Properties()
        self._cpSvc.init(prop)

    # Reconstructed header (name inferred from the call under test).
    def test_registerProvider(self):
        """registerProvider() returns True; the port then (de)activates."""
        self.assertEqual(
            self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),
            True)
        self._cpSvc.activateInterfaces()
        self._cpSvc.deactivateInterfaces()

    # Reconstructed header (name inferred from the call under test).
    def test_registerConsumer(self):
        """registerConsumer() returns True for a new consumer."""
        self.assertEqual(
            self._cpCon.registerConsumer("myservice0", "MyService", self._mycon),
            True)

    # Reconstructed header (name inferred from the calls under test).
    def test_activateInterfaces(self):
        """Interfaces can be activated and deactivated twice in a row."""
        self.assertEqual(
            self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),
            True)
        self._cpSvc.activateInterfaces()
        self._cpSvc.deactivateInterfaces()
        self._cpSvc.activateInterfaces()
        self._cpSvc.deactivateInterfaces()

    # Reconstructed header (name inferred from the calls under test).
    def test_deactivateInterfaces(self):
        """A registered provider can be activated and then deactivated."""
        self.assertEqual(
            self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),
            True)
        self._cpSvc.activateInterfaces()
        self._cpSvc.deactivateInterfaces()

    # Reconstructed header (name inferred from the call under test).
    def test_publishInterfaces(self):
        """publishInterfaces() on an empty profile returns RTC_OK."""
        prof = RTC.ConnectorProfile("", "", [], [])
        self.assertEqual(self._cpSvc.publishInterfaces(prof), RTC.RTC_OK)

    # Reconstructed header (name inferred from the call under test).
    def test_subscribeInterfaces(self):
        """subscribeInterfaces() returns RTC_OK for a profile with both ports."""
        self.assertEqual(
            self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),
            True)
        self.assertEqual(
            self._cpCon.registerConsumer("myservice0", "MyService", self._mycon),
            True)
        port_refs = [self._cpSvc.getPortRef(), self._cpCon.getPortRef()]
        prof = RTC.ConnectorProfile("", "", port_refs, [])
        self.assertEqual(self._cpSvc.subscribeInterfaces(prof), RTC.RTC_OK)

    # Reconstructed header (name inferred from the call under test).
    def test_unsubscribeInterfaces(self):
        """unsubscribeInterfaces() is called on an empty profile; no assertion."""
        prof = RTC.ConnectorProfile("", "", [], [])
        self._cpSvc.unsubscribeInterfaces(prof)

    def test_findProvider(self):
        """findProvider() returns False when given an empty first argument."""
        self.assertEqual(
            self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),
            True)
        consHolder = CorbaPort.CorbaConsumerHolder(
            "myservice0", "MyService", self._mycon, self._cpCon)
        ior = []
        self.assertEqual(self._cpSvc.findProvider([], consHolder, ior), False)

    # Reconstructed header (name inferred from the call under test).
    def test_findProviderOld(self):
        """findProviderOld() returns False when given an empty first argument."""
        self.assertEqual(
            self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),
            True)
        consHolder = CorbaPort.CorbaConsumerHolder(
            "myservice0", "MyService", self._mycon, self._cpCon)
        ior = []
        self.assertEqual(self._cpSvc.findProviderOld([], consHolder, ior), False)
123 
124 
# ---- script entry point: run the test cases defined in this module ----
if __name__ == "__main__":
    unittest.main()
# Doxygen cross-reference: OpenRTM_aist.Properties -- "The Properties class
# represents a persistent set of properties." (defined at Properties.py:83)


# Package: openrtm_aist_python
# Author(s): Shinji Kurihara
# Listing autogenerated on Thu Jun 6 2019 19:11:34