test_PortBase.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 #  \file test_PortBase.py
00006 #  \brief test for RTC's Port base class
00007 #  \date $Date: 2007/09/18 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Task-intelligence Research Group,
00012 #      Intelligent Systems Research Institute,
00013 #      National Institute of
00014 #          Advanced Industrial Science and Technology (AIST), Japan
00015 #      All rights reserved.
00016 # 
00017 
00018 import sys
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 from PortBase import *
00023 
00024 import CORBA
00025 import OpenRTM_aist
00026 import RTC, RTC__POA
00027 
00028 
class RTObjMock(OpenRTM_aist.DataFlowComponentBase):
  """Bare data-flow component used by the tests as a stand-in port owner."""

  def __init__(self, manager):
    # Must be spelled "__init__" (two trailing underscores); with a single
    # trailing underscore Python treats it as an ordinary method and never
    # runs it during construction.
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)
00032 
class OutPortObj(OpenRTM_aist.DataFlowComponentBase):
  """Data-flow component that registers one OutPort named "out"."""

  def __init__(self):
    manager = OpenRTM_aist.Manager.instance()
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # The port is created around an RTC.TimedLong built from RTC.Time(0, 0) and 0.
    initial_value = RTC.TimedLong(RTC.Time(0, 0), 0)
    out_port = OpenRTM_aist.OutPort("out", initial_value)
    self.addOutPort("out", out_port)
00037 
00038 
class InPortObj(OpenRTM_aist.DataFlowComponentBase):
  """Data-flow component that registers one InPort named "in"."""

  def __init__(self):
    manager = OpenRTM_aist.Manager.instance()
    OpenRTM_aist.DataFlowComponentBase.__init__(self, manager)

    # The port is created around an RTC.TimedLong built from RTC.Time(0, 0) and 0.
    initial_value = RTC.TimedLong(RTC.Time(0, 0), 0)
    in_port = OpenRTM_aist.InPort("in", initial_value)
    self.addInPort("in", in_port)
00043 
00044 
00045 class TestPortBase(unittest.TestCase):
00046   def setUp(self):
00047     OpenRTM_aist.Manager.init(sys.argv)
00048     OpenRTM_aist.Manager.instance().activateManager()
00049     self._pb = PortBase()
00050     return
00051 
00052   def tearDown(self):
00053     OpenRTM_aist.Manager.instance().shutdownManager()
00054     return
00055 
00056   def test_get_port_profile(self):
00057     self._pb.setName("test_connect")
00058     prof = self._pb.get_port_profile()
00059     self.assertEqual(prof.name,"test_connect")
00060     return
00061 
00062   def test_getPortProfile(self):
00063     self._pb.setName("test_connect")
00064     prof = self._pb.getPortProfile()
00065     self.assertEqual(prof.name,"test_connect")
00066     
00067 
00068   def test_get_connector_profiles(self):
00069     OpenRTM_aist.Manager.init(sys.argv)
00070     nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
00071         OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
00072         OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
00073 
00074     outp = OutPortObj()
00075     outp = outp.get_ports()
00076 
00077     inp  = InPortObj()
00078     inp = inp.get_ports()
00079     prof = RTC.ConnectorProfile("connector0",
00080               "connectorid_0",
00081               [outp[0]._narrow(RTC.PortService),inp[0]._narrow(RTC.PortService)],
00082               nvlist)
00083     ret, prof = self._pb.connect(prof)
00084     cntlist = outp[0].get_connector_profiles()
00085     self.assertEqual(cntlist[0].name,"connector0")
00086     cntprofile = outp[0].get_connector_profile("connectorid_0")
00087     self.assertEqual(cntprofile.connector_id,"connectorid_0")
00088     #ret,prof = self._pb.notify_connect(prof)
00089     return
00090 
00091 
00092 
00093   def test_connect_disconnect(self):
00094     nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
00095         OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
00096         OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
00097 
00098     outp = OutPortObj().get_ports()
00099     inp  = InPortObj().get_ports()
00100     prof = RTC.ConnectorProfile("connector0","connector_id1",[inp[0],outp[0]],nvlist)
00101     self._pb.connect(prof)
00102     cntprofile = outp[0].get_connector_profile("connector_id1")
00103     self.assertEqual(cntprofile.connector_id,"connector_id1")
00104     outp[0].disconnect("connector_id1")
00105     cntprofile = outp[0].get_connector_profile("connector_id1")
00106     self.assertNotEqual(cntprofile.connector_id,"connector_id1")
00107     self._pb.connect(prof)
00108     outp[0].disconnect_all()
00109     self.assertNotEqual(cntprofile.connector_id,"connector_id1")
00110     return
00111 
00112 
00113 
00114   def test_getProfile(self):
00115     outp = OutPortObj().get_ports()
00116     self._pb.setName("test")
00117     self._pb.setPortRef(outp[0])
00118     rtobj = RTObjMock(OpenRTM_aist.Manager.instance())
00119     self._pb.setOwner(rtobj)
00120     prof = self._pb.getProfile()
00121     self.assertEqual(prof.name,".test")
00122     self.assertEqual(prof.port_ref,outp[0])
00123     self.assertEqual(prof.port_ref,outp[0])
00124     self.assertEqual(prof.port_ref,self._pb.getPortRef())
00125     return
00126 
00127 
00128   def test_getUUID(self):
00129     self._pb.getUUID()
00130     return
00131 
00132 
00133   def test_updateConnectorProfile(self):
00134     nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
00135         OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
00136         OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
00137 
00138     outp = OutPortObj().get_ports()
00139     inp  = InPortObj().get_ports()
00140     prof = RTC.ConnectorProfile("connector0","connector_id1",[inp[0],outp[0]],nvlist)
00141     self._pb.updateConnectorProfile(prof)
00142     cntlist = self._pb.get_connector_profiles()
00143     self.assertEqual(cntlist[0].name,"connector0")
00144     return
00145 
00146 
00147 
00148   def test_eraseConnectorProfile(self):
00149     nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
00150         OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
00151         OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
00152 
00153     outp = OutPortObj().get_ports()
00154     inp  = InPortObj().get_ports()
00155     prof = RTC.ConnectorProfile("connector0","connector_id1",[inp[0],outp[0]],nvlist)
00156     self._pb.connect(prof)
00157     #self.assertEqual(self._pb.eraseConnectorProfile("connector_id1"),True)
00158     return 
00159 
00160 
00161 
00162   def test_append_deleteInterface(self):
00163     self.assertEqual(self._pb.appendInterface("name", "type", RTC.PROVIDED), True)
00164     piprof = RTC.PortInterfaceProfile("name","type",RTC.PROVIDED)
00165     self._pb._profile=RTC.PortProfile("name",[piprof],RTC.PortService._nil,[],RTC.RTObject._nil,[])
00166     self.assertEqual(self._pb.appendInterface("name", "type", RTC.PROVIDED), False)
00167     self.assertEqual(self._pb.deleteInterface("name", RTC.PROVIDED), True)
00168     self.assertEqual(self._pb.deleteInterface("name", RTC.PROVIDED), False)
00169     return
00170     
00171 
00172   def test_add_appendProperty(self):
00173     self._pb.addProperty("name1","val1")
00174     prof = self._pb.getPortProfile()
00175     self.assertEqual(prof.properties[0].name, "name1")
00176     self._pb.appendProperty("name2","val2")
00177     prof = self._pb.getPortProfile()
00178     self.assertEqual(prof.properties[1].name, "name2")
00179 
00180 
00181 ############### test #################
if __name__ == "__main__":
  # Run every TestCase defined in this module when executed as a script.
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28