Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import OpenRTM_aist
00020
00021 import sys
00022 sys.path.insert(1,"../")
00023
00024 import unittest
00025
00026 from InPortCorbaConsumer import *
00027 from NVUtil import *
00028
00029 from omniORB import CORBA
00030 import RTC, RTC__POA
00031 import SDOPackage,SDOPackage__POA
00032 from omniORB import any
00033
00034
00035 class InPortTest(RTC__POA.InPortAny):
00036 def __init__(self):
00037 self.orb = CORBA.ORB_init()
00038 self.poa = self.orb.resolve_initial_references("RootPOA")
00039 poaManager = self.poa._get_the_POAManager()
00040 poaManager.activate()
00041
00042
00043 def put(self, data):
00044 print "put data: ", data
00045
00046
00047 class test(OpenRTM_aist.RTObject_impl):
00048 def __init__(self):
00049 self.orb = CORBA.ORB_init()
00050 self.poa = self.orb.resolve_initial_references("RootPOA")
00051 OpenRTM_aist.RTObject_impl.__init__(self, orb=self.orb, poa=self.poa)
00052 poaManager = self.poa._get_the_POAManager()
00053 poaManager.activate()
00054
00055
00056 class TestInPortCorbaConsumer(unittest.TestCase):
00057 def setUp(self):
00058 ringbuf = OpenRTM_aist.RingBuffer(8)
00059 ringbuf.init(RTC.TimedLong(RTC.Time(0,0), 0))
00060 self._ipcc = InPortCorbaConsumer(OpenRTM_aist.OutPort("out",
00061 RTC.TimedLong(RTC.Time(0,0), 0),
00062 ringbuf))
00063
00064 def test_equal_operator(self):
00065 self.assertEqual(self._ipcc.equal_operator(self._ipcc), self._ipcc)
00066
00067 def test_put(self):
00068 self._ipcc.setObject(InPortTest()._this())
00069 self._ipcc.put(RTC.TimedLong(RTC.Time(0,0), 123))
00070
00071 def test_push(self):
00072 self._ipcc.setObject(InPortTest()._this())
00073 self._ipcc.push()
00074
00075 def test_clone(self):
00076 self._ipcc.clone()
00077
00078 def test_subscribeInterface(self):
00079 port = any.to_any(InPortTest()._this())
00080
00081
00082 prop = [newNV("dataport.dataflow_type","Push"),
00083 newNV("dataport.corba_any.inport_ref",port)]
00084 self.assertEqual(self._ipcc.subscribeInterface(prop), True)
00085
00086
00087 def unsubscribeInterface(self, properties):
00088 pass
00089
00090
00091
00092 if __name__ == '__main__':
00093 unittest.main()