00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 from omniORB import CORBA, PortableServer
00022 from omniORB import any
00023
00024 import unittest
00025
00026 from InPortBase import *
00027
00028 import RTC, RTC__POA
00029 import OpenRTM_aist
00030
00031
00032 class InPortMock(InPortBase):
00033 def __init__(self, name, data_type):
00034 InPortBase.__init__(self, name, data_type)
00035 return
00036
00037 def getThebuffer(self):
00038 return self._thebuffer
00039
00040 def getProviderTypes(self):
00041 return self._providerTypes
00042
00043 def getConsumerTypes(self):
00044 return self._consumerTypes
00045
00046 def publishInterfaces_public(self, connector_profile):
00047 return self.publishInterfaces(connector_profile)
00048
00049
00050
00051 class TestInPortBase(unittest.TestCase):
00052 def setUp(self):
00053
00054 self._orb = CORBA.ORB_init(sys.argv)
00055 self._poa = self._orb.resolve_initial_references("RootPOA")
00056 self._poa._get_the_POAManager().activate()
00057
00058 self._inport = InPortMock("in", OpenRTM_aist.toTypename(RTC.TimedLong(RTC.Time(0,0), 0)))
00059 self._outport = OpenRTM_aist.OutPort("out",RTC.TimedLong(RTC.Time(0,0), 0))
00060 profile = self._outport.getPortProfile()
00061 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00062 self._outport.init(prop)
00063
00064 self._nvlist = [OpenRTM_aist.NVUtil.newNV("dataport.interface_type","corba_cdr"),
00065 OpenRTM_aist.NVUtil.newNV("dataport.dataflow_type","push"),
00066 OpenRTM_aist.NVUtil.newNV("dataport.subscription_type","flush")]
00067
00068 import time
00069 time.sleep(0.05)
00070 return
00071
00072
00073 def tearDown(self):
00074 OpenRTM_aist.Manager.instance().shutdownManager()
00075 return
00076
00077 def test_properties(self):
00078 self.assertEqual(isinstance(self._inport.properties(),OpenRTM_aist.Properties),True)
00079 return
00080
00081 def test_init(self):
00082 self.assertEqual(self._inport.getThebuffer(),None)
00083 profile = self._inport.getPortProfile()
00084 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00085
00086 self.assertEqual(prop.getProperty("dataport.dataflow_type"),"")
00087 self.assertEqual(prop.getProperty("dataport.interface_type"),"")
00088
00089 pstr = self._inport.getProviderTypes()
00090 self.assertEqual(len(pstr),0)
00091 cstr = self._inport.getConsumerTypes()
00092 self.assertEqual(len(cstr),0)
00093
00094 self._inport.init(prop)
00095
00096
00097 self.assertNotEqual(self._inport.getThebuffer(),None)
00098 profile = self._inport.getPortProfile()
00099 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00100
00101
00102 val = any.from_any(prop.getProperty("dataport.dataflow_type"),keep_structs=True)
00103 self.assertEqual(val,"push, pull")
00104 val = any.from_any(prop.getProperty("dataport.interface_type"),keep_structs=True)
00105 self.assertEqual(val,"corba_cdr")
00106
00107
00108 pstr = self._inport.getProviderTypes()
00109 self.assertEqual(pstr[0],"corba_cdr")
00110 cstr = self._inport.getConsumerTypes()
00111 self.assertEqual(cstr[0],"corba_cdr")
00112
00113 return
00114
00115 def test_activate_deactivateInterfaces(self):
00116 profile = self._inport.getPortProfile()
00117 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00118 self._inport.init(prop)
00119 prof = RTC.ConnectorProfile("InPortBaseTest0",
00120 "id0",
00121 [self._inport.get_port_profile().port_ref],
00122 self._nvlist)
00123
00124 self._inport.publishInterfaces_public(prof)
00125 prof.connector_id = "id1"
00126 prof.name = "InPortBaseTest1"
00127 self._inport.publishInterfaces_public(prof)
00128
00129 self._inport.activateInterfaces()
00130 self._inport.deactivateInterfaces()
00131
00132
00133 return
00134
00135 def test_subscribe_unsubscribeInterfaces(self):
00136 profile = self._inport.getPortProfile()
00137 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00138 self._inport.init(prop)
00139 prof = RTC.ConnectorProfile("InPortBaseTest0",
00140 "id0",
00141 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00142 self._nvlist)
00143 ret, con_prof = self._outport.connect(prof)
00144 self.assertEqual(self._inport.subscribeInterfaces(prof),RTC.RTC_OK)
00145 self._inport.unsubscribeInterfaces(prof)
00146 ret = self._outport.disconnect(prof.connector_id)
00147 return
00148
00149
00150 def test_getConnectorProfiles(self):
00151 profile = self._inport.getPortProfile()
00152 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00153 self._inport.init(prop)
00154
00155 prof = RTC.ConnectorProfile("InPortBaseTest0",
00156 "id0",
00157 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00158 self._nvlist)
00159 ret, con_prof = self._outport.connect(prof)
00160
00161 cprofs = self._inport.getConnectorProfiles()
00162 self.assertEqual(len(cprofs),1)
00163
00164 self.assertEqual(cprofs[0].name,"InPortBaseTest0")
00165 self.assertEqual(cprofs[0].id,"id0")
00166
00167 ret = self._outport.disconnect(prof.connector_id)
00168 return
00169
00170
00171 def test_getConnectorIds(self):
00172 profile = self._inport.getPortProfile()
00173 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00174 self._inport.init(prop)
00175
00176 prof = RTC.ConnectorProfile("InPortBaseTest0",
00177 "id0",
00178 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00179 self._nvlist)
00180 ret, con_prof = self._outport.connect(prof)
00181
00182 cids = self._inport.getConnectorIds()
00183 self.assertEqual(len(cids),1)
00184
00185 self.assertEqual(cids[0],"id0")
00186
00187 ret = self._outport.disconnect(prof.connector_id)
00188 return
00189
00190
00191 def test_getConnectorNames(self):
00192 profile = self._inport.getPortProfile()
00193 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00194 self._inport.init(prop)
00195
00196 prof = RTC.ConnectorProfile("InPortBaseTest0",
00197 "id0",
00198 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00199 self._nvlist)
00200 ret, con_prof = self._outport.connect(prof)
00201
00202 cnames = self._inport.getConnectorNames()
00203 self.assertEqual(len(cnames),1)
00204
00205 self.assertEqual(cnames[0],"InPortBaseTest0")
00206
00207 ret = self._outport.disconnect(prof.connector_id)
00208 return
00209
00210
00211 def test_getConnectorById(self):
00212 profile = self._inport.getPortProfile()
00213 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00214 self._inport.init(prop)
00215
00216 prof = RTC.ConnectorProfile("InPortBaseTest0",
00217 "id0",
00218 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00219 self._nvlist)
00220 ret, con_prof = self._outport.connect(prof)
00221
00222 con = self._inport.getConnectorById("id")
00223 self.assertEqual(con,None)
00224
00225 con = self._inport.getConnectorById("id0")
00226 self.assertEqual(con.profile().name,"InPortBaseTest0")
00227 self.assertEqual(con.profile().id,"id0")
00228
00229 ret = self._outport.disconnect(prof.connector_id)
00230 return
00231
00232
00233 def test_getConnectorByName(self):
00234 profile = self._inport.getPortProfile()
00235 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00236 self._inport.init(prop)
00237
00238 prof = RTC.ConnectorProfile("InPortBaseTest0",
00239 "id0",
00240 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00241 self._nvlist)
00242 ret, con_prof = self._outport.connect(prof)
00243
00244 con = self._inport.getConnectorByName("test")
00245 self.assertEqual(con,None)
00246
00247 con = self._inport.getConnectorByName("InPortBaseTest0")
00248
00249 self.assertEqual(con.profile().name,"InPortBaseTest0")
00250 self.assertEqual(con.profile().id,"id0")
00251
00252 ret = self._outport.disconnect(prof.connector_id)
00253 return
00254
00255
00256 def test_getConnectorProfileById(self):
00257 profile = self._inport.getPortProfile()
00258 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00259 self._inport.init(prop)
00260
00261 prof = RTC.ConnectorProfile("InPortBaseTest0",
00262 "id0",
00263 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00264 self._nvlist)
00265 ret, con_prof = self._outport.connect(prof)
00266
00267 cprof = [None]
00268 ret = self._inport.getConnectorProfileById("test", cprof)
00269 self.assertEqual(ret,False)
00270
00271 ret = self._inport.getConnectorProfileById("id0", cprof)
00272 self.assertEqual(ret,True)
00273
00274 self.assertEqual(cprof[0].name,"InPortBaseTest0")
00275 self.assertEqual(cprof[0].id,"id0")
00276
00277 ret = self._outport.disconnect(prof.connector_id)
00278 return
00279
00280
00281 def test_getConnectorProfileByName(self):
00282 profile = self._inport.getPortProfile()
00283 prop = OpenRTM_aist.NVUtil.toProperties(profile.properties)
00284 self._inport.init(prop)
00285
00286 prof = RTC.ConnectorProfile("InPortBaseTest0",
00287 "id0",
00288 [self._inport.get_port_profile().port_ref,self._outport.get_port_profile().port_ref],
00289 self._nvlist)
00290 ret, con_prof = self._outport.connect(prof)
00291
00292 cprof = [None]
00293 ret = self._inport.getConnectorProfileByName("test", cprof)
00294 self.assertEqual(ret,False)
00295
00296 ret = self._inport.getConnectorProfileByName("InPortBaseTest0", cprof)
00297 self.assertEqual(ret,True)
00298
00299 self.assertEqual(cprof[0].name,"InPortBaseTest0")
00300 self.assertEqual(cprof[0].id,"id0")
00301
00302 ret = self._outport.disconnect(prof.connector_id)
00303 return
00304
00305
00306
00307
00308 if __name__ == '__main__':
00309 unittest.main()
00310