00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 from omniORB import any
00019 import CORBA
00020
00021 import OpenRTM_aist
00022 import RTC, RTC__POA
00023
00024 import sys
00025 sys.path.insert(1,"../")
00026
00027 import unittest
00028
00029 from CorbaPort import *
00030
00031 import _GlobalIDL, _GlobalIDL__POA
00032
00033
class MyService_impl(_GlobalIDL__POA.MyService):
    """Minimal servant used as the provider side in the CorbaPort tests.

    Derives from the ``_GlobalIDL__POA.MyService`` skeleton and implements
    its ``echo`` operation.
    """

    def __init__(self):
        # Intentionally empty: the servant keeps no state of its own.
        pass

    def echo(self, msg):
        """Print *msg* to standard output and return it unchanged."""
        # Parenthesised form prints the same text as the Python 2 statement
        # ``print msg`` and is also valid syntax on Python 3.
        print(msg)
        return msg
00041
00042
class TestCorbaPort(unittest.TestCase):
    """Unit tests exercising the CorbaPort class.

    Each test works on two freshly created ports named "MyService":
    ``self._cpSvc`` (used as the provider side) and ``self._cpCon`` (used as
    the consumer side).
    """

    def setUp(self):
        """Initialise the ORB, activate the root POA manager, build fixtures."""
        orb = CORBA.ORB_init(sys.argv)
        root_poa = orb.resolve_initial_references("RootPOA")
        root_poa._get_the_POAManager().activate()

        # Provider servant and consumer object handed to the ports under test.
        self._mysvc = MyService_impl()
        self._mycon = OpenRTM_aist.CorbaConsumer()

        self._cpSvc = CorbaPort("MyService")
        self._cpCon = CorbaPort("MyService")

    def tearDown(self):
        """Shut the OpenRTM manager down after every test."""
        OpenRTM_aist.Manager.instance().shutdownManager()

    # -- private helpers ---------------------------------------------------

    def _register_provider(self):
        """Register the test servant on the provider port; expect True."""
        registered = self._cpSvc.registerProvider(
            "myservice0", "MyService", self._mysvc)
        self.assertEqual(registered, True)

    def _register_consumer(self):
        """Register the test consumer on the consumer port; expect True."""
        registered = self._cpCon.registerConsumer(
            "myservice0", "MyService", self._mycon)
        self.assertEqual(registered, True)

    def _activate_then_deactivate(self, cycles=1):
        """Run activateInterfaces()/deactivateInterfaces() *cycles* times."""
        for _ in range(cycles):
            self._cpSvc.activateInterfaces()
            self._cpSvc.deactivateInterfaces()

    # -- tests -------------------------------------------------------------

    def test_init(self):
        """init() is called with a connection limit, then with empty props."""
        limited = OpenRTM_aist.Properties()
        limited.setProperty("connection_limit", "3")
        self._cpSvc.init(limited)

        self._cpSvc.init(OpenRTM_aist.Properties())

    def test_registerProvider(self):
        """registerProvider() returns True; one activate/deactivate cycle."""
        self._register_provider()
        self._activate_then_deactivate()

    def test_registerConsumer(self):
        """registerConsumer() returns True."""
        self._register_consumer()

    def test_activateInterfaces(self):
        """Interfaces are activated and deactivated twice in a row."""
        self._register_provider()
        self._activate_then_deactivate(cycles=2)

    def test_deactivateInterfaces(self):
        """Interfaces are deactivated after a single activation."""
        self._register_provider()
        self._activate_then_deactivate()

    def test_publishInterfaces(self):
        """publishInterfaces() on an empty profile returns RTC.RTC_OK."""
        empty_profile = RTC.ConnectorProfile("", "", [], [])
        result = self._cpSvc.publishInterfaces(empty_profile)
        self.assertEqual(result, RTC.RTC_OK)

    def test_subscribeInterfaces(self):
        """subscribeInterfaces() on a two-port profile returns RTC.RTC_OK."""
        self._register_provider()
        self._register_consumer()

        port_refs = [self._cpSvc.getPortRef(), self._cpCon.getPortRef()]
        profile = RTC.ConnectorProfile("", "", port_refs, [])
        self.assertEqual(self._cpSvc.subscribeInterfaces(profile), RTC.RTC_OK)

    def test_unsubscribeInterfaces(self):
        """unsubscribeInterfaces() is called with an empty profile."""
        empty_profile = RTC.ConnectorProfile("", "", [], [])
        self._cpSvc.unsubscribeInterfaces(empty_profile)

    def test_findProvider(self):
        """findProvider() with an empty first argument returns False."""
        self._register_provider()
        holder = CorbaPort.CorbaConsumerHolder(
            "myservice0", "MyService", self._mycon, self._cpCon)
        ior_out = []
        found = self._cpSvc.findProvider([], holder, ior_out)
        self.assertEqual(found, False)

    def test_findProviderOld(self):
        """findProviderOld() with an empty first argument returns False."""
        self._register_provider()
        holder = CorbaPort.CorbaConsumerHolder(
            "myservice0", "MyService", self._mycon, self._cpCon)
        ior_out = []
        found = self._cpSvc.findProviderOld([], holder, ior_out)
        self.assertEqual(found, False)
00122
00123
00124
00125
# Run every test case in this module when the file is executed as a script.
if "__main__" == __name__:
    unittest.main()