Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022 import OpenRTM_aist
00023 from PortAdmin import *
00024
00025 import RTC, RTC__POA
00026 import OpenRTM
00027
00028 from omniORB import CORBA
00029
class PortBase(OpenRTM_aist.PortBase):
    """Stub port used by the PortAdmin tests.

    The three ``*Interfaces(prof)`` hooks ignore their argument and always
    return ``RTC.RTC_OK``.  The activate/deactivate hooks only print their
    own name and return ``None``.
    """

    def __init__(self):
        # Explicit base-class call, no extra state is added by this stub.
        OpenRTM_aist.PortBase.__init__(self)

    def publishInterfaces(self, prof):
        """Return ``RTC.RTC_OK`` without inspecting ``prof``."""
        return RTC.RTC_OK

    def subscribeInterfaces(self, prof):
        """Return ``RTC.RTC_OK`` without inspecting ``prof``."""
        return RTC.RTC_OK

    def unsubscribeInterfaces(self, prof):
        """Return ``RTC.RTC_OK`` without inspecting ``prof``."""
        return RTC.RTC_OK

    def activateInterfaces(self):
        """Print the hook name so a test run shows that it was invoked."""
        # A parenthesised single argument prints the same text with the
        # Python 2 print statement and with the Python 3 print function,
        # so the module no longer fails to compile on Python 3.
        print("activateInterfaces")

    def deactivateInterfaces(self):
        """Print the hook name so a test run shows that it was invoked."""
        print("deactivateInterfaces")
00049
00050
class TestPortAdmin(unittest.TestCase):
    """Exercise PortAdmin with two registered stub ports.

    Methods named ``CCCtest_*`` do not start with unittest's default
    ``test`` prefix, so the default loader does not pick them up.
    """

    def setUp(self):
        # Bring up the ORB and activate the root POA's manager.
        self._orb = CORBA.ORB_init(sys.argv)
        self._poa = self._orb.resolve_initial_references("RootPOA")
        self._poa._get_the_POAManager().activate()

        self._pa = PortAdmin(self._orb, self._poa)

        # Two stub ports, named "port0" and "port1", registered in that order.
        self._pb1, self._pb2 = PortBase(), PortBase()
        for port, name in ((self._pb1, "port0"), (self._pb2, "port1")):
            port.setName(name)
        for port in (self._pb1, self._pb2):
            self._pa.registerPort(port)

    def tearDown(self):
        manager = OpenRTM_aist.Manager.instance()
        manager.shutdownManager()

    def test_getPortServiceList(self):
        # Entries are expected at index 0 and 1 with the names given in setUp.
        services = self._pa.getPortServiceList()
        for index, expected in enumerate(("port0", "port1")):
            profile = services[index].get_port_profile()
            self.assertEqual(profile.name, expected)

    def test_getPortProfileList(self):
        # Only checks that the call completes; the result is not inspected.
        self._pa.getPortProfileList()

    def CCCtest_getPortRef(self):
        # An empty name is expected to yield a nil reference.
        nil_ref = self._pa.getPortRef("")
        self.assertEqual(CORBA.is_nil(nil_ref), True)

        # Registered names are looked up in the order "port1", then "port0".
        for name in ("port1", "port0"):
            ref = self._pa.getPortRef(name)
            self.assertEqual(CORBA.is_nil(ref), False)
            self.assertEqual(ref.get_port_profile().name, name)

    def CCCtest_getPort(self):
        for name in ("port0", "port1"):
            port = self._pa.getPort(name)
            profile = port.get_port_profile()
            self.assertEqual(profile.name, name)

    def CCCtest_deletePort(self):
        # After deleting the first port only "port1" should be listed.
        self._pa.deletePort(self._pb1)
        remaining = self._pa.getPortServiceList()
        self.assertEqual(len(remaining), 1)

        profile = remaining[0].get_port_profile()
        self.assertEqual(profile.name, "port1")

    def CCCtest_deletePortByName(self):
        before = self._pa.getPortServiceList()
        self.assertEqual(len(before), 2)

        self._pa.deletePortByName("port1")

        after = self._pa.getPortServiceList()
        self.assertEqual(len(after), 1)

    def CCCtest_activatePorts(self):
        # Only checks that both calls complete.
        self._pa.activatePorts()
        self._pa.deactivatePorts()

    def CCCtest_finalizePorts(self):
        before = self._pa.getPortServiceList()
        self.assertEqual(len(before), 2)

        self._pa.finalizePorts()

        after = self._pa.getPortServiceList()
        self.assertEqual(len(after), 0)
00151
00152
00153
00154
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
00157