test_CorbaPort.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- Python -*-
3 
4 #
5 # \file test_CorbaPort.py
6 # \brief test for CorbaPort class
7 # \date $Date: 2007/09/27 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Noriaki Ando
12 # Task-intelligence Research Group,
13 # Intelligent Systems Research Institute,
14 # National Institute of
15 # Advanced Industrial Science and Technology (AIST), Japan
16 # All rights reserved.
17 
18 from omniORB import any
19 import CORBA
20 
21 import OpenRTM_aist
22 import RTC, RTC__POA
23 
24 import sys
25 sys.path.insert(1,"../")
26 
27 import unittest
28 
29 from CorbaPort import *
30 
31 import _GlobalIDL, _GlobalIDL__POA
32 
33 
34 class MyService_impl(_GlobalIDL__POA.MyService):
35  def __init__(self):
36  pass
37 
38  def echo(self, msg):
39  print(msg)
40  return msg
41 
42 
43 class TestCorbaPort(unittest.TestCase):
44  def setUp(self):
45  orb = CORBA.ORB_init(sys.argv)
46  poa = orb.resolve_initial_references("RootPOA")
47  poa._get_the_POAManager().activate()
48 
51  #self._mysvc._this()
52 
53  self._cpSvc = CorbaPort("MyService")
54  self._cpCon = CorbaPort("MyService")
55 
56  def tearDown(self):
57  OpenRTM_aist.Manager.instance().shutdownManager()
58  return
59 
60  def test_init(self):
62  prop.setProperty("connection_limit","3")
63  self._cpSvc.init(prop)
65  self._cpSvc.init(prop)
66  pass
67 
68 
70  self.assertEqual(self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),True)
71  self._cpSvc.activateInterfaces()
72  self._cpSvc.deactivateInterfaces()
73 
74 
76  self.assertEqual(self._cpCon.registerConsumer("myservice0", "MyService", self._mycon),True)
77 
78 
80  self.assertEqual(self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),True)
81  self._cpSvc.activateInterfaces()
82  self._cpSvc.deactivateInterfaces()
83  self._cpSvc.activateInterfaces()
84  self._cpSvc.deactivateInterfaces()
85 
86 
88  self.assertEqual(self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),True)
89  self._cpSvc.activateInterfaces()
90  self._cpSvc.deactivateInterfaces()
91 
92 
94  prof = RTC.ConnectorProfile("","",[],[])
95  self.assertEqual(self._cpSvc.publishInterfaces(prof),RTC.RTC_OK)
96 
97 
99  self.assertEqual(self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),True)
100  self.assertEqual(self._cpCon.registerConsumer("myservice0", "MyService", self._mycon),True)
101  prof = RTC.ConnectorProfile("","",
102  [self._cpSvc.getPortRef(),self._cpCon.getPortRef()],
103  [])
104  self.assertEqual(self._cpSvc.subscribeInterfaces(prof),RTC.RTC_OK)
105 
106 
108  prof = RTC.ConnectorProfile("","",[],[])
109  self._cpSvc.unsubscribeInterfaces(prof)
110 
111  def test_findProvider(self):
112  self.assertEqual(self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),True)
113  consHolder = CorbaPort.CorbaConsumerHolder("myservice0","MyService",self._mycon,self._cpCon)
114  ior=[]
115  self.assertEqual(self._cpSvc.findProvider([],consHolder,ior),False)
116 
118  self.assertEqual(self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc),True)
119  consHolder = CorbaPort.CorbaConsumerHolder("myservice0","MyService",self._mycon,self._cpCon)
120  ior=[]
121  self.assertEqual(self._cpSvc.findProviderOld([],consHolder,ior),False)
122 
123 
124 
125 
126 if __name__ == '__main__':
127  unittest.main()
test_CorbaPort.TestCorbaPort
Definition: test_CorbaPort.py:43
test_CorbaPort.MyService_impl.echo
def echo(self, msg)
Definition: test_CorbaPort.py:38
test_CorbaPort.TestCorbaPort.test_findProviderOld
def test_findProviderOld(self)
Definition: test_CorbaPort.py:117
test_CorbaPort.TestCorbaPort._mycon
_mycon
Definition: test_CorbaPort.py:50
test_CorbaPort.TestCorbaPort.test_registerConsumer
def test_registerConsumer(self)
Definition: test_CorbaPort.py:75
test_CorbaPort.TestCorbaPort.tearDown
def tearDown(self)
Definition: test_CorbaPort.py:56
test_CorbaPort.MyService_impl.__init__
def __init__(self)
Definition: test_CorbaPort.py:35
test_CorbaPort.TestCorbaPort.setUp
def setUp(self)
Definition: test_CorbaPort.py:44
test_CorbaPort.TestCorbaPort.test_publishInterfaces
def test_publishInterfaces(self)
Definition: test_CorbaPort.py:93
CorbaPort
test_CorbaPort.TestCorbaPort.test_activateInterfaces
def test_activateInterfaces(self)
Definition: test_CorbaPort.py:79
test_CorbaPort.TestCorbaPort._mysvc
_mysvc
Definition: test_CorbaPort.py:49
test_CorbaPort.TestCorbaPort.test_init
def test_init(self)
Definition: test_CorbaPort.py:60
test_CorbaPort.TestCorbaPort._cpSvc
_cpSvc
Definition: test_CorbaPort.py:53
test_CorbaPort.TestCorbaPort.test_unsubscribeInterfaces
def test_unsubscribeInterfaces(self)
Definition: test_CorbaPort.py:107
OpenRTM_aist.CorbaConsumer.CorbaConsumer
Definition: CorbaConsumer.py:187
test_CorbaPort.TestCorbaPort._cpCon
_cpCon
Definition: test_CorbaPort.py:54
test_CorbaPort.TestCorbaPort.test_findProvider
def test_findProvider(self)
Definition: test_CorbaPort.py:111
test_CorbaPort.MyService_impl
Definition: test_CorbaPort.py:34
OpenRTM_aist.Properties.Properties
Definition: Properties.py:83
test_CorbaPort.TestCorbaPort.test_registerProvider
def test_registerProvider(self)
Definition: test_CorbaPort.py:69
test_CorbaPort.TestCorbaPort.test_deactivateInterfaces
def test_deactivateInterfaces(self)
Definition: test_CorbaPort.py:87
test_CorbaPort.TestCorbaPort.test_subscribeInterfaces
def test_subscribeInterfaces(self)
Definition: test_CorbaPort.py:98


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Apr 21 2025 02:45:06