00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022 from PortBase import *
00023
00024 import CORBA
00025 import OpenRTM_aist
00026 import RTC, RTC__POA
00027
00028
class RTObjMock(OpenRTM_aist.DataFlowComponentBase):
    """Minimal component used as the owner object in the PortBase tests."""

    # The original spelled this method ``__init_`` (single trailing
    # underscore), so Python never treated it as the constructor.
    def __init__(self, manager):
        """Initialise the component by delegating to the base constructor.

        manager -- manager object forwarded unchanged to
                   DataFlowComponentBase.__init__.
        """
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00032
class OutPortObj(OpenRTM_aist.DataFlowComponentBase):
    """Component that registers one OutPort named "out" on construction."""

    def __init__(self):
        """Bind to the manager singleton, then add the "out" port."""
        manager = OpenRTM_aist.Manager.instance()
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

        # The port carries a TimedLong whose timestamp and value are all zero.
        initial_value = RTC.TimedLong(RTC.Time(0, 0), 0)
        out_port = OpenRTM_aist.OutPort("out", initial_value)
        self.addOutPort("out", out_port)
00037
00038
class InPortObj(OpenRTM_aist.DataFlowComponentBase):
    """Component that registers one InPort named "in" on construction."""

    def __init__(self):
        """Bind to the manager singleton, then add the "in" port."""
        manager = OpenRTM_aist.Manager.instance()
        OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

        # The port carries a TimedLong whose timestamp and value are all zero.
        initial_value = RTC.TimedLong(RTC.Time(0, 0), 0)
        in_port = OpenRTM_aist.InPort("in", initial_value)
        self.addInPort("in", in_port)
00043
00044
class TestPortBase(unittest.TestCase):
    """Unit tests for PortBase.

    Covers the profile accessors, connect/disconnect handling, connector
    profile bookkeeping, interface registration and property handling.
    Each test runs against a fresh PortBase created in setUp.
    """

    def setUp(self):
        """Initialise and activate the manager, then create the port under test."""
        OpenRTM_aist.Manager.init(sys.argv)
        OpenRTM_aist.Manager.instance().activateManager()
        self._pb = PortBase()

    def tearDown(self):
        """Shut the manager down after every test."""
        OpenRTM_aist.Manager.instance().shutdownManager()

    @staticmethod
    def _dataport_nvlist():
        """Return the name/value list shared by the connection tests.

        The list requests the corba_cdr interface, push dataflow and flush
        subscription.
        """
        return [
            OpenRTM_aist.NVUtil.newNV("dataport.interface_type", "corba_cdr"),
            OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type", "push"),
            OpenRTM_aist.NVUtil.newNV("dataport.subscription_type", "flush"),
        ]

    def test_get_port_profile(self):
        """get_port_profile() reports the name given to setName()."""
        self._pb.setName("test_connect")
        prof = self._pb.get_port_profile()
        self.assertEqual(prof.name, "test_connect")

    def test_getPortProfile(self):
        """getPortProfile() reports the name given to setName()."""
        self._pb.setName("test_connect")
        prof = self._pb.getPortProfile()
        self.assertEqual(prof.name, "test_connect")

    def test_get_connector_profiles(self):
        """A connected out port exposes the connector profile by index and by id."""
        # NOTE(review): setUp already calls Manager.init; this second call is
        # kept from the original and looks redundant -- confirm before removing.
        OpenRTM_aist.Manager.init(sys.argv)
        nvlist = self._dataport_nvlist()

        outp = OutPortObj().get_ports()
        inp = InPortObj().get_ports()
        prof = RTC.ConnectorProfile(
            "connector0",
            "connectorid_0",
            [outp[0]._narrow(RTC.PortService),
             inp[0]._narrow(RTC.PortService)],
            nvlist)
        ret, prof = self._pb.connect(prof)

        cntlist = outp[0].get_connector_profiles()
        self.assertEqual(cntlist[0].name, "connector0")
        cntprofile = outp[0].get_connector_profile("connectorid_0")
        self.assertEqual(cntprofile.connector_id, "connectorid_0")

    def test_connect_disconnect(self):
        """Connector profiles appear on connect and vanish on disconnect."""
        nvlist = self._dataport_nvlist()

        outp = OutPortObj().get_ports()
        inp = InPortObj().get_ports()
        prof = RTC.ConnectorProfile("connector0", "connector_id1",
                                    [inp[0], outp[0]], nvlist)

        self._pb.connect(prof)
        cntprofile = outp[0].get_connector_profile("connector_id1")
        self.assertEqual(cntprofile.connector_id, "connector_id1")

        outp[0].disconnect("connector_id1")
        cntprofile = outp[0].get_connector_profile("connector_id1")
        self.assertNotEqual(cntprofile.connector_id, "connector_id1")

        self._pb.connect(prof)
        outp[0].disconnect_all()
        # Query the port again: the original asserted on the profile fetched
        # before this reconnect, so disconnect_all() was never really checked.
        cntprofile = outp[0].get_connector_profile("connector_id1")
        self.assertNotEqual(cntprofile.connector_id, "connector_id1")

    def test_getProfile(self):
        """getProfile() reflects the name, port reference and owner that were set."""
        outp = OutPortObj().get_ports()
        self._pb.setName("test")
        self._pb.setPortRef(outp[0])
        rtobj = RTObjMock(OpenRTM_aist.Manager.instance())
        self._pb.setOwner(rtobj)

        prof = self._pb.getProfile()
        # The reported name is expected to be ".test" once an owner is set
        # (presumably the owner's name is prefixed -- confirm in PortBase).
        self.assertEqual(prof.name, ".test")
        self.assertEqual(prof.port_ref, outp[0])
        self.assertEqual(prof.port_ref, self._pb.getPortRef())

    def test_getUUID(self):
        """Smoke test: getUUID() can be called without raising."""
        # NOTE(review): the return value is not checked; add an assertion once
        # the expected format is confirmed.
        self._pb.getUUID()

    def test_updateConnectorProfile(self):
        """A profile passed to updateConnectorProfile() shows up in the profile list."""
        nvlist = self._dataport_nvlist()

        outp = OutPortObj().get_ports()
        inp = InPortObj().get_ports()
        prof = RTC.ConnectorProfile("connector0", "connector_id1",
                                    [inp[0], outp[0]], nvlist)
        self._pb.updateConnectorProfile(prof)

        cntlist = self._pb.get_connector_profiles()
        self.assertEqual(cntlist[0].name, "connector0")

    def test_eraseConnectorProfile(self):
        """Connect a profile; no erase call or assertion is made yet."""
        # NOTE(review): despite its name this test only calls connect() and
        # verifies nothing -- the erase step still needs to be written.
        nvlist = self._dataport_nvlist()

        outp = OutPortObj().get_ports()
        inp = InPortObj().get_ports()
        prof = RTC.ConnectorProfile("connector0", "connector_id1",
                                    [inp[0], outp[0]], nvlist)
        self._pb.connect(prof)

    def test_append_deleteInterface(self):
        """appendInterface()/deleteInterface() return True once, then False."""
        self.assertEqual(
            self._pb.appendInterface("name", "type", RTC.PROVIDED), True)

        # Swap in a profile that already lists the interface, so the next
        # append is expected to be rejected.
        piprof = RTC.PortInterfaceProfile("name", "type", RTC.PROVIDED)
        self._pb._profile = RTC.PortProfile("name", [piprof],
                                            RTC.PortService._nil, [],
                                            RTC.RTObject._nil, [])
        self.assertEqual(
            self._pb.appendInterface("name", "type", RTC.PROVIDED), False)

        self.assertEqual(self._pb.deleteInterface("name", RTC.PROVIDED), True)
        self.assertEqual(self._pb.deleteInterface("name", RTC.PROVIDED), False)

    def test_add_appendProperty(self):
        """addProperty() and appendProperty() add entries in call order."""
        self._pb.addProperty("name1", "val1")
        prof = self._pb.getPortProfile()
        self.assertEqual(prof.properties[0].name, "name1")

        self._pb.appendProperty("name2", "val2")
        prof = self._pb.getPortProfile()
        self.assertEqual(prof.properties[1].name, "name2")
00179
00180
00181
# Run every test case defined in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()