18 sys.path.insert(1,
"../")
    23 from OutPortBase 
import *
    40   def __init__(self, name="name", id="id", ports=None, properties=None):
    67 class TestOutPortBase(unittest.TestCase):
    69     OpenRTM_aist.Manager.init(sys.argv)
    75     OpenRTM_aist.Manager.instance().shutdownManager()
    79     self.assertEqual(self.
_opb.getName(),
"unknown.test")
    83     self.assertEqual(self.
_opb.connectors(),[])
    87     self.assertEqual(self.
_opb.getConnectorProfiles(),[])
    89     cprof = self.
_opb.getConnectorProfiles()[0]
    90     self.assertEqual(cprof.name,
"name")
    94     self.assertEqual(self.
_opb.getConnectorIds(),[])
    96     cprof = self.
_opb.getConnectorIds()[0]
    97     self.assertEqual(cprof,
"id")
   101     self.assertEqual(self.
_opb.getConnectorProfileById(
"id",[]),
False)
   104     self.assertEqual(self.
_opb.getConnectorProfileById(
"id",prof),
True)
   105     self.assertEqual(prof[0].id,
"id")
   109     self.assertEqual(self.
_opb.getConnectorProfileByName(
"name",[]),
False)
   112     self.assertEqual(self.
_opb.getConnectorProfileByName(
"name",prof),
True)
   113     self.assertEqual(prof[0].name,
"name")
   118     self.
_opb.activateInterfaces()
   119     self.
_opb.deactivateInterfaces()
   123     cprof = RTC.ConnectorProfile(
"",
"",[],[])
   124     self.assertEqual(self.
_opb.publishInterfaces(cprof),RTC.BAD_PARAMETER)
   125     cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
   138     self.assertEqual(self.
_opb.publishInterfaces(cprof),RTC.RTC_OK)
   142     cprof = RTC.ConnectorProfile(
"",
"",[],[])
   143     self.assertEqual(self.
_opb.subscribeInterfaces(cprof),RTC.BAD_PARAMETER)
   144     cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
   163     cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
   175     self.
_opb.unsubscribeInterfaces(cprof)
   179     self.
_opb.initProviders()
   183     self.
_opb.initConsumers()
   188     cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
   201     prop.setProperty(
"interface_type",
"corba_cdr")
   202     prop.setProperty(
"dataflow_type",
"pull")
   203     prop.setProperty(
"subscription_type",
"flush")
   204     self.assertNotEqual(self.
_opb.createProvider(cprof,prop),0)
   210     cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
   223     prop.setProperty(
"interface_type",
"corba_cdr")
   224     prop.setProperty(
"dataflow_type",
"push")
   225     prop.setProperty(
"subscription_type",
"flush")
   226     self.assertEqual(self.
_opb.createConsumer(cprof,prop),0)
   232     cprof = RTC.ConnectorProfile(
"connecto0",
"",[],[])
   245     prop.setProperty(
"interface_type",
"corba_cdr")
   246     prop.setProperty(
"dataflow_type",
"push")
   247     prop.setProperty(
"subscription_type",
"flush")
   248     self.assertNotEqual(self.
_opb.createConnector(cprof,prop,consumer_=
ConsumerMock()),0)
   253     self.assertEqual(self.
_opb.getConnectorById(
"test"),0)
   256     conn = self.
_opb.getConnectorById(
"id")
   257     self.assertEqual(conn.id(),
"id")
   262     self.assertEqual(self.
_opb.getConnectorByName(
"test"),0)
   265     conn = self.
_opb.getConnectorByName(
"name")
   266     self.assertEqual(conn.name(),
"name")
   271 if __name__ == 
'__main__':
 
def test_unsubscribeInterfaces(self)
def test_activateInterfaces(self)
def push_back(seq, elem)
Push the new element back to the CORBA sequence. 
def test_createConsumer(self)
The Properties class represents a persistent set of properties. 
def test_getConnectorIds(self)
def test_getConnectorProfileById(self)
def test_getConnectorById(self)
def test_initConsumers(self)
def test_initProviders(self)
def test_connectors(self)
def test_getConnectorProfileByName(self)
def __init__(self, name="name", id="id", ports=None, properties=None)
def test_createProvider(self)
def test_createConnector(self)
def test_getConnectorProfiles(self)
def test_getConnectorByName(self)
def test_subscribeInterfaces(self)
def test_publishInterfaces(self)
def newNV(name, value)
Create NameVale.