18 sys.path.insert(1,
"../")
23 from OutPortBase
import *
40 def __init__(self, name="name", id="id", ports=None, properties=None):
67 class TestOutPortBase(unittest.TestCase):
69 OpenRTM_aist.Manager.init(sys.argv)
75 OpenRTM_aist.Manager.instance().shutdownManager()
79 self.assertEqual(self._opb.getName(),
"unknown.test")
83 self.assertEqual(self._opb.connectors(),[])
87 self.assertEqual(self._opb.getConnectorProfiles(),[])
89 cprof = self._opb.getConnectorProfiles()[0]
90 self.assertEqual(cprof.name,
"name")
94 self.assertEqual(self._opb.getConnectorIds(),[])
96 cprof = self._opb.getConnectorIds()[0]
97 self.assertEqual(cprof,
"id")
101 self.assertEqual(self._opb.getConnectorProfileById(
"id",[]),
False)
104 self.assertEqual(self._opb.getConnectorProfileById(
"id",prof),
True)
105 self.assertEqual(prof[0].id,
"id")
109 self.assertEqual(self._opb.getConnectorProfileByName(
"name",[]),
False)
112 self.assertEqual(self._opb.getConnectorProfileByName(
"name",prof),
True)
113 self.assertEqual(prof[0].name,
"name")
118 self._opb.activateInterfaces()
119 self._opb.deactivateInterfaces()
123 cprof = RTC.ConnectorProfile(
"",
"",[],[])
124 self.assertEqual(self._opb.publishInterfaces(cprof),RTC.BAD_PARAMETER)
125 cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
138 self.assertEqual(self._opb.publishInterfaces(cprof),RTC.RTC_OK)
142 cprof = RTC.ConnectorProfile(
"",
"",[],[])
143 self.assertEqual(self._opb.subscribeInterfaces(cprof),RTC.BAD_PARAMETER)
144 cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
163 cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
175 self._opb.unsubscribeInterfaces(cprof)
179 self._opb.initProviders()
183 self._opb.initConsumers()
188 cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
201 prop.setProperty(
"interface_type",
"corba_cdr")
202 prop.setProperty(
"dataflow_type",
"pull")
203 prop.setProperty(
"subscription_type",
"flush")
204 self.assertNotEqual(self._opb.createProvider(cprof,prop),0)
210 cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
223 prop.setProperty(
"interface_type",
"corba_cdr")
224 prop.setProperty(
"dataflow_type",
"push")
225 prop.setProperty(
"subscription_type",
"flush")
226 self.assertEqual(self._opb.createConsumer(cprof,prop),0)
232 cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
245 prop.setProperty(
"interface_type",
"corba_cdr")
246 prop.setProperty(
"dataflow_type",
"push")
247 prop.setProperty(
"subscription_type",
"flush")
248 self.assertNotEqual(self._opb.createConnector(cprof,prop,consumer_=
ConsumerMock()),0)
253 self.assertEqual(self._opb.getConnectorById(
"test"),0)
256 conn = self._opb.getConnectorById(
"id")
257 self.assertEqual(conn.id(),
"id")
262 self.assertEqual(self._opb.getConnectorByName(
"test"),0)
265 conn = self._opb.getConnectorByName(
"name")
266 self.assertEqual(conn.name(),
"name")
271 if __name__ ==
'__main__':
def test_unsubscribeInterfaces(self)
def test_activateInterfaces(self)
def push_back(seq, elem)
Push the new element back to the CORBA sequence.
def test_createConsumer(self)
The Properties class represents a persistent set of properties.
def test_getConnectorIds(self)
def test_getConnectorProfileById(self)
def test_getConnectorById(self)
def test_initConsumers(self)
def test_initProviders(self)
def test_connectors(self)
def test_getConnectorProfileByName(self)
def __init__(self, name="name", id="id", ports=None, properties=None)
def test_createProvider(self)
def test_createConnector(self)
def test_getConnectorProfiles(self)
def test_getConnectorByName(self)
def test_subscribeInterfaces(self)
def test_publishInterfaces(self)
def newNV(name, value)
Create NameValue.