Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022
00023 from InPort import *
00024
00025 import RTC, RTC__POA
00026 import OpenRTM_aist
00027
# Module-wide fill counter used by BufferMock: write() increments it and
# read() decrements it, never letting it drop below zero.
ShareCount = 0
00029
class OnRWTest:
    """Callback stub that announces on stdout whenever it is invoked.

    ``TestInPort.test_OnRead`` registers :meth:`echo` with the port through
    ``setOnRead``.
    """

    def __init__(self):
        pass

    def echo(self, value=None):
        """Print ``OnRW Called``.

        ``value`` is accepted so the method can be called with or without an
        argument; it is ignored. Returns None.
        """
        # A parenthesised single argument prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("OnRW Called")
00036
00037
class OnRWConvertTest:
    """Pass-through converter stub that announces on stdout when invoked.

    ``TestInPort.test_OnRead`` registers :meth:`echo` with the port through
    ``setOnReadConvert``.
    """

    def __init__(self):
        pass

    def echo(self, value=None):
        """Print ``OnRWConvert Called`` and return ``value`` unchanged."""
        # A parenthesised single argument prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("OnRWConvert Called")
        return value
00045
00046
class BufferMock:
    """Single-slot buffer stand-in tied to the module-level ``ShareCount``.

    Only the most recently written item is kept. Each instance remembers the
    value ``ShareCount`` had after its own last write() or read() and reports
    that snapshot from readable().
    """

    def __init__(self):
        self._data = None
        self._ShareCount = 0

    def write(self, data):
        """Keep ``data`` as the stored item and increment ``ShareCount``.

        Always returns ``OpenRTM_aist.BufferStatus.BUFFER_OK``.
        """
        global ShareCount
        ShareCount = ShareCount + 1
        self._data = data
        self._ShareCount = ShareCount
        return OpenRTM_aist.BufferStatus.BUFFER_OK

    def read(self, value):
        """Hand the stored item back through the list ``value``.

        ``ShareCount`` is decremented but never drops below zero. The item
        replaces ``value[0]`` when the list already has elements, otherwise
        it is appended. Always returns
        ``OpenRTM_aist.BufferStatus.BUFFER_OK``.
        """
        global ShareCount
        ShareCount = max(ShareCount - 1, 0)
        self._ShareCount = ShareCount
        if len(value) == 0:
            value.append(self._data)
        else:
            value[0] = self._data
        return OpenRTM_aist.BufferStatus.BUFFER_OK

    def readable(self):
        """Return the ``ShareCount`` snapshot taken by the last write/read."""
        return self._ShareCount
00074
00075
class ConnectorMock:
    """Connector stand-in that forwards reads and writes to one buffer.

    The status returned by the buffer is ignored: write() and read() always
    report success.
    """

    def __init__(self, buffer):
        """Remember ``buffer`` as the object all calls are forwarded to."""
        self._buffer = buffer

    def write(self, data):
        """Forward ``data`` to the buffer's write().

        Always returns ``OpenRTM_aist.BufferStatus.BUFFER_OK``.
        """
        self._buffer.write(data)
        return OpenRTM_aist.BufferStatus.BUFFER_OK

    def read(self, data):
        """Forward ``data`` to the buffer's read().

        Always returns ``OpenRTM_aist.DataPortStatus.PORT_OK``.
        """
        self._buffer.read(data)
        return OpenRTM_aist.DataPortStatus.PORT_OK

    def getBuffer(self):
        """Return the buffer object passed to the constructor."""
        # The earlier version also called readable() here and discarded the
        # result; that dead call has been removed.
        return self._buffer
00092
00093
00094 class TestInPort(unittest.TestCase):
00095 def setUp(self):
00096 OpenRTM_aist.Manager.instance()
00097 self._buffer = BufferMock()
00098 self._ipn = InPort("in", RTC.TimedLong(RTC.Time(0,0), 0), self._buffer)
00099 self._connector = ConnectorMock(self._buffer)
00100 self._ipn._connectors = [self._connector]
00101
00102 def tearDown(self):
00103 OpenRTM_aist.Manager.instance().shutdownManager()
00104 return
00105
00106 def test_name(self):
00107 self.assertEqual(self._ipn.name(), "in")
00108
00109 def test_read(self):
00110 self.assertEqual(self._ipn.isEmpty(), True)
00111 self.assertEqual(self._ipn.isNew(), False)
00112 self._connector.write(RTC.TimedLong(RTC.Time(0,0), 123))
00113 self.assertEqual(self._ipn.isEmpty(), False)
00114 self.assertEqual(self._ipn.isNew(), True)
00115 read_data = self._ipn.read()
00116 self.assertEqual(self._ipn.isEmpty(), True)
00117 self.assertEqual(self._ipn.isNew(), False)
00118 self.assertEqual(read_data.data, 123)
00119 self._ipn.update()
00120 return
00121
00122 def test_OnRead(self):
00123 self._connector.write(RTC.TimedLong(RTC.Time(0,0), 456))
00124 self._ipn.setOnRead(OnRWTest().echo)
00125 self._ipn.setOnReadConvert(OnRWConvertTest().echo)
00126 read_data = self._ipn.read()
00127 self.assertEqual(read_data.data, 456)
00128 return
00129
00130
00131
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
00134