test_OutPortPushConnector.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 #  \file test_OutPortPushConnector.py
00006 #  \brief test for OutPortPushConnector class
00007 #  \date $Date: 2007/09/19$
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Noriaki Ando
00012 #      Task-intelligence Research Group,
00013 #      Intelligent Systems Research Institute,
00014 #      National Institute of
00015 #          Advanced Industrial Science and Technology (AIST), Japan
00016 #      All rights reserved.
00017 # 
00018 
00019 import sys
00020 sys.path.insert(1,"../")
00021 
00022 from omniORB import *
00023 from omniORB import any
00024 
00025 import unittest
00026 from OutPortPushConnector import *
00027 
00028 import OpenRTM
00029 import RTC
00030 import OpenRTM_aist
00031 
class MyBuffer:
  """Minimal in-memory buffer stand-in used by the connector tests.

  It remembers only the most recent value handed to write().
  """

  def __init__(self):
    # Nothing has been written yet.
    self._data = None

  def write(self, data, sec=0, usec=0):
    """Keep *data* as the current contents.

    *sec* and *usec* are accepted but not used by this stand-in.
    """
    self._data = data

  def read(self):
    """Return the last value passed to write(), or None if there was none."""
    return self._data

  def init(self, info):
    """Accept *info* and ignore it; always returns None."""
    return None
00046 
class ConsumerMock:
  """Stand-in consumer that records or forwards data and always reports PORT_OK."""

  def __init__(self, buff):
    # buff: object whose write(data) is called from put().
    self._buff = buff
    self._data = None

  def init(self, prop):
    """Accept *prop* and ignore it; always returns None."""
    return None

  def write(self, data):
    """Remember *data* on this object and return OpenRTM.PORT_OK."""
    self._data = data
    return OpenRTM.PORT_OK

  def put(self, data):
    """Forward *data* to the buffer given at construction and return OpenRTM.PORT_OK."""
    target = self._buff
    target.write(data)
    return OpenRTM.PORT_OK
00063 
class TestOutPortPushConnector(unittest.TestCase):
  """Exercises OutPortPushConnector wired to MyBuffer and ConsumerMock."""

  def setUp(self):
    # The same MyBuffer instance is given to the mock consumer and to the
    # connector, so data pushed through the connector can be read back here.
    self._buffer = MyBuffer()
    self._consumer = ConsumerMock(self._buffer)

    properties = OpenRTM_aist.Properties()
    self._profile = OpenRTM_aist.ConnectorInfo("test", "id", ["in", "out"], properties)

    listeners = OpenRTM_aist.ConnectorListeners()
    self._oc = OutPortPushConnector(self._profile,
                                    self._consumer,
                                    listeners,
                                    self._buffer)

  # Write a TimedLong carrying 123 through the connector, then decode what
  # reached the buffer and check the payload.
  def test_write(self):
    sent = RTC.TimedLong(RTC.Time(0, 0), 123)
    self._oc.write(sent)

    raw = self._buffer.read()
    # A zero-valued TimedLong is used only to obtain the typecode for decoding.
    template = RTC.TimedLong(RTC.Time(0, 0), 0)
    typecode = any.to_any(template).typecode()
    # NOTE(review): the trailing 1 is presumably the endian flag; confirm
    # against the omniORBpy documentation.
    decoded = cdrUnmarshal(typecode, raw, 1)
    self.assertEqual(decoded.data, 123)

  # disconnect() is expected to report PORT_OK.
  def test_disconnect(self):
    status = self._oc.disconnect()
    self.assertEqual(status, OpenRTM_aist.DataPortStatus.PORT_OK)

  # Smoke test: activate() only has to complete without raising.
  def test_activate(self):
    self._oc.activate()

  # Smoke test: deactivate() only has to complete without raising.
  def test_deactivate(self):
    self._oc.deactivate()

  # getBuffer() is expected to hand back the buffer passed to the constructor.
  def test_getBuffer(self):
    current = self._oc.getBuffer()
    self.assertEqual(current, self._buffer)

  # Smoke test: createPublisher() only has to complete without raising.
  def test_createPublisher(self):
    self._oc.createPublisher(self._profile)

  # Smoke test: createBuffer() only has to complete without raising.
  def test_createBuffer(self):
    self._oc.createBuffer(self._profile)
00110 
# Script entry point: run every test case in this module when the file is
# executed directly.
if __name__ == '__main__':
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28