Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import sys
00020 sys.path.insert(1,"../")
00021
00022 from omniORB import *
00023 from omniORB import any
00024
00025 import unittest
00026 from OutPortPullConnector import *
00027
00028 import RTC, RTC__POA
00029
00030 import OpenRTM_aist
00031
00032 class MyBuffer:
00033 def __init__(self):
00034 self._data = None
00035 return
00036
00037 def write(self, data):
00038 self._data = data
00039 return
00040
00041 def read(self):
00042 return self._data
00043
00044 def init(self,info):
00045 pass
00046
00047 class OutPortProviderMock:
00048 _buffer = None
00049 _prop = None
00050
00051 def init(self, prop):
00052 self._prop = prop
00053
00054 def setBuffer(self, buff):
00055 self._buffer = buff
00056
00057 def setListener(self, info, listener):
00058 pass
00059
00060 def setConnector(self, con):
00061 pass
00062
00063
00064 class TestOutPortPullConnector(unittest.TestCase):
00065 def setUp(self):
00066 self._buffer = MyBuffer()
00067 self._profile = OpenRTM_aist.ConnectorInfo("test",
00068 "id",
00069 ["in","out"],
00070 OpenRTM_aist.Properties())
00071
00072 self._oc = OutPortPullConnector(self._profile,OutPortProviderMock(),OpenRTM_aist.ConnectorListeners(), self._buffer)
00073 return
00074
00075 def test_write(self):
00076 self._oc.write(123)
00077
00078 return
00079
00080
00081 def test_disconnect(self):
00082 self.assertEqual(self._oc.disconnect(), OpenRTM_aist.DataPortStatus.PORT_OK)
00083 return
00084
00085
00086 def test_getBuffer(self):
00087 self.assertEqual(self._oc.getBuffer(),self._buffer)
00088 return
00089
00090
00091
00092 if __name__ == '__main__':
00093 unittest.main()