test_PublisherNew.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: euc-jp -*-
00003 
00004 #
00005 #  \file  test_PublisherNew.py
00006 #  \brief test for PublisherNew class
00007 #  \date  $Date: 2007/09/27 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Noriaki Ando
00012 #      Task-intelligence Research Group,
00013 #      Intelligent Systems Research Institute,
00014 #      National Institute of
00015 #          Advanced Industrial Science and Technology (AIST), Japan
00016 #      All rights reserved.
00017  
00018 import sys,time
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 
00023 import OpenRTM_aist
00024 from PublisherNew import *
00025 
class ConsumerMock(OpenRTM_aist.InPortCorbaCdrConsumer):
  """Stand-in consumer that stores pushed data in a local CdrRingBuffer.

  The parent constructor is intentionally not invoked; the mock only sets up
  the ring buffer that put() writes into.
  """

  def __init__(self):
    """Create the backing buffer with write.full_policy set to do_nothing."""
    ring = OpenRTM_aist.CdrRingBuffer()
    conf = OpenRTM_aist.Properties()
    conf.setProperty("write.full_policy","do_nothing")
    ring.init(conf)
    self._buffer = ring

  def __del__(self):
    """Do nothing on destruction (overrides the inherited finalizer)."""
    pass

  def convertReturnCode(self, ret):
    """Translate a BufferStatus code into the matching port status constant.

    ret -- status value returned by the buffer's write().
    Returns PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT or UNKNOWN_ERROR,
    each looked up on this instance.
    """
    status = OpenRTM_aist.BufferStatus

    if ret == status.BUFFER_OK:
      return self.PORT_OK

    if ret == status.BUFFER_ERROR:
      return self.PORT_ERROR

    if ret == status.BUFFER_FULL:
      return self.SEND_FULL

    if ret == status.TIMEOUT:
      return self.SEND_TIMEOUT

    # NOT_SUPPORTED is listed explicitly to document the mapping, although it
    # yields the same result as any unrecognised code.
    if ret == status.NOT_SUPPORTED:
      return self.UNKNOWN_ERROR

    return self.UNKNOWN_ERROR

  def put(self,data):
    """Write data into the internal buffer and return the converted status."""
    return self.convertReturnCode(self._buffer.write(data))

  def get_m_put_data(self):
    """Read one item from the internal buffer and return it.

    The buffer's read() receives a one-element list and the value at index 0
    is returned afterwards.
    """
    holder = [0]
    self._buffer.read(holder)
    return holder[0]

  def get_m_put_data_len(self):
    """Return whatever the internal buffer reports from readable()."""
    return self._buffer.readable()
00086 
00087 
00088 
class TestPublisherNew(unittest.TestCase):
  """Tests for PublisherNew.

  Covers init() with various property sets, setConsumer()/setBuffer()
  argument checking, activation state, write() under each push policy and
  convertReturn() status mapping.
  """

  # Property keys used by the complete configurations, in the order in which
  # the tests set them on the Properties object.
  _PROPERTY_KEYS = ("publisher.push_policy",
                    "publisher.skip_count",
                    "thread_type",
                    "measurement.exec_time",
                    "measurement.exec_count",
                    "measurement.period_time",
                    "measurement.period_count")

  # Number of consecutive write() calls issued by each push-policy test.
  _WRITE_COUNT = 8

  def setUp(self):
    # Short pause before every test.
    time.sleep(0.1)
    return

  def tearDown(self):
    # Shut the manager singleton down after every test.
    OpenRTM_aist.Manager.instance().shutdownManager()
    return

  def _full_settings(self, *values):
    """Pair the given values with _PROPERTY_KEYS, preserving key order."""
    return list(zip(self._PROPERTY_KEYS, values))

  def _check_init(self, prop, settings, expected):
    """Apply settings to prop, init a fresh PublisherNew and check the result.

    prop     -- Properties object to update in place and pass to init().
    settings -- iterable of (key, value) pairs set on prop before init().
    expected -- DataPortStatus value that init() must return.
    """
    _pn = PublisherNew()
    for key, value in settings:
      prop.setProperty(key, value)
    self.assertEqual(expected, _pn.init(prop))
    _pn.__del__()

  def _check_push_policy(self, policy):
    """Run the common write scenario with the given publisher.push_policy.

    Registers a listener, initialises the publisher, attaches a ConsumerMock
    and a CdrRingBuffer, activates it and asserts that _WRITE_COUNT
    consecutive write(123,0,0) calls each return PORT_OK.
    """
    port_ok = OpenRTM_aist.DataPortStatus.PORT_OK

    _pn = PublisherNew()
    cinfo = OpenRTM_aist.ConnectorInfo("",
                                       "",
                                       [],
                                       OpenRTM_aist.Properties())
    self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()),
                     port_ok)

    prop = OpenRTM_aist.Properties()
    for key, value in self._full_settings(policy, "0", "default",
                                          "enable", "0", "enable", "0"):
      prop.setProperty(key, value)
    # The return value of init() is deliberately not asserted here;
    # test_init covers it.
    _pn.init(prop)

    self.assertEqual(_pn.setConsumer(ConsumerMock()),port_ok)
    _pn.setBuffer(OpenRTM_aist.CdrRingBuffer())
    _pn.activate()

    for count in range(1, self._WRITE_COUNT + 1):
      self.assertEqual(_pn.write(123,0,0),port_ok,
                       "write #%d with push_policy '%s'" % (count, policy))

    _pn.deactivate()
    _pn.__del__()

  def test_init(self):
    # One Properties object is shared by all cases below, so a key that a
    # case does not set keeps the value left by the previous case.
    port_ok = OpenRTM_aist.DataPortStatus.PORT_OK
    prop = OpenRTM_aist.Properties()

    # init() must work even when the Properties object is empty.
    self._check_init(prop, [], port_ok)

    # An invalid thread_type must make init() return INVALID_ARGS.
    self._check_init(prop,
                     [("publisher.push_policy","new"),
                      ("thread_type","bar"),
                      ("measurement.exec_time","default"),
                      ("measurement.period_count","1")],
                     OpenRTM_aist.DataPortStatus.INVALID_ARGS)

    # init() must return PORT_OK for each of the following property sets,
    # including unknown policy/measurement values and negative counts.
    # Columns follow _PROPERTY_KEYS.
    accepted = (
      ("all",  "0",  "default", "enable",  "0",  "enable",  "0"),
      ("fifo", "1",  "default", "disable", "1",  "disable", "0"),
      ("fifo", "1",  "default", "disable", "1",  "disable", "1"),
      ("skip", "-1", "default", "bar",     "-1", "bar",     "-1"),
      ("new",  "1",  "default", "enable",  "1",  "enable",  "1"),
      ("bar",  "0",  "default", "enable",  "0",  "enable",  "0"),
    )
    for values in accepted:
      self._check_init(prop, self._full_settings(*values), port_ok)
    return

  def test_setConsumer(self):
    # None is rejected with INVALID_ARGS; a consumer object is accepted.
    _pn = PublisherNew()
    self.assertEqual(_pn.setConsumer(None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
    self.assertEqual(_pn.setConsumer(ConsumerMock()),OpenRTM_aist.DataPortStatus.PORT_OK)
    _pn.__del__()
    return

  def test_setBuffer(self):
    # None is rejected with INVALID_ARGS; a RingBuffer is accepted.
    _pn = PublisherNew()
    self.assertEqual(_pn.setBuffer(None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
    self.assertEqual(_pn.setBuffer(OpenRTM_aist.RingBuffer()),OpenRTM_aist.DataPortStatus.PORT_OK)
    _pn.__del__()
    return

  def test_write(self):
    # Only exercises init() with empty properties followed by setBuffer();
    # no write() result is asserted in this test.
    _pn = PublisherNew()
    prop = OpenRTM_aist.Properties()
    _pn.init(prop)
    _pn.setBuffer(OpenRTM_aist.RingBuffer())
    # TODO: the write() check is currently disabled:
    #   self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
    _pn.__del__()
    return

  def test_activate_deactivate_isActive(self):
    # isActive() must follow the activate()/deactivate() calls, starting
    # from inactive on a fresh publisher.
    port_ok = OpenRTM_aist.DataPortStatus.PORT_OK
    _pn = PublisherNew()
    self.assertEqual(_pn.isActive(),False)
    self.assertEqual(_pn.activate(),port_ok)
    self.assertEqual(_pn.isActive(),True)
    self.assertEqual(_pn.deactivate(),port_ok)
    self.assertEqual(_pn.isActive(),False)
    _pn.__del__()
    return

  def test_pushAll(self):
    # Write scenario with publisher.push_policy = "all".
    self._check_push_policy("all")
    return

  def test_pushFifo(self):
    # Write scenario with publisher.push_policy = "fifo".
    self._check_push_policy("fifo")
    return

  def test_pushSkip(self):
    # Write scenario with publisher.push_policy = "skip".
    self._check_push_policy("skip")
    return

  def test_pushNew(self):
    # Write scenario with publisher.push_policy = "new".
    self._check_push_policy("new")
    return

  def test_convertReturn(self):
    # Each BufferStatus code must map to the listed DataPortStatus value;
    # an unrecognised code (100) maps to PORT_ERROR.
    buffer_status = OpenRTM_aist.BufferStatus
    port_status = OpenRTM_aist.DataPortStatus
    _pn = PublisherNew()
    mapping = (
      (buffer_status.BUFFER_OK,            port_status.PORT_OK),
      (buffer_status.BUFFER_FULL,          port_status.BUFFER_FULL),
      (buffer_status.TIMEOUT,              port_status.BUFFER_TIMEOUT),
      (buffer_status.PRECONDITION_NOT_MET, port_status.PRECONDITION_NOT_MET),
      (100,                                port_status.PORT_ERROR),
    )
    for code, expected in mapping:
      self.assertEqual(_pn.convertReturn(code,0),
                       expected)
    _pn.__del__()
    return
00380 
############ test #################
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28