Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022
00023 from InPortPushConnector import *
00024
00025 import RTC, RTC__POA
00026 import OpenRTM_aist
00027 from OpenRTM_aist import *
00028
00029
class InPortProviderMock:
    """Stand-in provider object handed to the connector under test.

    It only remembers the last properties and buffer it was given and
    silently ignores listener registration.
    """

    # Class-level defaults: both slots read as None until the matching
    # setter has been called on an instance.
    _buffer = None
    _prop = None

    def init(self, prop):
        """Store the properties object passed in."""
        self._prop = prop

    def setBuffer(self, buff):
        """Store the buffer object passed in."""
        self._buffer = buff

    def setListener(self, info, listener):
        """Accept a listener registration and do nothing with it."""
        return None
00042
00043
class TestInPortPushConnector(unittest.TestCase):
    """Unit tests for InPortPushConnector built around InPortProviderMock."""

    def setUp(self):
        # Manager.instance() is called for its side effect only; the return
        # value is unused. NOTE(review): presumably this initialises global
        # state the connector relies on -- confirm against OpenRTM_aist.
        OpenRTM_aist.Manager.instance()

        # Fresh connector per test: empty port list, default properties,
        # mock provider, and an empty listener set.
        self._con = InPortPushConnector(
            OpenRTM_aist.ConnectorInfo("name", "id", [], OpenRTM_aist.Properties()),
            InPortProviderMock(),
            OpenRTM_aist.ConnectorListeners(),
        )

    def test_read(self):
        # Nothing was written to the connector, so read() is expected to
        # report BUFFER_TIMEOUT whatever object is passed as the target.
        for payload in ([], 123):
            status = self._con.read(payload)
            self.assertEqual(status, OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT)

    def test_disconnect(self):
        # disconnect() on a freshly built connector must report PORT_OK.
        status = self._con.disconnect()
        self.assertEqual(status, OpenRTM_aist.DataPortStatus.PORT_OK)

    def test_activate(self):
        # Smoke test: passes as long as activate() does not raise.
        self._con.activate()

    def test_deactivate(self):
        # Smoke test: passes as long as deactivate() does not raise.
        self._con.deactivate()

    def test_createBuffer(self):
        # Smoke test: createBuffer() must accept a fresh ConnectorInfo
        # without raising; the return value is not checked.
        info = OpenRTM_aist.ConnectorInfo("name", "id", [], OpenRTM_aist.Properties())
        self._con.createBuffer(info)
00071
00072
00073
00074
00075
# Run every test case in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
00078