test_InPortCorbaCdrProvider.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 
00005 #  \file test_InPortCorbaCdrProvider.py
00006 #  \brief test for InPortCorbaCdrProvider class
00007 #  \date $Date: 2007/09/20 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2003-2005
00011 #      Task-intelligence Research Group,
00012 #      Intelligent Systems Research Institute,
00013 #      National Institute of
00014 #          Advanced Industrial Science and Technology (AIST), Japan
00015 #      All rights reserved.
00016  
00017 
00018 from omniORB import *
00019 from omniORB import any
00020 
00021 import sys
00022 sys.path.insert(1,"../")
00023 
00024 import unittest
00025 
00026 from InPortCorbaCdrProvider import *
00027 
00028 import RTC, RTC__POA
00029 import OpenRTM
00030 import OpenRTM_aist
00031 
00032 
00033 class BufferMock:
00034         def __init__(self):
00035                 self._data = None
00036                 return
00037 
00038         def write(self, data):
00039                 self._data = data
00040                 return OpenRTM_aist.BufferStatus.BUFFER_OK
00041 
00042         def read(self, value):
00043                 if len(value) > 0:
00044                         value[0] = self._data
00045                 else:
00046                         value.append(self._data)
00047                 return OpenRTM_aist.BufferStatus.BUFFER_OK
00048 
00049         def full(self):
00050                 return False
00051 
00052 
00053 class ConnectorMock:
00054         def __init__(self, buffer):
00055                 self._buffer = buffer
00056                 return
00057 
00058         def write(self, data):
00059                 self._buffer.write(data)
00060                 return OpenRTM_aist.BufferStatus.BUFFER_OK
00061 
00062 
00063 
00064 class TestInPortCorbaCdrProvider(unittest.TestCase):
00065         def setUp(self):
00066                 InPortCorbaCdrProviderInit()
00067                 OpenRTM_aist.CdrRingBufferInit()
00068                 self._prov = OpenRTM_aist.InPortProviderFactory.instance().createObject("corba_cdr")
00069                 self._inp  = OpenRTM_aist.InPort("in",RTC.TimedLong(RTC.Time(0,0),0))
00070                 self._orb  = OpenRTM_aist.Manager.instance().getORB()
00071                 self._buffer = BufferMock()
00072                 return
00073         
00074         def test_init(self):
00075                 self._prov.init(OpenRTM_aist.Properties())
00076                 return
00077 
00078         def test_setBuffer(self):
00079                 self._prov.setBuffer(self._buffer)
00080                 return
00081 
00082         def test_put(self):
00083                 self._con = ConnectorMock(self._buffer)
00084                 self._prov._connector = self._con
00085                 self._prov.setBuffer(self._buffer)
00086                 data = RTC.TimedLong(RTC.Time(0,0),123)
00087                 cdr = cdrMarshal(any.to_any(data).typecode(), data, 1)
00088                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00089                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00090                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00091                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00092                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00093                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00094                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00095                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00096                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00097                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00098                 self.assertEqual(self._prov.put(cdr),OpenRTM.PORT_OK)
00099                 val = []
00100                 self.assertEqual(self._buffer.read(val), OpenRTM_aist.BufferStatus.BUFFER_OK)
00101                 get_data = cdrUnmarshal(any.to_any(data).typecode(), val[0], 1)
00102                 self.assertEqual(get_data.data, 123)
00103                 return
00104 
00105 
00106 
00107 ############### test #################
00108 if __name__ == '__main__':
00109         unittest.main()
00110 


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28