00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys,time
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022
00023 import OpenRTM_aist
00024 from PublisherPeriodic import *
00025
class ConsumerMock(OpenRTM_aist.InPortCorbaCdrConsumer):
    """Consumer test double that writes whatever it is given into a local buffer.

    The buffer is an OpenRTM_aist.CdrRingBuffer initialised with
    ``write.full_policy`` set to ``do_nothing``.  The two ``get_m_put_data*``
    accessors let a test inspect what has been stored.
    """

    def __init__(self):
        # NOTE(review): the base-class constructor is not called here;
        # presumably intentional so that no real connection is set up - confirm.
        ring = OpenRTM_aist.CdrRingBuffer()
        settings = OpenRTM_aist.Properties()
        settings.setProperty("write.full_policy", "do_nothing")
        ring.init(settings)
        self._buffer = ring

    def __del__(self):
        """Do nothing on finalisation."""

    def convertReturnCode(self, ret):
        """Translate a BufferStatus code into the matching port status code.

        ``ret`` is compared against the BufferStatus constants in a fixed
        order; the first match wins.  NOT_SUPPORTED and every unrecognised
        value both map to UNKNOWN_ERROR.
        """
        buffer_status = OpenRTM_aist.BufferStatus
        # (BufferStatus attribute, attribute of self to return).  Names are
        # resolved lazily, one pair at a time, in this order.
        translation = (
            ("BUFFER_OK", "PORT_OK"),
            ("BUFFER_ERROR", "PORT_ERROR"),
            ("BUFFER_FULL", "SEND_FULL"),
            ("TIMEOUT", "SEND_TIMEOUT"),
            ("NOT_SUPPORTED", "UNKNOWN_ERROR"),
        )
        for buffer_name, port_name in translation:
            if ret == getattr(buffer_status, buffer_name):
                return getattr(self, port_name)
        return self.UNKNOWN_ERROR

    def put(self, data):
        """Write ``data`` to the internal buffer and return the converted status."""
        write_status = self._buffer.write(data)
        return self.convertReturnCode(write_status)

    def get_m_put_data(self):
        """Read from the internal buffer and return the value that was read.

        A one-element list is handed to ``read()`` and its first element is
        returned afterwards (presumably ``read()`` stores the datum there -
        verify against the buffer implementation).
        """
        holder = [0]
        self._buffer.read(holder)
        return holder[0]

    def get_m_put_data_len(self):
        """Return whatever the internal buffer reports from ``readable()``."""
        return self._buffer.readable()
00086
00087
00088
class TestPublisherPeriodic(unittest.TestCase):
    """Unit tests for PublisherPeriodic.

    Covers init() with several property combinations, setConsumer(),
    setBuffer(), activate()/deactivate()/isActive(), write() under each push
    policy, and convertReturn().

    Test methods are documented with ``#`` comments instead of docstrings so
    that unittest keeps reporting them by method name.
    """

    # Property keys in the order the tests set them when a full
    # configuration is applied.
    _SETTING_KEYS = (
        "publisher.push_policy",
        "publisher.skip_count",
        "thread_type",
        "measurement.exec_time",
        "measurement.exec_count",
        "measurement.period_time",
        "measurement.period_count",
    )

    def setUp(self):
        """Pause for 0.1 s before each test."""
        time.sleep(0.1)

    def tearDown(self):
        """Pause for 0.1 s, then shut the OpenRTM manager down."""
        time.sleep(0.1)
        OpenRTM_aist.Manager.instance().shutdownManager()

    @staticmethod
    def _apply_settings(prop, pairs):
        """Call ``prop.setProperty(key, value)`` for each pair, in order."""
        for key, value in pairs:
            prop.setProperty(key, value)

    def _activated_publisher(self, push_policy):
        """Build, configure and activate a publisher for a push-policy test.

        Registers a listener, initialises the publisher with ``push_policy``
        plus the fixed settings used by the push tests, attaches a
        ConsumerMock and a CdrRingBuffer, and activates it.  Returns the
        ``(publisher, consumer)`` pair.
        """
        ok = OpenRTM_aist.DataPortStatus.PORT_OK

        publisher = PublisherPeriodic()
        info = OpenRTM_aist.ConnectorInfo("", "", [], OpenRTM_aist.Properties())
        self.assertEqual(
            publisher.setListener(info, OpenRTM_aist.ConnectorListeners()), ok)

        settings = OpenRTM_aist.Properties()
        values = (push_policy, "0", "default", "enable", "0", "enable", "0")
        self._apply_settings(settings, zip(self._SETTING_KEYS, values))
        publisher.init(settings)

        consumer = ConsumerMock()
        self.assertEqual(publisher.setConsumer(consumer), ok)
        publisher.setBuffer(OpenRTM_aist.CdrRingBuffer())
        publisher.activate()
        return publisher, consumer

    def _assert_writes_ok(self, publisher, count, pause=None):
        """Assert that ``count`` consecutive write(123, 0, 0) calls give PORT_OK.

        When ``pause`` is given, sleep that many seconds after every write.
        """
        for _ in range(count):
            self.assertEqual(publisher.write(123, 0, 0),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
            if pause is not None:
                time.sleep(pause)

    def test_init(self):
        # init() is called on a fresh publisher for each configuration; the
        # same Properties object is reused, so settings accumulate.
        status = OpenRTM_aist.DataPortStatus

        # Empty properties are accepted.
        publisher = PublisherPeriodic()
        prop = OpenRTM_aist.Properties()
        self.assertEqual(status.PORT_OK, publisher.init(prop))
        time.sleep(0.1)
        publisher.__del__()

        # thread_type "bar" is expected to be rejected.
        rejected = (
            ("publisher.push_policy", "new"),
            ("thread_type", "bar"),
            ("measurement.exec_time", "default"),
            ("measurement.period_count", "1"),
        )
        # Full configurations, one value per entry of _SETTING_KEYS; all of
        # them (including the "bar" / "-1" values) are expected to be accepted.
        accepted = (
            ("all", "0", "default", "enable", "0", "enable", "0"),
            ("fifo", "1", "default", "disable", "1", "disable", "0"),
            ("fifo", "1", "default", "disable", "1", "disable", "1"),
            ("skip", "-1", "default", "bar", "-1", "bar", "-1"),
            ("new", "1", "default", "enable", "1", "enable", "1"),
            ("bar", "0", "default", "enable", "0", "enable", "0"),
        )

        cases = [(rejected, status.INVALID_ARGS)]
        for values in accepted:
            cases.append((tuple(zip(self._SETTING_KEYS, values)),
                          status.PORT_OK))

        for pairs, expected in cases:
            publisher = PublisherPeriodic()
            self._apply_settings(prop, pairs)
            self.assertEqual(expected, publisher.init(prop))
            publisher.__del__()

    def test_setConsumer(self):
        # None is rejected, a real consumer object is accepted.
        status = OpenRTM_aist.DataPortStatus
        publisher = PublisherPeriodic()
        self.assertEqual(publisher.setConsumer(None), status.INVALID_ARGS)
        self.assertEqual(publisher.setConsumer(ConsumerMock()), status.PORT_OK)
        publisher.__del__()

    def test_setBuffer(self):
        # None is rejected, a RingBuffer is accepted.
        status = OpenRTM_aist.DataPortStatus
        publisher = PublisherPeriodic()
        self.assertEqual(publisher.setBuffer(None), status.INVALID_ARGS)
        self.assertEqual(publisher.setBuffer(OpenRTM_aist.RingBuffer()),
                         status.PORT_OK)
        publisher.__del__()

    def test_write(self):
        # Only checks that init() + setBuffer() + teardown run without raising;
        # write() itself is exercised by the test_push* methods.
        publisher = PublisherPeriodic()
        publisher.init(OpenRTM_aist.Properties())
        publisher.setBuffer(OpenRTM_aist.RingBuffer())
        publisher.__del__()

    def test_activate_deactivate_isActive(self):
        # isActive() must follow activate()/deactivate().
        ok = OpenRTM_aist.DataPortStatus.PORT_OK
        publisher = PublisherPeriodic()
        publisher.init(OpenRTM_aist.Properties())
        self.assertEqual(publisher.isActive(), False)

        consumer = ConsumerMock()
        self.assertEqual(publisher.setConsumer(consumer), ok)
        publisher.setBuffer(OpenRTM_aist.CdrRingBuffer())

        self.assertEqual(publisher.activate(), ok)
        self.assertEqual(publisher.isActive(), True)
        self.assertEqual(publisher.deactivate(), ok)
        self.assertEqual(publisher.isActive(), False)
        publisher.__del__()

    def test_pushAll(self):
        # push_policy "all": eight back-to-back writes must all succeed.
        publisher, _consumer = self._activated_publisher("all")
        self._assert_writes_ok(publisher, 8)
        publisher.deactivate()
        publisher.__del__()

    def test_pushFifo(self):
        # push_policy "fifo": eight writes, each followed by a 10 ms pause.
        publisher, _consumer = self._activated_publisher("fifo")
        self._assert_writes_ok(publisher, 8, pause=0.01)
        publisher.deactivate()
        publisher.__del__()

    def test_pushSkip(self):
        # push_policy "skip": eight back-to-back writes must all succeed.
        publisher, _consumer = self._activated_publisher("skip")
        self._assert_writes_ok(publisher, 8)
        publisher.deactivate()
        publisher.__del__()

    def test_pushNew(self):
        # push_policy "new": seven writes, a 10 ms pause, then one more write.
        publisher, _consumer = self._activated_publisher("new")
        self._assert_writes_ok(publisher, 7)
        time.sleep(0.01)
        self._assert_writes_ok(publisher, 1)
        publisher.deactivate()
        time.sleep(0.01)
        publisher.__del__()

    def test_convertReturn(self):
        # Expected BufferStatus -> DataPortStatus translation; 100 stands in
        # for an unrecognised code.
        buffer_status = OpenRTM_aist.BufferStatus
        port_status = OpenRTM_aist.DataPortStatus
        expectations = (
            (buffer_status.BUFFER_OK, port_status.PORT_OK),
            (buffer_status.BUFFER_EMPTY, port_status.PORT_ERROR),
            (buffer_status.TIMEOUT, port_status.BUFFER_TIMEOUT),
            (buffer_status.PRECONDITION_NOT_MET,
             port_status.PRECONDITION_NOT_MET),
            (100, port_status.PORT_ERROR),
        )

        publisher = PublisherPeriodic()
        for code, expected in expectations:
            self.assertEqual(publisher.convertReturn(code, 0), expected)
        publisher.__del__()
00398
00399
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()