test_InPortBase.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: euc-jp -*-
00003 
00004 
00005 #  \file test_InPortBase.py
00006 #  \brief test for InPortBase class
00007 #  \date $Date: 2007/09/20 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2003-2005
00011 #      Task-intelligence Research Group,
00012 #      Intelligent Systems Research Institute,
00013 #      National Institute of
00014 #          Advanced Industrial Science and Technology (AIST), Japan
00015 #      All rights reserved.
00016  
00017 
00018 import sys
00019 sys.path.insert(1,"../")
00020 
00021 from omniORB import CORBA, PortableServer
00022 from omniORB import any
00023 
00024 import unittest
00025 
00026 from InPortBase import *
00027 
00028 import RTC, RTC__POA
00029 import OpenRTM_aist
00030 
00031 
class InPortMock(InPortBase):
  """InPortBase subclass that exposes selected internals to the tests.

  Each accessor simply hands back an attribute (or forwards to a method)
  of the base class so the test case can inspect it directly.
  """

  def __init__(self, name, data_type):
    """Forward ``name`` and ``data_type`` unchanged to InPortBase."""
    InPortBase.__init__(self, name, data_type)

  def getThebuffer(self):
    """Return the ``_thebuffer`` attribute of this port."""
    buf = self._thebuffer
    return buf

  def getProviderTypes(self):
    """Return the ``_providerTypes`` attribute of this port."""
    provider_types = self._providerTypes
    return provider_types

  def getConsumerTypes(self):
    """Return the ``_consumerTypes`` attribute of this port."""
    consumer_types = self._consumerTypes
    return consumer_types

  def publishInterfaces_public(self, connector_profile):
    """Call ``publishInterfaces(connector_profile)`` and return its result."""
    result = self.publishInterfaces(connector_profile)
    return result
00048 
00049 
00050 
class TestInPortBase(unittest.TestCase):
  """Tests for InPortBase, driven through the InPortMock subclass.

  Connection-based tests pair the in-port with an OpenRTM_aist.OutPort.
  They disconnect inside a ``finally`` clause so that a failing assertion
  does not skip the disconnect call.
  """

  def setUp(self):
    """Activate the root POA and build the in-port / out-port fixtures."""
    import time

    self._orb = CORBA.ORB_init(sys.argv)
    self._poa = self._orb.resolve_initial_references("RootPOA")
    self._poa._get_the_POAManager().activate()

    self._inport = InPortMock("in", OpenRTM_aist.toTypename(RTC.TimedLong(RTC.Time(0,0), 0)))
    self._outport = OpenRTM_aist.OutPort("out",RTC.TimedLong(RTC.Time(0,0), 0))
    profile = self._outport.getPortProfile()
    prop    = OpenRTM_aist.NVUtil.toProperties(profile.properties)
    self._outport.init(prop)

    # Connector properties shared by every ConnectorProfile built below.
    self._nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
                    OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
                    OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]

    # NOTE(review): short pause kept from the original; presumably gives the
    # ORB/POA time to settle -- confirm whether it is still required.
    time.sleep(0.05)


  def tearDown(self):
    """Shut the OpenRTM manager down after each test."""
    OpenRTM_aist.Manager.instance().shutdownManager()


  # ---- private helpers (not collected as tests) ----

  def _initInPort(self):
    """Initialise the in-port with properties read from its own port profile."""
    profile = self._inport.getPortProfile()
    prop    = OpenRTM_aist.NVUtil.toProperties(profile.properties)
    self._inport.init(prop)

  def _newProfile(self, port_refs):
    """Build the "InPortBaseTest0"/"id0" ConnectorProfile for ``port_refs``."""
    return RTC.ConnectorProfile("InPortBaseTest0",
                                "id0",
                                port_refs,
                                self._nvlist)

  def _connectPorts(self):
    """Initialise the in-port, connect it to the out-port, return the profile.

    The caller is responsible for calling
    ``self._outport.disconnect(prof.connector_id)`` (in a ``finally`` block).
    """
    self._initInPort()
    prof = self._newProfile([self._inport.get_port_profile().port_ref,
                             self._outport.get_port_profile().port_ref])
    self._outport.connect(prof)
    return prof


  # ---- tests ----

  def test_properties(self):
    """properties() returns an OpenRTM_aist.Properties instance."""
    self.assertTrue(isinstance(self._inport.properties(),OpenRTM_aist.Properties))

  def test_init(self):
    """init() fills in the buffer, the dataport properties and the type lists."""
    # Before init(): no buffer, empty properties, no provider/consumer types.
    self.assertEqual(self._inport.getThebuffer(),None)
    profile = self._inport.getPortProfile()
    prop    = OpenRTM_aist.NVUtil.toProperties(profile.properties)

    self.assertEqual(prop.getProperty("dataport.dataflow_type"),"")
    self.assertEqual(prop.getProperty("dataport.interface_type"),"")

    pstr = self._inport.getProviderTypes()
    self.assertEqual(len(pstr),0)
    cstr = self._inport.getConsumerTypes()
    self.assertEqual(len(cstr),0)

    self._inport.init(prop)

    # When self._singlebuffer is True, self._thebuffer is obtained.
    self.assertNotEqual(self._inport.getThebuffer(),None)
    profile = self._inport.getPortProfile()
    prop    = OpenRTM_aist.NVUtil.toProperties(profile.properties)

    # The following entries are added to the getPortProfile properties.
    val = any.from_any(prop.getProperty("dataport.dataflow_type"),keep_structs=True)
    self.assertEqual(val,"push, pull")
    val = any.from_any(prop.getProperty("dataport.interface_type"),keep_structs=True)
    self.assertEqual(val,"corba_cdr")

    # ProviderTypes and ConsumerTypes are now available.
    pstr = self._inport.getProviderTypes()
    self.assertEqual(pstr[0],"corba_cdr")
    cstr = self._inport.getConsumerTypes()
    self.assertEqual(cstr[0],"corba_cdr")

  def test_activate_deactivateInterfaces(self):
    """activate/deactivateInterfaces run after publishing two connectors."""
    self._initInPort()
    # Only the in-port reference is listed here; no out-port is connected.
    prof = self._newProfile([self._inport.get_port_profile().port_ref])

    self._inport.publishInterfaces_public(prof)
    # Re-use the same profile object under a second id/name.
    prof.connector_id = "id1"
    prof.name = "InPortBaseTest1"
    self._inport.publishInterfaces_public(prof)

    self._inport.activateInterfaces()
    self._inport.deactivateInterfaces()

  def test_subscribe_unsubscribeInterfaces(self):
    """subscribeInterfaces() returns RTC_OK for a connected profile."""
    prof = self._connectPorts()
    try:
      self.assertEqual(self._inport.subscribeInterfaces(prof),RTC.RTC_OK)
      self._inport.unsubscribeInterfaces(prof)
    finally:
      self._outport.disconnect(prof.connector_id)

  def test_getConnectorProfiles(self):
    """getConnectorProfiles() lists the single connection's name and id."""
    prof = self._connectPorts()
    try:
      cprofs = self._inport.getConnectorProfiles()
      self.assertEqual(len(cprofs),1)

      self.assertEqual(cprofs[0].name,"InPortBaseTest0")
      self.assertEqual(cprofs[0].id,"id0")
    finally:
      self._outport.disconnect(prof.connector_id)

  def test_getConnectorIds(self):
    """getConnectorIds() lists the single connection's id."""
    prof = self._connectPorts()
    try:
      cids = self._inport.getConnectorIds()
      self.assertEqual(len(cids),1)

      self.assertEqual(cids[0],"id0")
    finally:
      self._outport.disconnect(prof.connector_id)

  def test_getConnectorNames(self):
    """getConnectorNames() lists the single connection's name."""
    prof = self._connectPorts()
    try:
      cnames = self._inport.getConnectorNames()
      self.assertEqual(len(cnames),1)

      self.assertEqual(cnames[0],"InPortBaseTest0")
    finally:
      self._outport.disconnect(prof.connector_id)

  def test_getConnectorById(self):
    """getConnectorById() gives None for an unknown id, the connector otherwise."""
    prof = self._connectPorts()
    try:
      con = self._inport.getConnectorById("id")
      self.assertEqual(con,None)

      con = self._inport.getConnectorById("id0")
      self.assertEqual(con.profile().name,"InPortBaseTest0")
      self.assertEqual(con.profile().id,"id0")
    finally:
      self._outport.disconnect(prof.connector_id)

  def test_getConnectorByName(self):
    """getConnectorByName() gives None for an unknown name, the connector otherwise."""
    prof = self._connectPorts()
    try:
      con = self._inport.getConnectorByName("test")
      self.assertEqual(con,None)

      con = self._inport.getConnectorByName("InPortBaseTest0")

      self.assertEqual(con.profile().name,"InPortBaseTest0")
      self.assertEqual(con.profile().id,"id0")
    finally:
      self._outport.disconnect(prof.connector_id)

  def test_getConnectorProfileById(self):
    """getConnectorProfileById() reports success and fills the out-list."""
    prof = self._connectPorts()
    try:
      # One-element list used as an out-parameter by the call under test.
      cprof = [None]
      ret = self._inport.getConnectorProfileById("test", cprof)
      self.assertEqual(ret,False)

      ret = self._inport.getConnectorProfileById("id0", cprof)
      self.assertEqual(ret,True)

      self.assertEqual(cprof[0].name,"InPortBaseTest0")
      self.assertEqual(cprof[0].id,"id0")
    finally:
      self._outport.disconnect(prof.connector_id)

  def test_getConnectorProfileByName(self):
    """getConnectorProfileByName() reports success and fills the out-list."""
    prof = self._connectPorts()
    try:
      # One-element list used as an out-parameter by the call under test.
      cprof = [None]
      ret = self._inport.getConnectorProfileByName("test", cprof)
      self.assertEqual(ret,False)

      ret = self._inport.getConnectorProfileByName("InPortBaseTest0", cprof)
      self.assertEqual(ret,True)

      self.assertEqual(cprof[0].name,"InPortBaseTest0")
      self.assertEqual(cprof[0].id,"id0")
    finally:
      self._outport.disconnect(prof.connector_id)
00304 
00305 
00306 
00307 ############### test #################
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()
00310 


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28