Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 from omniORB import any
00019 from omniORB import CORBA
00020
00021 import OpenRTM_aist
00022 import RTC, RTC__POA
00023 import SDOPackage, SDOPackage__POA
00024
00025 import sys
00026 sys.path.insert(1,"../")
00027
00028 import unittest
00029
00030 from OutPortCorbaCdrProvider import *
00031
class DummyBuffer:
    """Single-slot buffer double handed to the provider via setBuffer().

    It keeps the most recently written value plus an "empty" flag:
    write() stores a value and clears the flag, read() hands the stored
    value back and sets the flag again.
    """

    def __init__(self):
        self._cdr = None     # last value passed to write(); None until then
        self._empty = True   # True while there is nothing new to read

    def empty(self):
        """Return True when no value has been written since the last read()."""
        return self._empty

    def write(self, d):
        """Store ``d`` as the current value, mark the buffer non-empty, return 0."""
        self._cdr = d
        self._empty = False
        # NOTE(review): 0 is presumably the "OK" buffer status expected by the
        # provider -- confirm against the real buffer interface.
        return 0

    def read(self, cdr):
        """Copy the stored value into ``cdr[0]``, mark the buffer empty, return 0.

        ``cdr`` is used as an out-parameter and must support item assignment
        at index 0 (e.g. a one-element list).  The stored value itself is not
        cleared; only the empty flag is set.
        """
        cdr[0] = self._cdr
        self._empty = True
        return 0
00049
class TestOutPortCorbaCdrProvider(unittest.TestCase):
    """Tests for OpenRTM_aist.OutPortCorbaCdrProvider using DummyBuffer."""

    def setUp(self):
        """Create a fresh provider for every test."""
        # The manager singleton is touched and the provider factory
        # initialised before the provider itself is constructed.
        OpenRTM_aist.Manager.instance()
        OpenRTM_aist.OutPortCorbaCdrProviderInit()
        self._opp = OpenRTM_aist.OutPortCorbaCdrProvider()
        return

    def test_setBuffer(self):
        """setBuffer() accepts a buffer object without raising."""
        self._opp.setBuffer(DummyBuffer())
        return

    def test_get(self):
        """get() reports the right status with no buffer, an empty buffer, and data."""
        # 1) No buffer has been set yet: get() must report UNKNOWN_ERROR.
        ret, _ = self._opp.get()
        self.assertEqual(ret, OpenRTM.UNKNOWN_ERROR)

        # Attach a listener set and a buffer.  The ConnectorInfo is built from
        # placeholder values: two empty strings, an empty list and an empty
        # Properties object.
        prop = OpenRTM_aist.Properties()
        cinfo = OpenRTM_aist.ConnectorInfo("",
                                           "",
                                           [],
                                           prop)
        self._opp.setListener(cinfo, OpenRTM_aist.ConnectorListeners())
        buff = DummyBuffer()
        self._opp.setBuffer(buff)

        # 2) Buffer attached but nothing written: get() must report BUFFER_EMPTY.
        ret, _ = self._opp.get()
        self.assertEqual(ret, OpenRTM.BUFFER_EMPTY)

        # 3) After a write, get() must hand back the written value.
        # NOTE(review): only the payload is checked here; the status returned
        # alongside it is not asserted.
        buff.write(123)
        ret, data = self._opp.get()
        self.assertEqual(data, 123)
        return
00081
00082
00083
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()