test_CorbaPort.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 #  \file  test_CorbaPort.py
00006 #  \brief test for CorbaPort class
00007 #  \date  $Date: 2007/09/27 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Noriaki Ando
00012 #      Task-intelligence Research Group,
00013 #      Intelligent Systems Research Institute,
00014 #      National Institute of
00015 #          Advanced Industrial Science and Technology (AIST), Japan
00016 #      All rights reserved.
00017 
00018 from omniORB import any
00019 import CORBA
00020 
00021 import OpenRTM_aist
00022 import RTC, RTC__POA
00023 
00024 import sys
00025 sys.path.insert(1,"../")
00026 
00027 import unittest
00028 
00029 from CorbaPort import *
00030 
00031 import _GlobalIDL, _GlobalIDL__POA
00032 
00033 
class MyService_impl(_GlobalIDL__POA.MyService):
  # Minimal servant for the MyService IDL interface; the tests register it
  # as the provider object on a CorbaPort.

  def __init__(self):
    # Intentionally empty: the skeleton base initialiser is not invoked.
    pass

  def echo(self, msg):
    # Print the received message and hand it back to the caller unchanged.
    # The parenthesised single-argument form prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print(msg)
    return msg
00041 
00042   
class TestCorbaPort(unittest.TestCase):
  """Unit tests for CorbaPort.

  Covers init, provider/consumer registration, interface activation and
  deactivation, the publish/subscribe/unsubscribe calls, and provider lookup.
  """

  def setUp(self):
    # Initialise the ORB and activate the root POA's manager, then build the
    # servant, the consumer and the two ports used by every test.
    orb = CORBA.ORB_init(sys.argv)
    root_poa = orb.resolve_initial_references("RootPOA")
    poa_manager = root_poa._get_the_POAManager()
    poa_manager.activate()

    self._mysvc = MyService_impl()
    self._mycon = OpenRTM_aist.CorbaConsumer()
    #self._mysvc._this()

    self._cpSvc = CorbaPort("MyService")
    self._cpCon = CorbaPort("MyService")

  def tearDown(self):
    # Shut the manager singleton down after each test.
    manager = OpenRTM_aist.Manager.instance()
    manager.shutdownManager()

  # ---- private helpers (not collected by unittest) ----

  def _expectProviderRegistered(self):
    # Register the servant on the service-side port; the call must return True.
    registered = self._cpSvc.registerProvider("myservice0", "MyService", self._mysvc)
    self.assertEqual(registered, True)

  def _expectConsumerRegistered(self):
    # Register the consumer on the consumer-side port; the call must return True.
    registered = self._cpCon.registerConsumer("myservice0", "MyService", self._mycon)
    self.assertEqual(registered, True)

  def _activateThenDeactivate(self):
    # One activate/deactivate round trip on the service-side port.
    self._cpSvc.activateInterfaces()
    self._cpSvc.deactivateInterfaces()

  def _newConsumerHolder(self):
    # Holder describing the consumer, as passed to the provider lookups.
    return CorbaPort.CorbaConsumerHolder("myservice0", "MyService", self._mycon, self._cpCon)

  # ---- tests ----

  def test_init(self):
    # init() is called once with a connection limit set and once with
    # empty properties.
    limited = OpenRTM_aist.Properties()
    limited.setProperty("connection_limit", "3")
    self._cpSvc.init(limited)
    self._cpSvc.init(OpenRTM_aist.Properties())

  def test_registerProvider(self):
    self._expectProviderRegistered()
    self._activateThenDeactivate()

  def test_registerConsumer(self):
    self._expectConsumerRegistered()

  def test_activateInterfaces(self):
    # Two consecutive activate/deactivate cycles on the same port.
    self._expectProviderRegistered()
    self._activateThenDeactivate()
    self._activateThenDeactivate()

  def test_deactivateInterfaces(self):
    self._expectProviderRegistered()
    self._activateThenDeactivate()

  def test_publishInterfaces(self):
    # Publishing with an empty connector profile must report RTC_OK.
    profile = RTC.ConnectorProfile("", "", [], [])
    result = self._cpSvc.publishInterfaces(profile)
    self.assertEqual(result, RTC.RTC_OK)

  def test_subscribeInterfaces(self):
    # Subscribing with a profile that lists both port references must
    # report RTC_OK.
    self._expectProviderRegistered()
    self._expectConsumerRegistered()
    port_refs = [self._cpSvc.getPortRef(), self._cpCon.getPortRef()]
    profile = RTC.ConnectorProfile("", "", port_refs, [])
    result = self._cpSvc.subscribeInterfaces(profile)
    self.assertEqual(result, RTC.RTC_OK)

  def test_unsubscribeInterfaces(self):
    # Only checks that the call completes; no return value is asserted.
    profile = RTC.ConnectorProfile("", "", [], [])
    self._cpSvc.unsubscribeInterfaces(profile)

  def test_findProvider(self):
    # With an empty NVList the lookup is expected to return False.
    self._expectProviderRegistered()
    holder = self._newConsumerHolder()
    ior_out = []
    found = self._cpSvc.findProvider([], holder, ior_out)
    self.assertEqual(found, False)

  def test_findProviderOld(self):
    # Same expectation for the old-style lookup.
    self._expectProviderRegistered()
    holder = self._newConsumerHolder()
    ior_out = []
    found = self._cpSvc.findProviderOld([], holder, ior_out)
    self.assertEqual(found, False)
00122 
00123 
00124 
# Script entry point: run every TestCase in this module when the file is
# executed directly (nothing happens on import).
if __name__ == '__main__':
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28