test_InPort.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 
00005 #  \file test_InPort.py
00006 #  \brief test for InPort template class
00007 #  \date $Date: 2007/09/20 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2003-2005
00011 #      Task-intelligence Research Group,
00012 #      Intelligent Systems Research Institute,
00013 #      National Institute of
00014 #          Advanced Industrial Science and Technology (AIST), Japan
00015 #      All rights reserved.
00016  
00017 
00018 import sys
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 
00023 from InPort import *
00024 
00025 import RTC, RTC__POA
00026 import OpenRTM_aist
00027 
00028 ShareCount = 0
00029 
00030 class OnRWTest:
00031   def __init__(self):
00032     pass
00033 
00034   def echo(self, value=None):
00035     print "OnRW Called"
00036 
00037 
00038 class OnRWConvertTest:
00039   def __init__(self):
00040     pass
00041 
00042   def echo(self, value=None):
00043     print "OnRWConvert Called"
00044     return value
00045 
00046 
00047 class BufferMock:
00048   def __init__(self):
00049     self._ShareCount = 0
00050     self._data = None
00051     return
00052 
00053   def write(self, data):
00054     global ShareCount
00055     ShareCount += 1
00056     self._ShareCount = ShareCount
00057     self._data = data
00058     return OpenRTM_aist.BufferStatus.BUFFER_OK
00059 
00060   def read(self, value):
00061     global ShareCount
00062     ShareCount -= 1
00063     if ShareCount < 0:
00064       ShareCount = 0
00065     self._ShareCount = ShareCount
00066     if len(value) > 0:
00067       value[0] = self._data
00068     else:
00069       value.append(self._data)
00070     return OpenRTM_aist.BufferStatus.BUFFER_OK
00071 
00072   def readable(self):
00073     return self._ShareCount
00074 
00075 
00076 class ConnectorMock:
00077   def __init__(self, buffer):
00078     self._buffer = buffer
00079     return
00080 
00081   def write(self, data):
00082     self._buffer.write(data)
00083     return OpenRTM_aist.BufferStatus.BUFFER_OK
00084 
00085   def read(self, data):
00086     self._buffer.read(data)
00087     return OpenRTM_aist.DataPortStatus.PORT_OK
00088 
00089   def getBuffer(self):
00090     ret = self._buffer.readable()
00091     return self._buffer
00092 
00093 
00094 class TestInPort(unittest.TestCase):
00095   def setUp(self):
00096     OpenRTM_aist.Manager.instance()
00097     self._buffer = BufferMock()
00098     self._ipn = InPort("in", RTC.TimedLong(RTC.Time(0,0), 0), self._buffer)
00099     self._connector = ConnectorMock(self._buffer)
00100     self._ipn._connectors = [self._connector]
00101 
00102   def tearDown(self):
00103     OpenRTM_aist.Manager.instance().shutdownManager()
00104     return
00105 
00106   def test_name(self):
00107     self.assertEqual(self._ipn.name(), "in")
00108 
00109   def test_read(self):
00110     self.assertEqual(self._ipn.isEmpty(), True)
00111     self.assertEqual(self._ipn.isNew(), False)
00112     self._connector.write(RTC.TimedLong(RTC.Time(0,0), 123))
00113     self.assertEqual(self._ipn.isEmpty(), False)
00114     self.assertEqual(self._ipn.isNew(), True)
00115     read_data = self._ipn.read()
00116     self.assertEqual(self._ipn.isEmpty(), True)
00117     self.assertEqual(self._ipn.isNew(), False)
00118     self.assertEqual(read_data.data, 123)
00119     self._ipn.update()
00120     return
00121 
00122   def test_OnRead(self):
00123     self._connector.write(RTC.TimedLong(RTC.Time(0,0), 456))
00124     self._ipn.setOnRead(OnRWTest().echo)
00125     self._ipn.setOnReadConvert(OnRWConvertTest().echo)
00126     read_data = self._ipn.read()
00127     self.assertEqual(read_data.data, 456)
00128     return
00129 
00130 
00131 ############### test #################
00132 if __name__ == '__main__':
00133   unittest.main()
00134 


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28