test_PublisherPeriodic.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 #
5 # \file test_PublisherPeriodic.py
6 # \brief test for PublisherPeriodic class
7 # \date $Date: 2007/09/27 $
8 # \author Shinji Kurihara
9 #
10 # Copyright (C) 2006
11 # Noriaki Ando
12 # Task-intelligence Research Group,
13 # Intelligent Systems Research Institute,
14 # National Institute of
15 # Advanced Industrial Science and Technology (AIST), Japan
16 # All rights reserved.
17 
18 import sys,time
19 sys.path.insert(1,"../")
20 
21 import unittest
22 
23 import OpenRTM_aist
24 from PublisherPeriodic import *
25 
27  def __init__(self):
30  prop.setProperty("write.full_policy","do_nothing")
31  buff.init(prop)
32  self._buffer = buff
33 
def __del__(self):
    """Destructor hook; this consumer holds nothing that needs releasing."""
36 
def convertReturnCode(self, ret):
    """Translate a buffer status code into one of this object's port codes.

    Mapping:
      BUFFER_OK    -> self.PORT_OK
      BUFFER_ERROR -> self.PORT_ERROR
      BUFFER_FULL  -> self.SEND_FULL
      TIMEOUT      -> self.SEND_TIMEOUT
      NOT_SUPPORTED and any other value -> self.UNKNOWN_ERROR

    ret -- a value compared against OpenRTM_aist.BufferStatus constants.
    """
    status = OpenRTM_aist.BufferStatus

    if ret == status.BUFFER_OK:
        return self.PORT_OK
    if ret == status.BUFFER_ERROR:
        return self.PORT_ERROR
    if ret == status.BUFFER_FULL:
        return self.SEND_FULL
    if ret == status.TIMEOUT:
        return self.SEND_TIMEOUT

    # NOT_SUPPORTED and every unrecognised code share the same result,
    # so no separate comparison is needed for NOT_SUPPORTED.
    return self.UNKNOWN_ERROR
55 
def put(self, data):
    """Write *data* into the internal buffer and return the converted status.

    The value returned by ``self._buffer.write(data)`` is passed through
    ``self.convertReturnCode`` and that result is returned to the caller.
    """
    write_status = self._buffer.write(data)
    return self.convertReturnCode(write_status)
77 
def get_m_put_data(self):
    """Read one item from the internal buffer and return it.

    ``self._buffer.read`` is handed a one-element list as an out-parameter;
    whatever is in slot 0 after the call is returned (the initial 0 if the
    buffer leaves the list untouched).
    """
    out = [0]
    self._buffer.read(out)
    return out[0]
82 
def get_m_put_data_len(self):
    """Return whatever ``self._buffer.readable()`` reports."""
    return self._buffer.readable()
86 
87 
88 
class TestPublisherPeriodic(unittest.TestCase):
    """Unit tests for PublisherPeriodic.

    Covers init() return codes, consumer/buffer wiring, activation state,
    write() under each push policy, and convertReturn().
    Publishers are released with an explicit ``__del__()`` call, and the
    manager is shut down after every test.
    """

    # Keys written, in this order, by every fully specified property set.
    _PROPERTY_KEYS = (
        "publisher.push_policy",
        "publisher.skip_count",
        "thread_type",
        "measurement.exec_time",
        "measurement.exec_count",
        "measurement.period_time",
        "measurement.period_count",
    )

    def setUp(self):
        # Short pause before each test.
        time.sleep(0.1)

    def tearDown(self):
        # Pause, then shut the manager down after each test.
        time.sleep(0.1)
        OpenRTM_aist.Manager.instance().shutdownManager()

    def _set_properties(self, prop, values):
        """Store *values* in *prop* under ``_PROPERTY_KEYS``, pairwise, in order."""
        for key, value in zip(self._PROPERTY_KEYS, values):
            prop.setProperty(key, value)

    def _active_publisher(self, push_policy):
        """Build, wire up and activate a publisher using *push_policy*.

        Registers listeners, initialises the publisher with the given policy
        (all other properties fixed), attaches a ConsumerMock and a
        CdrRingBuffer, then activates it. The caller must deactivate and
        release the returned publisher.
        """
        publisher = PublisherPeriodic()
        info = OpenRTM_aist.ConnectorInfo("", "", [], OpenRTM_aist.Properties())
        self.assertEqual(
            publisher.setListener(info, OpenRTM_aist.ConnectorListeners()),
            OpenRTM_aist.DataPortStatus.PORT_OK)

        prop = OpenRTM_aist.Properties()
        self._set_properties(
            prop, (push_policy, "0", "default", "enable", "0", "enable", "0"))
        publisher.init(prop)

        self.assertEqual(publisher.setConsumer(ConsumerMock()),
                         OpenRTM_aist.DataPortStatus.PORT_OK)
        publisher.setBuffer(OpenRTM_aist.CdrRingBuffer())
        publisher.activate()
        return publisher

    def _assert_writes_ok(self, publisher, count, pause=0.0):
        """Call ``write(123, 0, 0)`` *count* times, expecting PORT_OK each time.

        When *pause* is non-zero, sleep that many seconds after every write.
        """
        for _ in range(count):
            self.assertEqual(publisher.write(123, 0, 0),
                             OpenRTM_aist.DataPortStatus.PORT_OK)
            if pause:
                time.sleep(pause)

    def test_init(self):
        # init() return codes for empty, invalid and complete property sets.
        # A single Properties object is reused throughout, so values set by
        # an earlier case persist unless a later case overwrites them.
        status = OpenRTM_aist.DataPortStatus

        # init() must succeed even when the Properties object is empty.
        publisher = PublisherPeriodic()
        prop = OpenRTM_aist.Properties()
        self.assertEqual(status.PORT_OK, publisher.init(prop))
        time.sleep(0.1)
        publisher.__del__()

        # An invalid thread_type must make init() return INVALID_ARGS.
        publisher = PublisherPeriodic()
        prop.setProperty("publisher.push_policy", "new")
        prop.setProperty("thread_type", "bar")
        prop.setProperty("measurement.exec_time", "default")
        prop.setProperty("measurement.period_count", "1")
        self.assertEqual(status.INVALID_ARGS, publisher.init(prop))
        publisher.__del__()

        # init() must accept each of the following property sets.
        accepted = (
            ("all", "0", "default", "enable", "0", "enable", "0"),
            ("fifo", "1", "default", "disable", "1", "disable", "0"),
            ("fifo", "1", "default", "disable", "1", "disable", "1"),
            ("skip", "-1", "default", "bar", "-1", "bar", "-1"),
            ("new", "1", "default", "enable", "1", "enable", "1"),
            ("bar", "0", "default", "enable", "0", "enable", "0"),
        )
        for values in accepted:
            publisher = PublisherPeriodic()
            self._set_properties(prop, values)
            self.assertEqual(status.PORT_OK, publisher.init(prop))
            publisher.__del__()

    def test_setConsumer(self):
        # None is rejected; a real consumer object is accepted.
        publisher = PublisherPeriodic()
        self.assertEqual(publisher.setConsumer(None),
                         OpenRTM_aist.DataPortStatus.INVALID_ARGS)
        self.assertEqual(publisher.setConsumer(ConsumerMock()),
                         OpenRTM_aist.DataPortStatus.PORT_OK)
        publisher.__del__()

    def test_setBuffer(self):
        # None is rejected; a real buffer object is accepted.
        publisher = PublisherPeriodic()
        self.assertEqual(publisher.setBuffer(None),
                         OpenRTM_aist.DataPortStatus.INVALID_ARGS)
        self.assertEqual(publisher.setBuffer(OpenRTM_aist.RingBuffer()),
                         OpenRTM_aist.DataPortStatus.PORT_OK)
        publisher.__del__()

    def test_write(self):
        # Only exercises init/setBuffer/release; the write() assertion was
        # already disabled in the original test and is not performed here.
        publisher = PublisherPeriodic()
        publisher.init(OpenRTM_aist.Properties())
        publisher.setBuffer(OpenRTM_aist.RingBuffer())
        publisher.__del__()

    def test_activate_deactivate_isActive(self):
        # isActive() must follow activate()/deactivate().
        status = OpenRTM_aist.DataPortStatus
        publisher = PublisherPeriodic()
        publisher.init(OpenRTM_aist.Properties())
        self.assertEqual(publisher.isActive(), False)

        self.assertEqual(publisher.setConsumer(ConsumerMock()), status.PORT_OK)
        publisher.setBuffer(OpenRTM_aist.CdrRingBuffer())

        self.assertEqual(publisher.activate(), status.PORT_OK)
        self.assertEqual(publisher.isActive(), True)
        self.assertEqual(publisher.deactivate(), status.PORT_OK)
        self.assertEqual(publisher.isActive(), False)
        publisher.__del__()

    def test_pushAll(self):
        # Eight back-to-back writes under the "all" push policy.
        publisher = self._active_publisher("all")
        self._assert_writes_ok(publisher, 8)
        publisher.deactivate()
        publisher.__del__()

    def test_pushFifo(self):
        # Eight writes under the "fifo" policy, pausing 10 ms after each.
        publisher = self._active_publisher("fifo")
        self._assert_writes_ok(publisher, 8, pause=0.01)
        publisher.deactivate()
        publisher.__del__()

    def test_pushSkip(self):
        # Eight back-to-back writes under the "skip" push policy.
        publisher = self._active_publisher("skip")
        self._assert_writes_ok(publisher, 8)
        publisher.deactivate()
        publisher.__del__()

    def test_pushNew(self):
        # Seven writes, a 10 ms pause, one more write, then a 10 ms pause
        # between deactivation and release, under the "new" push policy.
        publisher = self._active_publisher("new")
        self._assert_writes_ok(publisher, 7)
        time.sleep(0.01)
        self._assert_writes_ok(publisher, 1)
        publisher.deactivate()
        time.sleep(0.01)
        publisher.__del__()

    def test_convertReturn(self):
        # convertReturn() must map buffer status codes to port status codes;
        # an unknown code (100) is expected to map to PORT_ERROR.
        buffer_status = OpenRTM_aist.BufferStatus
        port_status = OpenRTM_aist.DataPortStatus
        expectations = (
            (buffer_status.BUFFER_OK, port_status.PORT_OK),
            (buffer_status.BUFFER_EMPTY, port_status.PORT_ERROR),
            (buffer_status.TIMEOUT, port_status.BUFFER_TIMEOUT),
            (buffer_status.PRECONDITION_NOT_MET,
             port_status.PRECONDITION_NOT_MET),
            (100, port_status.PORT_ERROR),
        )

        publisher = PublisherPeriodic()
        for code, expected in expectations:
            self.assertEqual(publisher.convertReturn(code, 0), expected)
        publisher.__del__()
398 
399 
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
def convertReturnCode(self, ret)
Return codes conversion: ReturnCode convertReturnCode(OpenRTM::PortStatus ret)
The Properties class represents a persistent set of properties.
Definition: Properties.py:83


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:07