00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 import sys,time
00019 sys.path.insert(1,"../")
00020
00021 import unittest
00022
00023 import OpenRTM_aist
00024 from PublisherNew import *
00025
class ConsumerMock(OpenRTM_aist.InPortCorbaCdrConsumer):
    """Consumer test double that keeps pushed data in a local ring buffer.

    ``put()`` writes into a ``CdrRingBuffer`` owned by the mock and reports
    the translated buffer status; the ``get_m_put_data*`` helpers let a test
    inspect what was stored.
    """

    def __init__(self):
        """Create the backing buffer with ``write.full_policy=do_nothing``."""
        # The base-class constructor is deliberately not called here.
        # NOTE(review): presumably to avoid any real consumer setup in the
        # base class -- confirm against InPortCorbaCdrConsumer.
        buffer_props = OpenRTM_aist.Properties()
        buffer_props.setProperty("write.full_policy", "do_nothing")

        ring = OpenRTM_aist.CdrRingBuffer()
        ring.init(buffer_props)
        self._buffer = ring

    def __del__(self):
        """Do nothing on finalisation (overrides any base-class ``__del__``)."""
        pass

    def convertReturnCode(self, ret):
        """Translate a ``BufferStatus`` code into a port status code.

        ``BUFFER_OK`` -> ``PORT_OK``, ``BUFFER_ERROR`` -> ``PORT_ERROR``,
        ``BUFFER_FULL`` -> ``SEND_FULL``, ``TIMEOUT`` -> ``SEND_TIMEOUT``;
        every other value (including ``NOT_SUPPORTED``) -> ``UNKNOWN_ERROR``.
        """
        status = OpenRTM_aist.BufferStatus

        if ret == status.BUFFER_OK:
            return self.PORT_OK
        if ret == status.BUFFER_ERROR:
            return self.PORT_ERROR
        if ret == status.BUFFER_FULL:
            return self.SEND_FULL
        if ret == status.TIMEOUT:
            return self.SEND_TIMEOUT

        # NOT_SUPPORTED used to have its own branch, but it returned the
        # same value as the catch-all, so both are handled here.
        return self.UNKNOWN_ERROR

    def put(self, data):
        """Write *data* to the internal buffer and return the port status."""
        return self.convertReturnCode(self._buffer.write(data))

    def get_m_put_data(self):
        """Read one item from the internal buffer and return it.

        A one-element list is handed to ``read()`` and its first slot is
        returned afterwards; the status returned by ``read()`` is ignored.
        """
        holder = [0]
        self._buffer.read(holder)
        return holder[0]

    def get_m_put_data_len(self):
        """Return ``readable()`` of the internal buffer."""
        return self._buffer.readable()
00086
00087
00088
class TestPublisherNew(unittest.TestCase):
    """Unit tests for ``PublisherNew``.

    Every test builds its own publisher and finalises it by calling
    ``__del__()`` explicitly.  The call sits in a ``finally`` clause so it
    also happens when an assertion fails.
    NOTE(review): ``__del__()`` presumably releases the publisher's worker
    task -- confirm against PublisherNew.
    """

    # Number of consecutive write() calls issued by each push-policy test.
    _WRITE_COUNT = 8

    def setUp(self):
        """Pause for 0.1 s before each test."""
        # NOTE(review): presumably gives the manager shut down by the
        # previous tearDown() time to settle -- confirm.
        time.sleep(0.1)

    def tearDown(self):
        """Shut down the OpenRTM manager singleton after each test."""
        OpenRTM_aist.Manager.instance().shutdownManager()

    def test_init(self):
        """Check the return code of init() for a series of property sets.

        A single ``Properties`` object is shared by all cases, so each case
        also sees the keys written by the earlier ones unless it overwrites
        them.  The only case expected to return ``INVALID_ARGS`` is the one
        that sets ``thread_type`` to ``"bar"``; all others expect
        ``PORT_OK``.
        """
        port_ok = OpenRTM_aist.DataPortStatus.PORT_OK
        invalid_args = OpenRTM_aist.DataPortStatus.INVALID_ARGS

        # (properties to set before init(), expected return code)
        cases = (
            ((), port_ok),
            ((("publisher.push_policy", "new"),
              ("thread_type", "bar"),
              ("measurement.exec_time", "default"),
              ("measurement.period_count", "1")),
             invalid_args),
            ((("publisher.push_policy", "all"),
              ("publisher.skip_count", "0"),
              ("thread_type", "default"),
              ("measurement.exec_time", "enable"),
              ("measurement.exec_count", "0"),
              ("measurement.period_time", "enable"),
              ("measurement.period_count", "0")),
             port_ok),
            ((("publisher.push_policy", "fifo"),
              ("publisher.skip_count", "1"),
              ("thread_type", "default"),
              ("measurement.exec_time", "disable"),
              ("measurement.exec_count", "1"),
              ("measurement.period_time", "disable"),
              ("measurement.period_count", "0")),
             port_ok),
            ((("publisher.push_policy", "fifo"),
              ("publisher.skip_count", "1"),
              ("thread_type", "default"),
              ("measurement.exec_time", "disable"),
              ("measurement.exec_count", "1"),
              ("measurement.period_time", "disable"),
              ("measurement.period_count", "1")),
             port_ok),
            ((("publisher.push_policy", "skip"),
              ("publisher.skip_count", "-1"),
              ("thread_type", "default"),
              ("measurement.exec_time", "bar"),
              ("measurement.exec_count", "-1"),
              ("measurement.period_time", "bar"),
              ("measurement.period_count", "-1")),
             port_ok),
            ((("publisher.push_policy", "new"),
              ("publisher.skip_count", "1"),
              ("thread_type", "default"),
              ("measurement.exec_time", "enable"),
              ("measurement.exec_count", "1"),
              ("measurement.period_time", "enable"),
              ("measurement.period_count", "1")),
             port_ok),
            ((("publisher.push_policy", "bar"),
              ("publisher.skip_count", "0"),
              ("thread_type", "default"),
              ("measurement.exec_time", "enable"),
              ("measurement.exec_count", "0"),
              ("measurement.period_time", "enable"),
              ("measurement.period_count", "0")),
             port_ok),
        )

        prop = OpenRTM_aist.Properties()
        for index, (settings, expected) in enumerate(cases):
            publisher = PublisherNew()
            try:
                for key, value in settings:
                    prop.setProperty(key, value)
                # The message identifies the failing row of the table.
                self.assertEqual(expected, publisher.init(prop),
                                 "init() case #%d" % index)
            finally:
                publisher.__del__()

    def test_setConsumer(self):
        """setConsumer() rejects None and accepts a ConsumerMock."""
        publisher = PublisherNew()
        try:
            self.assertEqual(publisher.setConsumer(None),
                             OpenRTM_aist.DataPortStatus.INVALID_ARGS)
            self.assertEqual(publisher.setConsumer(ConsumerMock()),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
        finally:
            publisher.__del__()

    def test_setBuffer(self):
        """setBuffer() rejects None and accepts a RingBuffer."""
        publisher = PublisherNew()
        try:
            self.assertEqual(publisher.setBuffer(None),
                             OpenRTM_aist.DataPortStatus.INVALID_ARGS)
            self.assertEqual(publisher.setBuffer(OpenRTM_aist.RingBuffer()),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
        finally:
            publisher.__del__()

    def test_write(self):
        """Initialise a publisher with default properties and attach a buffer.

        NOTE(review): despite its name this test never calls write(); it
        only checks that the preparation steps succeed.
        """
        publisher = PublisherNew()
        try:
            self.assertEqual(publisher.init(OpenRTM_aist.Properties()),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
            self.assertEqual(publisher.setBuffer(OpenRTM_aist.RingBuffer()),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
        finally:
            publisher.__del__()

    def test_activate_deactivate_isActive(self):
        """isActive() follows activate() / deactivate(), which return PORT_OK."""
        publisher = PublisherNew()
        try:
            self.assertEqual(publisher.isActive(), False)
            self.assertEqual(publisher.activate(),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
            self.assertEqual(publisher.isActive(), True)
            self.assertEqual(publisher.deactivate(),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
            self.assertEqual(publisher.isActive(), False)
        finally:
            publisher.__del__()

    def _check_push_policy(self, policy):
        """Run the shared write scenario with ``publisher.push_policy`` = *policy*.

        Sets a listener, initialises the publisher, attaches a
        ``ConsumerMock`` and a ``CdrRingBuffer``, activates it, and expects
        ``_WRITE_COUNT`` consecutive ``write(123, 0, 0)`` calls to return
        ``PORT_OK``.
        """
        port_ok = OpenRTM_aist.DataPortStatus.PORT_OK
        publisher = PublisherNew()
        try:
            info = OpenRTM_aist.ConnectorInfo("", "", [],
                                              OpenRTM_aist.Properties())
            self.assertEqual(
                publisher.setListener(info, OpenRTM_aist.ConnectorListeners()),
                port_ok)

            prop = OpenRTM_aist.Properties()
            prop.setProperty("publisher.push_policy", policy)
            prop.setProperty("publisher.skip_count", "0")
            prop.setProperty("thread_type", "default")
            prop.setProperty("measurement.exec_time", "enable")
            prop.setProperty("measurement.exec_count", "0")
            prop.setProperty("measurement.period_time", "enable")
            prop.setProperty("measurement.period_count", "0")
            # A failed init() should be reported here rather than surface
            # later as a confusing write() failure.
            self.assertEqual(publisher.init(prop), port_ok)

            consumer = ConsumerMock()
            self.assertEqual(publisher.setConsumer(consumer), port_ok)
            publisher.setBuffer(OpenRTM_aist.CdrRingBuffer())
            publisher.activate()

            for _ in range(self._WRITE_COUNT):
                self.assertEqual(publisher.write(123, 0, 0), port_ok)

            publisher.deactivate()
        finally:
            publisher.__del__()

    def test_pushAll(self):
        """Repeated write() calls succeed with push policy ``all``."""
        self._check_push_policy("all")

    def test_pushFifo(self):
        """Repeated write() calls succeed with push policy ``fifo``."""
        self._check_push_policy("fifo")

    def test_pushSkip(self):
        """Repeated write() calls succeed with push policy ``skip``."""
        self._check_push_policy("skip")

    def test_pushNew(self):
        """Repeated write() calls succeed with push policy ``new``."""
        self._check_push_policy("new")

    def test_convertReturn(self):
        """convertReturn() maps buffer status codes to data-port status codes."""
        buffer_status = OpenRTM_aist.BufferStatus
        port_status = OpenRTM_aist.DataPortStatus

        # (buffer status passed in, expected data-port status)
        expectations = (
            (buffer_status.BUFFER_OK, port_status.PORT_OK),
            (buffer_status.BUFFER_FULL, port_status.BUFFER_FULL),
            (buffer_status.TIMEOUT, port_status.BUFFER_TIMEOUT),
            (buffer_status.PRECONDITION_NOT_MET,
             port_status.PRECONDITION_NOT_MET),
            # An arbitrary code outside the named ones is expected to be
            # reported as PORT_ERROR.
            (100, port_status.PORT_ERROR),
        )

        publisher = PublisherNew()
        try:
            for code, expected in expectations:
                self.assertEqual(publisher.convertReturn(code, 0), expected)
        finally:
            publisher.__del__()
00380
00381
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()