test_PublisherPeriodic.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # -*- coding: euc-jp -*-
00003 
00004 #
00005 #  \file  test_PublisherPeriodic.py
00006 #  \brief test for PublisherPeriodic class
00007 #  \date  $Date: 2007/09/27 $
00008 #  \author Shinji Kurihara
00009 # 
00010 #  Copyright (C) 2006
00011 #      Noriaki Ando
00012 #      Task-intelligence Research Group,
00013 #      Intelligent Systems Research Institute,
00014 #      National Institute of
00015 #          Advanced Industrial Science and Technology (AIST), Japan
00016 #      All rights reserved.
00017  
00018 import sys,time
00019 sys.path.insert(1,"../")
00020 
00021 import unittest
00022 
00023 import OpenRTM_aist
00024 from PublisherPeriodic import *
00025 
class ConsumerMock(OpenRTM_aist.InPortCorbaCdrConsumer):
  """Test double for a consumer: every put() is written to a local ring buffer.

  The tests pass an instance to PublisherPeriodic.setConsumer(); what was
  delivered can be inspected with get_m_put_data() and get_m_put_data_len().
  """

  def __init__(self):
    """Create the backing CdrRingBuffer with write.full_policy=do_nothing."""
    # NOTE(review): the base-class __init__ is not invoked here -- presumably
    # so that no real connection is set up; confirm against
    # InPortCorbaCdrConsumer before changing this.
    buff = OpenRTM_aist.CdrRingBuffer()
    prop = OpenRTM_aist.Properties()
    prop.setProperty("write.full_policy","do_nothing")
    buff.init(prop)
    self._buffer = buff

  def __del__(self):
    """Intentionally empty; an inherited destructor, if any, is not run."""
    pass

  def convertReturnCode(self, ret):
    """Translate a buffer status into the port status returned by put().

    BUFFER_OK -> PORT_OK, BUFFER_ERROR -> PORT_ERROR, BUFFER_FULL -> SEND_FULL,
    TIMEOUT -> SEND_TIMEOUT; every other value (NOT_SUPPORTED included) maps
    to UNKNOWN_ERROR.
    """
    status = OpenRTM_aist.BufferStatus

    if ret == status.BUFFER_OK:
      return self.PORT_OK
    if ret == status.BUFFER_ERROR:
      return self.PORT_ERROR
    if ret == status.BUFFER_FULL:
      return self.SEND_FULL
    if ret == status.TIMEOUT:
      return self.SEND_TIMEOUT

    # NOT_SUPPORTED previously had its own branch with this same result.
    return self.UNKNOWN_ERROR

  def put(self,data):
    """Write data into the internal buffer and return the converted status."""
    return self.convertReturnCode(self._buffer.write(data))

  def get_m_put_data(self):
    """Read one element from the internal buffer and return it."""
    # The buffer's read() fills the first slot of the list passed to it.
    cdr = [0]
    self._buffer.read(cdr)
    return cdr[0]

  def get_m_put_data_len(self):
    """Return what the internal buffer reports from readable()."""
    return self._buffer.readable()
00086 
00087 
00088 
class TestPublisherPeriodic(unittest.TestCase):
  """Unit tests for PublisherPeriodic: init, wiring, activation and pushing."""

  # Property keys of a fully specified configuration, in the order in which
  # the tests write them into a Properties object.
  _CONFIG_KEYS = ("publisher.push_policy",
                  "publisher.skip_count",
                  "thread_type",
                  "measurement.exec_time",
                  "measurement.exec_count",
                  "measurement.period_time",
                  "measurement.period_count")

  def setUp(self):
    """Pause briefly before every test."""
    time.sleep(0.1)

  def tearDown(self):
    """Pause briefly, then shut the Manager singleton down."""
    time.sleep(0.1)
    OpenRTM_aist.Manager.instance().shutdownManager()

  # ---- helpers (names must not start with "test") ----

  def _apply_config(self, prop, values):
    """Store values in prop under _CONFIG_KEYS, pairwise and in key order."""
    for key, value in zip(self._CONFIG_KEYS, values):
      prop.setProperty(key, value)

  def _activated_publisher(self, policy):
    """Build, wire up and activate a publisher using the given push policy.

    Steps, in order: setListener, init, setConsumer(ConsumerMock),
    setBuffer(CdrRingBuffer), activate.  Returns the publisher.
    """
    publisher = PublisherPeriodic()
    cinfo = OpenRTM_aist.ConnectorInfo("", "", [], OpenRTM_aist.Properties())
    self.assertEqual(publisher.setListener(cinfo, OpenRTM_aist.ConnectorListeners()),
                     OpenRTM_aist.DataPortStatus.PORT_OK)

    prop = OpenRTM_aist.Properties()
    self._apply_config(prop, (policy, "0", "default", "enable", "0", "enable", "0"))
    publisher.init(prop)

    consumer = ConsumerMock()
    self.assertEqual(publisher.setConsumer(consumer),
                     OpenRTM_aist.DataPortStatus.PORT_OK)
    publisher.setBuffer(OpenRTM_aist.CdrRingBuffer())
    publisher.activate()
    return publisher

  def _assert_writes_ok(self, publisher, count, pause=None):
    """Call write(123,0,0) count times, expecting PORT_OK each time.

    When pause is given, sleep that many seconds after every write.
    """
    for _ in range(count):
      self.assertEqual(publisher.write(123,0,0),
                       OpenRTM_aist.DataPortStatus.PORT_OK)
      if pause is not None:
        time.sleep(pause)

  # ---- tests ----

  def test_init(self):
    """init() return codes for empty, invalid and fully specified properties."""
    publisher = PublisherPeriodic()
    prop = OpenRTM_aist.Properties()
    # init() must work even when the Properties object is empty.
    self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, publisher.init(prop))
    time.sleep(0.1)
    publisher.__del__()

    # An invalid thread_type must make init() return INVALID_ARGS.
    publisher = PublisherPeriodic()
    for key, value in (("publisher.push_policy", "new"),
                       ("thread_type", "bar"),
                       ("measurement.exec_time", "default"),
                       ("measurement.period_count", "1")):
      prop.setProperty(key, value)
    self.assertEqual(OpenRTM_aist.DataPortStatus.INVALID_ARGS, publisher.init(prop))
    publisher.__del__()

    # init() must succeed with each of the following settings.  The same
    # Properties object is reused; each row overwrites all seven keys.
    accepted = (
      ("all",  "0",  "default", "enable",  "0",  "enable",  "0"),
      ("fifo", "1",  "default", "disable", "1",  "disable", "0"),
      ("fifo", "1",  "default", "disable", "1",  "disable", "1"),
      ("skip", "-1", "default", "bar",     "-1", "bar",     "-1"),
      ("new",  "1",  "default", "enable",  "1",  "enable",  "1"),
      ("bar",  "0",  "default", "enable",  "0",  "enable",  "0"),
    )
    for values in accepted:
      publisher = PublisherPeriodic()
      self._apply_config(prop, values)
      self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, publisher.init(prop))
      publisher.__del__()

  def test_setConsumer(self):
    """setConsumer() rejects None and accepts a ConsumerMock."""
    publisher = PublisherPeriodic()
    self.assertEqual(publisher.setConsumer(None),
                     OpenRTM_aist.DataPortStatus.INVALID_ARGS)
    self.assertEqual(publisher.setConsumer(ConsumerMock()),
                     OpenRTM_aist.DataPortStatus.PORT_OK)
    publisher.__del__()

  def test_setBuffer(self):
    """setBuffer() rejects None and accepts a RingBuffer."""
    publisher = PublisherPeriodic()
    self.assertEqual(publisher.setBuffer(None),
                     OpenRTM_aist.DataPortStatus.INVALID_ARGS)
    self.assertEqual(publisher.setBuffer(OpenRTM_aist.RingBuffer()),
                     OpenRTM_aist.DataPortStatus.PORT_OK)
    publisher.__del__()

  def test_write(self):
    """Initialise a publisher, attach a buffer and dispose of it."""
    # NOTE: no write() call is made or asserted here; this test only runs the
    # init / setBuffer / __del__ sequence.
    publisher = PublisherPeriodic()
    publisher.init(OpenRTM_aist.Properties())
    publisher.setBuffer(OpenRTM_aist.RingBuffer())
    publisher.__del__()

  def test_activate_deactivate_isActive(self):
    """isActive() follows activate() and deactivate()."""
    port_ok = OpenRTM_aist.DataPortStatus.PORT_OK

    publisher = PublisherPeriodic()
    publisher.init(OpenRTM_aist.Properties())
    self.assertEqual(publisher.isActive(),False)

    consumer = ConsumerMock()
    self.assertEqual(publisher.setConsumer(consumer), port_ok)
    publisher.setBuffer(OpenRTM_aist.CdrRingBuffer())

    self.assertEqual(publisher.activate(), port_ok)
    self.assertEqual(publisher.isActive(),True)
    self.assertEqual(publisher.deactivate(), port_ok)
    self.assertEqual(publisher.isActive(),False)
    publisher.__del__()

  def test_pushAll(self):
    """Eight back-to-back writes return PORT_OK with push_policy "all"."""
    publisher = self._activated_publisher("all")
    self._assert_writes_ok(publisher, 8)
    publisher.deactivate()
    publisher.__del__()

  def test_pushFifo(self):
    """Eight writes, 10 ms apart, return PORT_OK with push_policy "fifo"."""
    publisher = self._activated_publisher("fifo")
    self._assert_writes_ok(publisher, 8, pause=0.01)
    publisher.deactivate()
    publisher.__del__()

  def test_pushSkip(self):
    """Eight back-to-back writes return PORT_OK with push_policy "skip"."""
    publisher = self._activated_publisher("skip")
    self._assert_writes_ok(publisher, 8)
    publisher.deactivate()
    publisher.__del__()

  def test_pushNew(self):
    """Writes return PORT_OK with push_policy "new"."""
    publisher = self._activated_publisher("new")
    # Seven writes in a row, a short pause, then one more write.
    self._assert_writes_ok(publisher, 7)
    time.sleep(0.01)
    self._assert_writes_ok(publisher, 1)
    publisher.deactivate()
    time.sleep(0.01)
    publisher.__del__()

  def test_convertReturn(self):
    """convertReturn() maps buffer status codes to data-port status codes."""
    publisher = PublisherPeriodic()
    buffer_status = OpenRTM_aist.BufferStatus
    port_status = OpenRTM_aist.DataPortStatus

    expectations = (
      (buffer_status.BUFFER_OK,            port_status.PORT_OK),
      (buffer_status.BUFFER_EMPTY,         port_status.PORT_ERROR),
      (buffer_status.TIMEOUT,              port_status.BUFFER_TIMEOUT),
      (buffer_status.PRECONDITION_NOT_MET, port_status.PRECONDITION_NOT_MET),
      # 100 is presumably not a defined BufferStatus value -- TODO confirm.
      (100,                                port_status.PORT_ERROR),
    )
    for status, expected in expectations:
      self.assertEqual(publisher.convertReturn(status,0), expected)
    publisher.__del__()
00398 
00399 ############ test #################
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Aug 27 2015 14:17:28