Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 from omniORB import any
00019 from omniORB import CORBA
00020
00021 import OpenRTM_aist
00022 import RTC, RTC__POA
00023 import SDOPackage, SDOPackage__POA
00024
00025 import sys
00026 sys.path.insert(1,"../")
00027
00028 import unittest
00029
00030 from OutPortCorbaConsumer import *
00031
class OutPortTest(RTC__POA.OutPortAny):
    """Minimal OutPortAny servant used as the remote side in these tests.

    Its ``get`` operation always answers with a ``TimedLong`` whose data
    field is 123.
    """

    def __init__(self):
        """Initialise the ORB and activate the root POA's manager."""
        self.orb = CORBA.ORB_init()
        self.poa = self.orb.resolve_initial_references("RootPOA")
        poaManager = self.poa._get_the_POAManager()
        poaManager.activate()

    def get(self):
        """Return a CORBA Any wrapping ``TimedLong(Time(0, 0), 123)``.

        Also prints a trace line each time the operation is invoked.
        """
        # Parenthesised form: with a single string argument it prints the
        # same text under the Python 2 print statement and the Python 3
        # print function, so the module no longer fails to parse on Python 3.
        print("Called get operation.")
        return any.to_any(RTC.TimedLong(RTC.Time(0, 0), 123))
00043
00044
class TestOutPortCorbaConsumer(unittest.TestCase):
    """Exercises OutPortCorbaConsumer against the OutPortTest servant."""

    # NOTE: the test methods are described with comments rather than
    # docstrings so that unittest's verbose report (which shows the first
    # docstring line instead of the method name) stays unchanged.

    def setUp(self):
        # Build a fresh consumer for every test: an 8-slot ring buffer
        # primed with a zero-valued TimedLong, wrapped in an InPort named
        # "in" whose initial value is a separate zero-valued TimedLong.
        buf = OpenRTM_aist.RingBuffer(8)
        buf.init(RTC.TimedLong(RTC.Time(0, 0), 0))
        inport = OpenRTM_aist.InPort("in",
                                     RTC.TimedLong(RTC.Time(0, 0), 0),
                                     buf)
        self._opcc = OutPortCorbaConsumer(inport)

    def test_get(self):
        # Point the consumer at a servant reference, then fetch one value;
        # get() is expected to return True and fill slot 0 of the list.
        servant_ref = OutPortTest()._this()
        self._opcc.setObject(servant_ref)
        received = [None]
        outcome = self._opcc.get(received)
        self.assertEqual(outcome, True)
        self.assertEqual(received[0].data, 123)

    def test_pull(self):
        # Only checks that pull() can be called; nothing is asserted.
        self._opcc.pull()

    def test_subscribeInterface(self):
        # Subscribe using a property list that carries the servant
        # reference (wrapped in an Any), then unsubscribe with the same list.
        outport_ref = any.to_any(OutPortTest()._this())
        properties = [
            SDOPackage.NameValue("dataport.dataflow_type", "Push"),
            SDOPackage.NameValue("dataport.corba_any.outport_ref",
                                 outport_ref),
        ]
        subscribed = self._opcc.subscribeInterface(properties)
        self.assertEqual(subscribed, True)
        self._opcc.unsubscribeInterface(properties)
00073
00074
00075
# Run the test cases only when this file is executed as a script,
# not when it is imported by another module or test collector.
if __name__ == "__main__":
    unittest.main()