Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import OpenRTM_aist
00020 import RTC, RTC__POA
00021
00022 import CORBA
00023 import sys
00024 sys.path.insert(1,"../")
00025
00026 import unittest
00027
00028 from DataOutPort import *
00029
class TestObj(OpenRTM_aist.RTObject_impl):
    """Minimal RTObject_impl subclass wired to the ORB's RootPOA.

    The constructor obtains an ORB via CORBA.ORB_init(), resolves the
    "RootPOA" initial reference, keeps both on the instance and passes
    them on to the RTObject_impl initialiser.
    """

    def __init__(self):
        # Store the ORB before resolving the POA, as the attributes are
        # assigned one after the other.
        orb = self._orb = CORBA.ORB_init()
        root_poa = self._poa = orb.resolve_initial_references("RootPOA")

        # Explicit base-class call (no super()) to stay compatible with
        # whatever class style RTObject_impl uses.
        OpenRTM_aist.RTObject_impl.__init__(self, orb=orb, poa=root_poa)
00035
class TestDataOutPort(unittest.TestCase):
    """Checks DataOutPort's publish/subscribe/unsubscribe interface calls."""

    def setUp(self):
        """Create the ORB/POA handles and the DataOutPort under test.

        The DataOutPort wraps an OutPort named "out" that carries a
        TimedLong value and is backed by a RingBuffer constructed with 8.
        """
        orb = self._orb = CORBA.ORB_init()
        self._poa = orb.resolve_initial_references("RootPOA")

        initial_value = RTC.TimedLong(RTC.Time(0, 0), 0)
        ring_buffer = OpenRTM_aist.RingBuffer(8)
        port = OpenRTM_aist.OutPort("out", initial_value, ring_buffer)

        self._dop = DataOutPort("out", port, None)

    def test_case(self):
        # NOTE: documented with comments rather than a docstring so the
        # verbose unittest output keeps showing the method name.
        #
        # Builds a connector profile (CORBA_Any interface, Push dataflow,
        # Flush subscription) and expects publishInterfaces and
        # subscribeInterfaces to return RTC_OK; unsubscribeInterfaces is
        # then called without checking its result.
        make_nv = OpenRTM_aist.NVUtil.newNV
        settings = (
            ("dataport.interface_type", "CORBA_Any"),
            ("dataport.dataflow_type", "Push"),
            ("dataport.subscription_type", "Flush"),
        )
        properties = [make_nv(key, value) for key, value in settings]

        profile = RTC.ConnectorProfile(
            "connector0", "", [TestObj(), TestObj()], properties
        )

        self.assertEqual(self._dop.publishInterfaces(profile), RTC.RTC_OK)
        self.assertEqual(self._dop.subscribeInterfaces(profile), RTC.RTC_OK)
        self._dop.unsubscribeInterfaces(profile)
00055
00056
00057
00058
# Run every TestCase in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()