test_PublisherNew.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 #
5 # \file test_PublisherNew.py
6 # \brief test for PublisherNew class
7 # \date $Date: 2007/09/27 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Noriaki Ando
12 # Task-intelligence Research Group,
13 # Intelligent Systems Research Institute,
14 # National Institute of
15 # Advanced Industrial Science and Technology (AIST), Japan
16 # All rights reserved.
17 
18 import sys,time
19 sys.path.insert(1,"../")
20 
21 import unittest
22 
23 import OpenRTM_aist
24 from PublisherNew import *
25 
27  def __init__(self):
30  prop.setProperty("write.full_policy","do_nothing")
31  buff.init(prop)
32  self._buffer = buff
33 
34  def __del__(self):
35  pass
36 
37  def convertReturnCode(self, ret):
38  if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
39  return self.PORT_OK
40 
41  elif ret == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
42  return self.PORT_ERROR
43 
44  elif ret == OpenRTM_aist.BufferStatus.BUFFER_FULL:
45  return self.SEND_FULL
46 
47  elif ret == OpenRTM_aist.BufferStatus.TIMEOUT:
48  return self.SEND_TIMEOUT
49 
50  elif ret == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
51  return self.UNKNOWN_ERROR
52 
53  else:
54  return self.UNKNOWN_ERROR
55 
56  def put(self,data):
57  ret = self.convertReturnCode(self._buffer.write(data))
58  return ret
59  """
60  if self._buffer.full():
61  return OpenRTM_aist.DataPortStatus.BUFFER_FULL
62 
63  ret = self._buffer.write(data)
64  if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
65  return OpenRTM_aist.DataPortStatus.PORT_OK
66  elif ret == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
67  return OpenRTM_aist.DataPortStatus.PORT_ERROR
68  elif ret == OpenRTM_aist.BufferStatus.BUFFER_FULL:
69  return OpenRTM_aist.DataPortStatus.BUFFER_FULL
70  elif ret == OpenRTM_aist.BufferStatus.BUFFER_EMPTY:
71  return OpenRTM_aist.DataPortStatus.BUFFER_EMPTY
72  elif ret == OpenRTM_aist.BufferStatus.TIMEOUT:
73  return OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT
74  else:
75  return OpenRTM_aist.DataPortStatus.UNKNOWN_ERROR
76  """
77 
78  def get_m_put_data(self):
79  cdr = [0]
80  self._buffer.read(cdr)
81  return cdr[0]
82 
83  def get_m_put_data_len(self):
84  ic = self._buffer.readable()
85  return ic
86 
87 
88 
89 class TestPublisherNew(unittest.TestCase):
90 
91  def setUp(self):
92  time.sleep(0.1)
93  return
94 
95  def tearDown(self):
96  OpenRTM_aist.Manager.instance().shutdownManager()
97  return
98 
99  def test_init(self):
100  _pn = PublisherNew()
101  prop = OpenRTM_aist.Properties()
102  # Propertiesが空の状態でも正常に動作することを確認する ret = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, ret) _pn.__del__() _pn = PublisherNew() prop.setProperty("publisher.push_policy","new") prop.setProperty("thread_type","bar") prop.setProperty("measurement.exec_time","default") prop.setProperty("measurement.period_count","1") #thread_type が不正の場合 INVALID_ARGS を返すことを確認する。 ret = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.INVALID_ARGS, ret) _pn.__del__() _pn = PublisherNew() #以下のpropertiesの設定で動作することを確認する。 prop.setProperty("publisher.push_policy","all") prop.setProperty("publisher.skip_count","0") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","enable") prop.setProperty("measurement.exec_count","0") prop.setProperty("measurement.period_time","enable") prop.setProperty("measurement.period_count","0") retcode = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode) _pn.__del__() _pn = PublisherNew() prop.setProperty("publisher.push_policy","fifo") prop.setProperty("publisher.skip_count","1") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","disable") prop.setProperty("measurement.exec_count","1") prop.setProperty("measurement.period_time","disable") prop.setProperty("measurement.period_count","0") retcode = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode) _pn.__del__() _pn = PublisherNew() prop.setProperty("publisher.push_policy","fifo") prop.setProperty("publisher.skip_count","1") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","disable") prop.setProperty("measurement.exec_count","1") prop.setProperty("measurement.period_time","disable") prop.setProperty("measurement.period_count","1") retcode = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode) _pn.__del__() _pn = PublisherNew() prop.setProperty("publisher.push_policy","skip") prop.setProperty("publisher.skip_count","-1") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","bar") prop.setProperty("measurement.exec_count","-1") prop.setProperty("measurement.period_time","bar") prop.setProperty("measurement.period_count","-1") retcode = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode) _pn.__del__() _pn = PublisherNew() prop.setProperty("publisher.push_policy","new") prop.setProperty("publisher.skip_count","1") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","enable") prop.setProperty("measurement.exec_count","1") prop.setProperty("measurement.period_time","enable") prop.setProperty("measurement.period_count","1") retcode = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode) _pn.__del__() _pn = PublisherNew() prop.setProperty("publisher.push_policy","bar") prop.setProperty("publisher.skip_count","0") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","enable") prop.setProperty("measurement.exec_count","0") prop.setProperty("measurement.period_time","enable") prop.setProperty("measurement.period_count","0") retcode = _pn.init(prop) self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode) _pn.__del__() return def test_setConsumer(self): _pn = PublisherNew() self.assertEqual(_pn.setConsumer(None),OpenRTM_aist.DataPortStatus.INVALID_ARGS) self.assertEqual(_pn.setConsumer(ConsumerMock()),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.__del__() return def test_setBuffer(self): _pn = PublisherNew() self.assertEqual(_pn.setBuffer(None),OpenRTM_aist.DataPortStatus.INVALID_ARGS) self.assertEqual(_pn.setBuffer(OpenRTM_aist.RingBuffer()),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.__del__() return def test_write(self): _pn = PublisherNew() prop = OpenRTM_aist.Properties() retcode = _pn.init(prop) _pn.setBuffer(OpenRTM_aist.RingBuffer()) #self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.__del__() return def test_activate_deactivate_isActive(self): _pn = PublisherNew() self.assertEqual(_pn.isActive(),False) self.assertEqual(_pn.activate(),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.isActive(),True) self.assertEqual(_pn.deactivate(),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.isActive(),False) _pn.__del__() return def test_pushAll(self): _pn = PublisherNew() prop = OpenRTM_aist.Properties() cinfo = OpenRTM_aist.ConnectorInfo("", "", [], prop) self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()), OpenRTM_aist.DataPortStatus.PORT_OK) prop = OpenRTM_aist.Properties() prop.setProperty("publisher.push_policy","all") prop.setProperty("publisher.skip_count","0") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","enable") prop.setProperty("measurement.exec_count","0") prop.setProperty("measurement.period_time","enable") prop.setProperty("measurement.period_count","0") retcode = _pn.init(prop) cons = ConsumerMock() self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.setBuffer(OpenRTM_aist.CdrRingBuffer()) _pn.activate() self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.deactivate() _pn.__del__() return def test_pushFifo(self): _pn = PublisherNew() prop = OpenRTM_aist.Properties() cinfo = OpenRTM_aist.ConnectorInfo("", "", [], prop) self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()), OpenRTM_aist.DataPortStatus.PORT_OK) prop = OpenRTM_aist.Properties() prop.setProperty("publisher.push_policy","fifo") prop.setProperty("publisher.skip_count","0") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","enable") prop.setProperty("measurement.exec_count","0") prop.setProperty("measurement.period_time","enable") prop.setProperty("measurement.period_count","0") retcode = _pn.init(prop) cons = ConsumerMock() self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.setBuffer(OpenRTM_aist.CdrRingBuffer()) _pn.activate() self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.deactivate() _pn.__del__() return def test_pushSkip(self): _pn = PublisherNew() prop = OpenRTM_aist.Properties() cinfo = OpenRTM_aist.ConnectorInfo("", "", [], prop) self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()), OpenRTM_aist.DataPortStatus.PORT_OK) prop = OpenRTM_aist.Properties() prop.setProperty("publisher.push_policy","skip") prop.setProperty("publisher.skip_count","0") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","enable") prop.setProperty("measurement.exec_count","0") prop.setProperty("measurement.period_time","enable") prop.setProperty("measurement.period_count","0") retcode = _pn.init(prop) cons = ConsumerMock() self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.setBuffer(OpenRTM_aist.CdrRingBuffer()) _pn.activate() self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.deactivate() _pn.__del__() return def test_pushNew(self): _pn = PublisherNew() prop = OpenRTM_aist.Properties() cinfo = OpenRTM_aist.ConnectorInfo("", "", [], prop) self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()), OpenRTM_aist.DataPortStatus.PORT_OK) prop = OpenRTM_aist.Properties() prop.setProperty("publisher.push_policy","new") prop.setProperty("publisher.skip_count","0") prop.setProperty("thread_type","default") prop.setProperty("measurement.exec_time","enable") prop.setProperty("measurement.exec_count","0") prop.setProperty("measurement.period_time","enable") prop.setProperty("measurement.period_count","0") retcode = _pn.init(prop) cons = ConsumerMock() self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.setBuffer(OpenRTM_aist.CdrRingBuffer()) _pn.activate() self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK) _pn.deactivate() _pn.__del__() return def test_convertReturn(self): _pn = PublisherNew() self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_OK,0), OpenRTM_aist.DataPortStatus.PORT_OK) self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_FULL,0), OpenRTM_aist.DataPortStatus.BUFFER_FULL) self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.TIMEOUT,0), OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT) self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET,0), OpenRTM_aist.DataPortStatus.PRECONDITION_NOT_MET) self.assertEqual(_pn.convertReturn(100,0), OpenRTM_aist.DataPortStatus.PORT_ERROR) _pn.__del__() return ############ test ################# if __name__ == '__main__': unittest.main()
103  ret = _pn.init(prop)
104  self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, ret)
105  _pn.__del__()
106 
107  _pn = PublisherNew()
108  prop.setProperty("publisher.push_policy","new")
109  prop.setProperty("thread_type","bar")
110  prop.setProperty("measurement.exec_time","default")
111  prop.setProperty("measurement.period_count","1")
112 
113  #thread_type が不正の場合 INVALID_ARGS を返すことを確認する。
114  ret = _pn.init(prop)
115  self.assertEqual(OpenRTM_aist.DataPortStatus.INVALID_ARGS, ret)
116  _pn.__del__()
117 
118  _pn = PublisherNew()
119  #以下のpropertiesの設定で動作することを確認する。
120  prop.setProperty("publisher.push_policy","all")
121  prop.setProperty("publisher.skip_count","0")
122  prop.setProperty("thread_type","default")
123  prop.setProperty("measurement.exec_time","enable")
124  prop.setProperty("measurement.exec_count","0")
125  prop.setProperty("measurement.period_time","enable")
126  prop.setProperty("measurement.period_count","0")
127  retcode = _pn.init(prop)
128  self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
129  _pn.__del__()
130 
131  _pn = PublisherNew()
132  prop.setProperty("publisher.push_policy","fifo")
133  prop.setProperty("publisher.skip_count","1")
134  prop.setProperty("thread_type","default")
135  prop.setProperty("measurement.exec_time","disable")
136  prop.setProperty("measurement.exec_count","1")
137  prop.setProperty("measurement.period_time","disable")
138  prop.setProperty("measurement.period_count","0")
139  retcode = _pn.init(prop)
140  self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
141  _pn.__del__()
142 
143  _pn = PublisherNew()
144  prop.setProperty("publisher.push_policy","fifo")
145  prop.setProperty("publisher.skip_count","1")
146  prop.setProperty("thread_type","default")
147  prop.setProperty("measurement.exec_time","disable")
148  prop.setProperty("measurement.exec_count","1")
149  prop.setProperty("measurement.period_time","disable")
150  prop.setProperty("measurement.period_count","1")
151  retcode = _pn.init(prop)
152  self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
153  _pn.__del__()
154 
155  _pn = PublisherNew()
156  prop.setProperty("publisher.push_policy","skip")
157  prop.setProperty("publisher.skip_count","-1")
158  prop.setProperty("thread_type","default")
159  prop.setProperty("measurement.exec_time","bar")
160  prop.setProperty("measurement.exec_count","-1")
161  prop.setProperty("measurement.period_time","bar")
162  prop.setProperty("measurement.period_count","-1")
163  retcode = _pn.init(prop)
164  self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
165  _pn.__del__()
166 
167  _pn = PublisherNew()
168  prop.setProperty("publisher.push_policy","new")
169  prop.setProperty("publisher.skip_count","1")
170  prop.setProperty("thread_type","default")
171  prop.setProperty("measurement.exec_time","enable")
172  prop.setProperty("measurement.exec_count","1")
173  prop.setProperty("measurement.period_time","enable")
174  prop.setProperty("measurement.period_count","1")
175  retcode = _pn.init(prop)
176  self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
177  _pn.__del__()
178 
179  _pn = PublisherNew()
180  prop.setProperty("publisher.push_policy","bar")
181  prop.setProperty("publisher.skip_count","0")
182  prop.setProperty("thread_type","default")
183  prop.setProperty("measurement.exec_time","enable")
184  prop.setProperty("measurement.exec_count","0")
185  prop.setProperty("measurement.period_time","enable")
186  prop.setProperty("measurement.period_count","0")
187  retcode = _pn.init(prop)
188  self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
189  _pn.__del__()
190  return
191 
192  def test_setConsumer(self):
193  _pn = PublisherNew()
194  self.assertEqual(_pn.setConsumer(None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
195  self.assertEqual(_pn.setConsumer(ConsumerMock()),OpenRTM_aist.DataPortStatus.PORT_OK)
196  _pn.__del__()
197  return
198 
199  def test_setBuffer(self):
200  _pn = PublisherNew()
201  self.assertEqual(_pn.setBuffer(None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
202  self.assertEqual(_pn.setBuffer(OpenRTM_aist.RingBuffer()),OpenRTM_aist.DataPortStatus.PORT_OK)
203  _pn.__del__()
204  return
205 
206  def test_write(self):
207  _pn = PublisherNew()
208  prop = OpenRTM_aist.Properties()
209  retcode = _pn.init(prop)
210  _pn.setBuffer(OpenRTM_aist.RingBuffer())
211  #self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
212  _pn.__del__()
213  return
214 
216  _pn = PublisherNew()
217  self.assertEqual(_pn.isActive(),False)
218  self.assertEqual(_pn.activate(),OpenRTM_aist.DataPortStatus.PORT_OK)
219  self.assertEqual(_pn.isActive(),True)
220  self.assertEqual(_pn.deactivate(),OpenRTM_aist.DataPortStatus.PORT_OK)
221  self.assertEqual(_pn.isActive(),False)
222  _pn.__del__()
223  return
224 
225  def test_pushAll(self):
226  _pn = PublisherNew()
227  prop = OpenRTM_aist.Properties()
228  cinfo = OpenRTM_aist.ConnectorInfo("",
229  "",
230  [],
231  prop)
232  self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()),
233  OpenRTM_aist.DataPortStatus.PORT_OK)
234  prop = OpenRTM_aist.Properties()
235  prop.setProperty("publisher.push_policy","all")
236  prop.setProperty("publisher.skip_count","0")
237  prop.setProperty("thread_type","default")
238  prop.setProperty("measurement.exec_time","enable")
239  prop.setProperty("measurement.exec_count","0")
240  prop.setProperty("measurement.period_time","enable")
241  prop.setProperty("measurement.period_count","0")
242  retcode = _pn.init(prop)
243  cons = ConsumerMock()
244  self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
245  _pn.setBuffer(OpenRTM_aist.CdrRingBuffer())
246  _pn.activate()
247 
248  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
249  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
250  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
251  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
252  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
253  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
254  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
255  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
256  _pn.deactivate()
257  _pn.__del__()
258  return
259 
260  def test_pushFifo(self):
261  _pn = PublisherNew()
262  prop = OpenRTM_aist.Properties()
263  cinfo = OpenRTM_aist.ConnectorInfo("",
264  "",
265  [],
266  prop)
267  self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()),
268  OpenRTM_aist.DataPortStatus.PORT_OK)
269  prop = OpenRTM_aist.Properties()
270  prop.setProperty("publisher.push_policy","fifo")
271  prop.setProperty("publisher.skip_count","0")
272  prop.setProperty("thread_type","default")
273  prop.setProperty("measurement.exec_time","enable")
274  prop.setProperty("measurement.exec_count","0")
275  prop.setProperty("measurement.period_time","enable")
276  prop.setProperty("measurement.period_count","0")
277  retcode = _pn.init(prop)
278  cons = ConsumerMock()
279  self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
280  _pn.setBuffer(OpenRTM_aist.CdrRingBuffer())
281  _pn.activate()
282 
283  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
284  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
285  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
286  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
287  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
288  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
289  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
290  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
291  _pn.deactivate()
292  _pn.__del__()
293  return
294 
295 
296  def test_pushSkip(self):
297  _pn = PublisherNew()
298  prop = OpenRTM_aist.Properties()
299  cinfo = OpenRTM_aist.ConnectorInfo("",
300  "",
301  [],
302  prop)
303  self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()),
304  OpenRTM_aist.DataPortStatus.PORT_OK)
305  prop = OpenRTM_aist.Properties()
306  prop.setProperty("publisher.push_policy","skip")
307  prop.setProperty("publisher.skip_count","0")
308  prop.setProperty("thread_type","default")
309  prop.setProperty("measurement.exec_time","enable")
310  prop.setProperty("measurement.exec_count","0")
311  prop.setProperty("measurement.period_time","enable")
312  prop.setProperty("measurement.period_count","0")
313  retcode = _pn.init(prop)
314  cons = ConsumerMock()
315  self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
316  _pn.setBuffer(OpenRTM_aist.CdrRingBuffer())
317  _pn.activate()
318 
319  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
320  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
321  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
322  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
323  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
324  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
325  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
326  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
327  _pn.deactivate()
328  _pn.__del__()
329  return
330 
331  def test_pushNew(self):
332  _pn = PublisherNew()
333  prop = OpenRTM_aist.Properties()
334  cinfo = OpenRTM_aist.ConnectorInfo("",
335  "",
336  [],
337  prop)
338  self.assertEqual(_pn.setListener(cinfo,OpenRTM_aist.ConnectorListeners()),
339  OpenRTM_aist.DataPortStatus.PORT_OK)
340  prop = OpenRTM_aist.Properties()
341  prop.setProperty("publisher.push_policy","new")
342  prop.setProperty("publisher.skip_count","0")
343  prop.setProperty("thread_type","default")
344  prop.setProperty("measurement.exec_time","enable")
345  prop.setProperty("measurement.exec_count","0")
346  prop.setProperty("measurement.period_time","enable")
347  prop.setProperty("measurement.period_count","0")
348  retcode = _pn.init(prop)
349  cons = ConsumerMock()
350  self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
351  _pn.setBuffer(OpenRTM_aist.CdrRingBuffer())
352  _pn.activate()
353 
354  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
355  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
356  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
357  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
358  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
359  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
360  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
361  self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
362  _pn.deactivate()
363  _pn.__del__()
364  return
365 
367  _pn = PublisherNew()
368  self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_OK,0),
369  OpenRTM_aist.DataPortStatus.PORT_OK)
370  self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_FULL,0),
371  OpenRTM_aist.DataPortStatus.BUFFER_FULL)
372  self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.TIMEOUT,0),
373  OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT)
374  self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET,0),
375  OpenRTM_aist.DataPortStatus.PRECONDITION_NOT_MET)
376  self.assertEqual(_pn.convertReturn(100,0),
377  OpenRTM_aist.DataPortStatus.PORT_ERROR)
378  _pn.__del__()
379  return
380 
381 
382 if __name__ == '__main__':
383  unittest.main()
def convertReturnCode(self, ret)
Return codes conversionReturnCode convertReturnCode(OpenRTM::PortStatus ret)
The Properties class represents a persistent set of properties.
Definition: Properties.py:83


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07