Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 from omniORB import *
00019 from omniORB import any
00020
00021 import sys
00022 sys.path.insert(1,"../")
00023
00024 import unittest
00025
00026 from InPortCorbaCdrProvider import *
00027
00028 import RTC, RTC__POA
00029 import OpenRTM
00030 import OpenRTM_aist
00031
00032
class BufferMock:
    """Single-slot stand-in for a data buffer, used by the tests below.

    Only the most recently written item is retained, and every read/write
    reports ``OpenRTM_aist.BufferStatus.BUFFER_OK``.
    """

    def __init__(self):
        # The one storage slot; stays None until the first write().
        self._data = None

    def write(self, data):
        """Store *data*, replacing any previous item, and return BUFFER_OK."""
        self._data = data
        return OpenRTM_aist.BufferStatus.BUFFER_OK

    def read(self, value):
        """Hand the stored item back through the caller-supplied list *value*.

        The item ends up at ``value[0]``: an existing first element is
        overwritten, an empty list gets the item appended.  Always returns
        BUFFER_OK.
        """
        if value:
            value[0] = self._data
        else:
            value.append(self._data)
        return OpenRTM_aist.BufferStatus.BUFFER_OK

    def full(self):
        """Return False: this mock never reports itself as full."""
        return False
00051
00052
class ConnectorMock:
    """Stand-in connector that forwards written data straight to a buffer."""

    def __init__(self, buffer):
        # Object providing write(data); all connector writes are delegated to it.
        self._buffer = buffer

    def write(self, data):
        """Write *data* to the wrapped buffer and return the buffer's status.

        The status reported by the buffer is passed through unchanged rather
        than being replaced by a hard-coded success code, so a buffer that
        reports a failure is no longer masked by the connector.
        """
        return self._buffer.write(data)
00061
00062
00063
class TestInPortCorbaCdrProvider(unittest.TestCase):
    """Unit tests for the InPort provider registered as "corba_cdr"."""

    # How many consecutive put() calls test_put issues.
    _PUT_COUNT = 11

    def setUp(self):
        """Register the provider and buffer factories and build the fixtures."""
        InPortCorbaCdrProviderInit()
        OpenRTM_aist.CdrRingBufferInit()
        factory = OpenRTM_aist.InPortProviderFactory.instance()
        self._prov = factory.createObject("corba_cdr")
        self._inp = OpenRTM_aist.InPort(
            "in", RTC.TimedLong(RTC.Time(0, 0), 0))
        self._orb = OpenRTM_aist.Manager.instance().getORB()
        self._buffer = BufferMock()

    def test_init(self):
        """init() accepts an empty Properties object without raising."""
        self._prov.init(OpenRTM_aist.Properties())

    def test_setBuffer(self):
        """setBuffer() accepts the mock buffer without raising."""
        self._prov.setBuffer(self._buffer)

    def test_put(self):
        """put() returns PORT_OK repeatedly and the data reaches the buffer."""
        self._con = ConnectorMock(self._buffer)
        # Inject the mock connector directly so put() writes into our buffer.
        self._prov._connector = self._con
        self._prov.setBuffer(self._buffer)

        data = RTC.TimedLong(RTC.Time(0, 0), 123)
        typecode = any.to_any(data).typecode()
        cdr = cdrMarshal(typecode, data, 1)

        for attempt in range(1, self._PUT_COUNT + 1):
            self.assertEqual(
                self._prov.put(cdr), OpenRTM.PORT_OK,
                "put() call #%d did not return PORT_OK" % attempt)

        # The mock buffer keeps only the last item written; decode and check it.
        val = []
        self.assertEqual(self._buffer.read(val),
                         OpenRTM_aist.BufferStatus.BUFFER_OK)
        get_data = cdrUnmarshal(typecode, val[0], 1)
        self.assertEqual(get_data.data, 123)
00104
00105
00106
00107
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
00110