19 sys.path.insert(1,
"../")
21 from omniORB
import CORBA, PortableServer
22 from omniORB
import any
26 from InPortBase
import *
34 InPortBase.__init__(self, name, data_type)
38 return self._thebuffer
41 return self._providerTypes
44 return self._consumerTypes
47 return self.publishInterfaces(connector_profile)
54 self.
_orb = CORBA.ORB_init(sys.argv)
55 self.
_poa = self.
_orb.resolve_initial_references(
"RootPOA")
56 self.
_poa._get_the_POAManager().activate()
60 profile = self.
_outport.getPortProfile()
74 OpenRTM_aist.Manager.instance().shutdownManager()
82 self.assertEqual(self.
_inport.getThebuffer(),
None)
83 profile = self.
_inport.getPortProfile()
86 self.assertEqual(prop.getProperty(
"dataport.dataflow_type"),
"")
87 self.assertEqual(prop.getProperty(
"dataport.interface_type"),
"")
89 pstr = self.
_inport.getProviderTypes()
90 self.assertEqual(len(pstr),0)
91 cstr = self.
_inport.getConsumerTypes()
92 self.assertEqual(len(cstr),0)
97 self.assertNotEqual(self.
_inport.getThebuffer(),
None)
98 profile = self.
_inport.getPortProfile()
102 val = any.from_any(prop.getProperty(
"dataport.dataflow_type"),keep_structs=
True)
103 self.assertEqual(val,
"push, pull")
104 val = any.from_any(prop.getProperty(
"dataport.interface_type"),keep_structs=
True)
105 self.assertEqual(val,
"corba_cdr")
108 pstr = self.
_inport.getProviderTypes()
109 self.assertEqual(pstr[0],
"corba_cdr")
110 cstr = self.
_inport.getConsumerTypes()
111 self.assertEqual(cstr[0],
"corba_cdr")
116 profile = self.
_inport.getPortProfile()
119 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
121 [self.
_inport.get_port_profile().port_ref],
124 self.
_inport.publishInterfaces_public(prof)
125 prof.connector_id =
"id1" 126 prof.name =
"InPortBaseTest1" 127 self.
_inport.publishInterfaces_public(prof)
129 self.
_inport.activateInterfaces()
130 self.
_inport.deactivateInterfaces()
136 profile = self.
_inport.getPortProfile()
139 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
141 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
143 ret, con_prof = self.
_outport.connect(prof)
144 self.assertEqual(self.
_inport.subscribeInterfaces(prof),RTC.RTC_OK)
145 self.
_inport.unsubscribeInterfaces(prof)
146 ret = self.
_outport.disconnect(prof.connector_id)
151 profile = self.
_inport.getPortProfile()
155 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
157 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
159 ret, con_prof = self.
_outport.connect(prof)
161 cprofs = self.
_inport.getConnectorProfiles()
162 self.assertEqual(len(cprofs),1)
164 self.assertEqual(cprofs[0].name,
"InPortBaseTest0")
165 self.assertEqual(cprofs[0].id,
"id0")
167 ret = self.
_outport.disconnect(prof.connector_id)
172 profile = self.
_inport.getPortProfile()
176 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
178 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
180 ret, con_prof = self.
_outport.connect(prof)
182 cids = self.
_inport.getConnectorIds()
183 self.assertEqual(len(cids),1)
185 self.assertEqual(cids[0],
"id0")
187 ret = self.
_outport.disconnect(prof.connector_id)
192 profile = self.
_inport.getPortProfile()
196 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
198 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
200 ret, con_prof = self.
_outport.connect(prof)
202 cnames = self.
_inport.getConnectorNames()
203 self.assertEqual(len(cnames),1)
205 self.assertEqual(cnames[0],
"InPortBaseTest0")
207 ret = self.
_outport.disconnect(prof.connector_id)
212 profile = self.
_inport.getPortProfile()
216 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
218 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
220 ret, con_prof = self.
_outport.connect(prof)
222 con = self.
_inport.getConnectorById(
"id")
223 self.assertEqual(con,
None)
225 con = self.
_inport.getConnectorById(
"id0")
226 self.assertEqual(con.profile().name,
"InPortBaseTest0")
227 self.assertEqual(con.profile().id,
"id0")
229 ret = self.
_outport.disconnect(prof.connector_id)
234 profile = self.
_inport.getPortProfile()
238 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
240 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
242 ret, con_prof = self.
_outport.connect(prof)
244 con = self.
_inport.getConnectorByName(
"test")
245 self.assertEqual(con,
None)
247 con = self.
_inport.getConnectorByName(
"InPortBaseTest0")
249 self.assertEqual(con.profile().name,
"InPortBaseTest0")
250 self.assertEqual(con.profile().id,
"id0")
252 ret = self.
_outport.disconnect(prof.connector_id)
257 profile = self.
_inport.getPortProfile()
261 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
263 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
265 ret, con_prof = self.
_outport.connect(prof)
268 ret = self.
_inport.getConnectorProfileById(
"test", cprof)
269 self.assertEqual(ret,
False)
271 ret = self.
_inport.getConnectorProfileById(
"id0", cprof)
272 self.assertEqual(ret,
True)
274 self.assertEqual(cprof[0].name,
"InPortBaseTest0")
275 self.assertEqual(cprof[0].id,
"id0")
277 ret = self.
_outport.disconnect(prof.connector_id)
282 profile = self.
_inport.getPortProfile()
286 prof = RTC.ConnectorProfile(
"InPortBaseTest0",
288 [self.
_inport.get_port_profile().port_ref,self.
_outport.get_port_profile().port_ref],
290 ret, con_prof = self.
_outport.connect(prof)
293 ret = self.
_inport.getConnectorProfileByName(
"test", cprof)
294 self.assertEqual(ret,
False)
296 ret = self.
_inport.getConnectorProfileByName(
"InPortBaseTest0", cprof)
297 self.assertEqual(ret,
True)
299 self.assertEqual(cprof[0].name,
"InPortBaseTest0")
300 self.assertEqual(cprof[0].id,
"id0")
302 ret = self.
_outport.disconnect(prof.connector_id)
308 if __name__ ==
'__main__':
def getConsumerTypes(self)
def test_getConnectorIds(self)
def __init__(self, name, data_type)
def test_getConnectorProfileByName(self)
The Properties class represents a persistent set of properties.
def getProviderTypes(self)
def test_getConnectorByName(self)
def test_getConnectorById(self)
def test_getConnectorNames(self)
def test_getConnectorProfileById(self)
def test_subscribe_unsubscribeInterfaces(self)
def test_properties(self)
def toProperties(nv)
coil::Properties toProperties(const SDOPackage::NVList& nv);
def test_getConnectorProfiles(self)
def test_activate_deactivateInterfaces(self)
def newNV(name, value)
Create NameValue.
def publishInterfaces_public(self, connector_profile)