19 sys.path.insert(1,
"../")
22 from PortBase
import *
31 OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
35 OpenRTM_aist.DataFlowComponentBase.__init__(self, OpenRTM_aist.Manager.instance())
41 OpenRTM_aist.DataFlowComponentBase.__init__(self, OpenRTM_aist.Manager.instance())
47 OpenRTM_aist.Manager.init(sys.argv)
48 OpenRTM_aist.Manager.instance().activateManager()
53 OpenRTM_aist.Manager.instance().shutdownManager()
57 self.
_pb.setName(
"test_connect")
58 prof = self.
_pb.get_port_profile()
59 self.assertEqual(prof.name,
"test_connect")
63 self.
_pb.setName(
"test_connect")
64 prof = self.
_pb.getPortProfile()
65 self.assertEqual(prof.name,
"test_connect")
69 OpenRTM_aist.Manager.init(sys.argv)
75 outp = outp.get_ports()
79 prof = RTC.ConnectorProfile(
"connector0",
81 [outp[0]._narrow(RTC.PortService),inp[0]._narrow(RTC.PortService)],
83 ret, prof = self.
_pb.connect(prof)
84 cntlist = outp[0].get_connector_profiles()
85 self.assertEqual(cntlist[0].name,
"connector0")
86 cntprofile = outp[0].get_connector_profile(
"connectorid_0")
87 self.assertEqual(cntprofile.connector_id,
"connectorid_0")
100 prof = RTC.ConnectorProfile(
"connector0",
"connector_id1",[inp[0],outp[0]],nvlist)
101 self.
_pb.connect(prof)
102 cntprofile = outp[0].get_connector_profile(
"connector_id1")
103 self.assertEqual(cntprofile.connector_id,
"connector_id1")
104 outp[0].disconnect(
"connector_id1")
105 cntprofile = outp[0].get_connector_profile(
"connector_id1")
106 self.assertNotEqual(cntprofile.connector_id,
"connector_id1")
107 self.
_pb.connect(prof)
108 outp[0].disconnect_all()
109 self.assertNotEqual(cntprofile.connector_id,
"connector_id1")
116 self.
_pb.setName(
"test")
117 self.
_pb.setPortRef(outp[0])
118 rtobj =
RTObjMock(OpenRTM_aist.Manager.instance())
119 self.
_pb.setOwner(rtobj)
120 prof = self.
_pb.getProfile()
121 self.assertEqual(prof.name,
".test")
122 self.assertEqual(prof.port_ref,outp[0])
123 self.assertEqual(prof.port_ref,outp[0])
124 self.assertEqual(prof.port_ref,self.
_pb.getPortRef())
140 prof = RTC.ConnectorProfile(
"connector0",
"connector_id1",[inp[0],outp[0]],nvlist)
141 self.
_pb.updateConnectorProfile(prof)
142 cntlist = self.
_pb.get_connector_profiles()
143 self.assertEqual(cntlist[0].name,
"connector0")
155 prof = RTC.ConnectorProfile(
"connector0",
"connector_id1",[inp[0],outp[0]],nvlist)
156 self.
_pb.connect(prof)
163 self.assertEqual(self.
_pb.appendInterface(
"name",
"type", RTC.PROVIDED),
True)
164 piprof = RTC.PortInterfaceProfile(
"name",
"type",RTC.PROVIDED)
165 self.
_pb._profile=RTC.PortProfile(
"name",[piprof],RTC.PortService._nil,[],RTC.RTObject._nil,[])
166 self.assertEqual(self.
_pb.appendInterface(
"name",
"type", RTC.PROVIDED),
False)
167 self.assertEqual(self.
_pb.deleteInterface(
"name", RTC.PROVIDED),
True)
168 self.assertEqual(self.
_pb.deleteInterface(
"name", RTC.PROVIDED),
False)
173 self.
_pb.addProperty(
"name1",
"val1")
174 prof = self.
_pb.getPortProfile()
175 self.assertEqual(prof.properties[0].name,
"name1")
176 self.
_pb.appendProperty(
"name2",
"val2")
177 prof = self.
_pb.getPortProfile()
178 self.assertEqual(prof.properties[1].name,
"name2")
182 if __name__ ==
'__main__':
def test_getProfile(self)
def test_getPortProfile(self)
def test_connect_disconnect(self)
def test_updateConnectorProfile(self)
def __init_(self, manager)
def test_get_connector_profiles(self)
def addOutPort(self, name, outport)
def addInPort(self, name, inport)
def test_eraseConnectorProfile(self)
def test_get_port_profile(self)
def test_add_appendProperty(self)
DataFlowComponentBase class.
def newNV(name, value)
Create NameVale.
def test_append_deleteInterface(self)