Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 from omniORB import any
00019 from omniORB import CORBA
00020
00021 import OpenRTM_aist
00022 import RTC, RTC__POA
00023 import SDOPackage, SDOPackage__POA
00024
00025 import sys
00026 sys.path.insert(1,"../")
00027
00028 import unittest
00029
00030 from OutPortCorbaCdrConsumer import *
00031
00032 class DummyBuffer:
00033 def empty():
00034 return False
00035
00036 def read(cdr):
00037 cdr[0] = 123
00038 return 0
00039
00040 class TestOutPortCorbaCdrConsumer(unittest.TestCase):
00041
00042 def setUp(self):
00043 OpenRTM_aist.Manager.instance()
00044 OpenRTM_aist.OutPortCorbaCdrProviderInit()
00045 self._opp = OpenRTM_aist.OutPortCorbaCdrProvider()
00046 self._opp.setBuffer(DummyBuffer())
00047 self._opcc = OutPortCorbaCdrConsumer()
00048 return
00049
00050 def test_setBuffer(self):
00051 self._opcc.setBuffer(DummyBuffer())
00052 return
00053
00054 def COMMENTtest_get(self):
00055 data = [None]
00056 self.assertEqual(self._opcc.subscribeInterface(self._opp._properties),True)
00057 self.assertEqual(self._opcc.get(data),OpenRTM_aist.DataPortStatus.PORT_OK)
00058 self.assertEqual(data[0],123)
00059 return
00060
00061 def test_subscribeInterface(self):
00062 self.assertEqual(self._opcc.subscribeInterface(self._opp._properties),True)
00063 return
00064
00065 def test_unsubscribeInterface(self):
00066 self._opcc.unsubscribeInterface(self._opp._properties)
00067 return
00068
00069
00070
00071 if __name__ == '__main__':
00072 unittest.main()