19 sys.path.insert(1,
"../")
23 from PortAdmin
import *
28 from omniORB
import CORBA
32 OpenRTM_aist.PortBase.__init__(self)
44 print "activateInterfaces" 48 print "deactivateInterfaces" 53 self.
_orb = CORBA.ORB_init(sys.argv)
54 self.
_poa = self._orb.resolve_initial_references(
"RootPOA")
55 self._poa._get_the_POAManager().activate()
62 self._pb1.setName(
"port0")
63 self._pb2.setName(
"port1")
65 self._pa.registerPort(self.
_pb1)
66 self._pa.registerPort(self.
_pb2)
70 OpenRTM_aist.Manager.instance().shutdownManager()
74 plist = self._pa.getPortServiceList()
76 prof0 = plist[0].get_port_profile()
77 self.assertEqual(prof0.name,
"port0")
79 prof1 = plist[1].get_port_profile()
80 self.assertEqual(prof1.name,
"port1")
85 pprof = self._pa.getPortProfileList()
91 getP = self._pa.getPortRef(
"")
92 self.assertEqual(CORBA.is_nil(getP),
True)
94 getP = self._pa.getPortRef(
"port1")
95 self.assertEqual(CORBA.is_nil(getP),
False)
96 self.assertEqual(getP.get_port_profile().name,
"port1")
98 getP = self._pa.getPortRef(
"port0")
99 self.assertEqual(CORBA.is_nil(getP),
False)
100 self.assertEqual(getP.get_port_profile().name,
"port0")
105 pb = self._pa.getPort(
"port0")
106 prof = pb.get_port_profile()
107 self.assertEqual(prof.name,
"port0")
109 pb = self._pa.getPort(
"port1")
110 prof = pb.get_port_profile()
111 self.assertEqual(prof.name,
"port1")
116 self._pa.deletePort(self.
_pb1)
117 plist = self._pa.getPortServiceList()
118 self.assertEqual(len(plist), 1)
120 prof = plist[0].get_port_profile()
121 self.assertEqual(prof.name,
"port1")
126 plist = self._pa.getPortServiceList()
127 self.assertEqual(len(plist), 2)
129 self._pa.deletePortByName(
"port1")
131 plist = self._pa.getPortServiceList()
132 self.assertEqual(len(plist), 1)
137 self._pa.activatePorts()
138 self._pa.deactivatePorts()
143 plist = self._pa.getPortServiceList()
144 self.assertEqual(len(plist), 2)
146 self._pa.finalizePorts()
148 plist = self._pa.getPortServiceList()
149 self.assertEqual(len(plist), 0)
155 if __name__ ==
'__main__':
def test_getPortServiceList(self)
def CCCtest_deletePort(self)
def CCCtest_deletePortByName(self)
def subscribeInterfaces(self, prof)
def CCCtest_getPortRef(self)
def unsubscribeInterfaces(self, prof)
def activateInterfaces(self)
def CCCtest_getPort(self)
def deactivateInterfaces(self)
def CCCtest_finalizePorts(self)
def CCCtest_activatePorts(self)
def publishInterfaces(self, prof)
def test_getPortProfileList(self)