test_PublisherFlush.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- Python -*-
00003 
00004 #
00005 #  \file  test_PublisherFlush.py
00006 #  \brief test for PublisherFlush class
00007 #  \date  $Date: 2007/09/27 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Noriaki Ando
00012 #      Task-intelligence Research Group,
00013 #      Intelligent Systems Research Institute,
00014 #      National Institute of
00015 #          Advanced Industrial Science and Technology (AIST), Japan
00016 #      All rights reserved.
00017  
00018 import sys
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 
00023 from PublisherFlush import *
00024 import OpenRTM_aist
00025 
00026 class Consumer:
00027   def put(self,data):
00028     return data
00029 
00030 class TestPublisherNew(unittest.TestCase):
00031 
00032   def setUp(self):
00033     self._pf = PublisherFlush()
00034     return
00035 
00036   def test_init(self):
00037     self.assertEqual(self._pf.init(None),OpenRTM_aist.DataPortStatus.PORT_OK)
00038     return
00039 
00040   def test_setConsumer(self):
00041     self.assertEqual(self._pf.setConsumer(None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
00042     self.assertEqual(self._pf.init(Consumer()),OpenRTM_aist.DataPortStatus.PORT_OK)
00043     return
00044 
00045   def test_setBuffer(self):
00046     self.assertEqual(self._pf.setBuffer(None),OpenRTM_aist.DataPortStatus.PORT_OK)
00047     return
00048 
00049   def test_write(self):
00050     prop = OpenRTM_aist.Properties()
00051     cinfo = OpenRTM_aist.ConnectorInfo("",
00052                                        "",
00053                                        [],
00054                                        prop)
00055     self.assertEqual(self._pf.write(123,0,0),OpenRTM_aist.DataPortStatus.PRECONDITION_NOT_MET)
00056     self.assertEqual(self._pf.setConsumer(OpenRTM_aist.InPortCorbaCdrConsumer()),
00057                      OpenRTM_aist.DataPortStatus.PORT_OK)
00058     self.assertEqual(self._pf.setListener(cinfo,OpenRTM_aist.ConnectorListeners()),
00059                      OpenRTM_aist.DataPortStatus.PORT_OK)
00060     self.assertEqual(self._pf.write(123,0,0),OpenRTM_aist.DataPortStatus.CONNECTION_LOST)
00061     return
00062 
00063   def test_activate_deactivate_isActive(self):
00064     self.assertEqual(self._pf.isActive(),False)
00065     self.assertEqual(self._pf.activate(),OpenRTM_aist.DataPortStatus.PORT_OK)
00066     self.assertEqual(self._pf.isActive(),True)
00067     self.assertEqual(self._pf.activate(),OpenRTM_aist.DataPortStatus.PRECONDITION_NOT_MET)
00068     self.assertEqual(self._pf.deactivate(),OpenRTM_aist.DataPortStatus.PORT_OK)
00069     self.assertEqual(self._pf.isActive(),False)
00070     self.assertEqual(self._pf.deactivate(),OpenRTM_aist.DataPortStatus.PRECONDITION_NOT_MET)
00071     return
00072 
00073 
00074 ############### test #################
00075 if __name__ == '__main__':
00076   unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28