19 sys.path.insert(1,
"../")
24 from PublisherNew
import *
30 prop.setProperty(
"write.full_policy",
"do_nothing")
38 if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
41 elif ret == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
42 return self.PORT_ERROR
44 elif ret == OpenRTM_aist.BufferStatus.BUFFER_FULL:
47 elif ret == OpenRTM_aist.BufferStatus.TIMEOUT:
48 return self.SEND_TIMEOUT
50 elif ret == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
51 return self.UNKNOWN_ERROR
54 return self.UNKNOWN_ERROR
60 if self._buffer.full(): 61 return OpenRTM_aist.DataPortStatus.BUFFER_FULL 63 ret = self._buffer.write(data) 64 if ret == OpenRTM_aist.BufferStatus.BUFFER_OK: 65 return OpenRTM_aist.DataPortStatus.PORT_OK 66 elif ret == OpenRTM_aist.BufferStatus.BUFFER_ERROR: 67 return OpenRTM_aist.DataPortStatus.PORT_ERROR 68 elif ret == OpenRTM_aist.BufferStatus.BUFFER_FULL: 69 return OpenRTM_aist.DataPortStatus.BUFFER_FULL 70 elif ret == OpenRTM_aist.BufferStatus.BUFFER_EMPTY: 71 return OpenRTM_aist.DataPortStatus.BUFFER_EMPTY 72 elif ret == OpenRTM_aist.BufferStatus.TIMEOUT: 73 return OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT 75 return OpenRTM_aist.DataPortStatus.UNKNOWN_ERROR 80 self._buffer.read(cdr)
84 ic = self._buffer.readable()
96 OpenRTM_aist.Manager.instance().shutdownManager()
104 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, ret)
108 prop.setProperty(
"publisher.push_policy",
"new")
109 prop.setProperty(
"thread_type",
"bar")
110 prop.setProperty(
"measurement.exec_time",
"default")
111 prop.setProperty(
"measurement.period_count",
"1")
115 self.assertEqual(OpenRTM_aist.DataPortStatus.INVALID_ARGS, ret)
120 prop.setProperty(
"publisher.push_policy",
"all")
121 prop.setProperty(
"publisher.skip_count",
"0")
122 prop.setProperty(
"thread_type",
"default")
123 prop.setProperty(
"measurement.exec_time",
"enable")
124 prop.setProperty(
"measurement.exec_count",
"0")
125 prop.setProperty(
"measurement.period_time",
"enable")
126 prop.setProperty(
"measurement.period_count",
"0")
127 retcode = _pn.init(prop)
128 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
132 prop.setProperty(
"publisher.push_policy",
"fifo")
133 prop.setProperty(
"publisher.skip_count",
"1")
134 prop.setProperty(
"thread_type",
"default")
135 prop.setProperty(
"measurement.exec_time",
"disable")
136 prop.setProperty(
"measurement.exec_count",
"1")
137 prop.setProperty(
"measurement.period_time",
"disable")
138 prop.setProperty(
"measurement.period_count",
"0")
139 retcode = _pn.init(prop)
140 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
144 prop.setProperty(
"publisher.push_policy",
"fifo")
145 prop.setProperty(
"publisher.skip_count",
"1")
146 prop.setProperty(
"thread_type",
"default")
147 prop.setProperty(
"measurement.exec_time",
"disable")
148 prop.setProperty(
"measurement.exec_count",
"1")
149 prop.setProperty(
"measurement.period_time",
"disable")
150 prop.setProperty(
"measurement.period_count",
"1")
151 retcode = _pn.init(prop)
152 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
156 prop.setProperty(
"publisher.push_policy",
"skip")
157 prop.setProperty(
"publisher.skip_count",
"-1")
158 prop.setProperty(
"thread_type",
"default")
159 prop.setProperty(
"measurement.exec_time",
"bar")
160 prop.setProperty(
"measurement.exec_count",
"-1")
161 prop.setProperty(
"measurement.period_time",
"bar")
162 prop.setProperty(
"measurement.period_count",
"-1")
163 retcode = _pn.init(prop)
164 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
168 prop.setProperty(
"publisher.push_policy",
"new")
169 prop.setProperty(
"publisher.skip_count",
"1")
170 prop.setProperty(
"thread_type",
"default")
171 prop.setProperty(
"measurement.exec_time",
"enable")
172 prop.setProperty(
"measurement.exec_count",
"1")
173 prop.setProperty(
"measurement.period_time",
"enable")
174 prop.setProperty(
"measurement.period_count",
"1")
175 retcode = _pn.init(prop)
176 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
180 prop.setProperty(
"publisher.push_policy",
"bar")
181 prop.setProperty(
"publisher.skip_count",
"0")
182 prop.setProperty(
"thread_type",
"default")
183 prop.setProperty(
"measurement.exec_time",
"enable")
184 prop.setProperty(
"measurement.exec_count",
"0")
185 prop.setProperty(
"measurement.period_time",
"enable")
186 prop.setProperty(
"measurement.period_count",
"0")
187 retcode = _pn.init(prop)
188 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
194 self.assertEqual(_pn.setConsumer(
None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
195 self.assertEqual(_pn.setConsumer(
ConsumerMock()),OpenRTM_aist.DataPortStatus.PORT_OK)
201 self.assertEqual(_pn.setBuffer(
None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
209 retcode = _pn.init(prop)
217 self.assertEqual(_pn.isActive(),
False)
218 self.assertEqual(_pn.activate(),OpenRTM_aist.DataPortStatus.PORT_OK)
219 self.assertEqual(_pn.isActive(),
True)
220 self.assertEqual(_pn.deactivate(),OpenRTM_aist.DataPortStatus.PORT_OK)
221 self.assertEqual(_pn.isActive(),
False)
233 OpenRTM_aist.DataPortStatus.PORT_OK)
235 prop.setProperty(
"publisher.push_policy",
"all")
236 prop.setProperty(
"publisher.skip_count",
"0")
237 prop.setProperty(
"thread_type",
"default")
238 prop.setProperty(
"measurement.exec_time",
"enable")
239 prop.setProperty(
"measurement.exec_count",
"0")
240 prop.setProperty(
"measurement.period_time",
"enable")
241 prop.setProperty(
"measurement.period_count",
"0")
242 retcode = _pn.init(prop)
244 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
248 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
249 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
250 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
251 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
252 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
253 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
254 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
255 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
268 OpenRTM_aist.DataPortStatus.PORT_OK)
270 prop.setProperty(
"publisher.push_policy",
"fifo")
271 prop.setProperty(
"publisher.skip_count",
"0")
272 prop.setProperty(
"thread_type",
"default")
273 prop.setProperty(
"measurement.exec_time",
"enable")
274 prop.setProperty(
"measurement.exec_count",
"0")
275 prop.setProperty(
"measurement.period_time",
"enable")
276 prop.setProperty(
"measurement.period_count",
"0")
277 retcode = _pn.init(prop)
279 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
283 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
284 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
285 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
286 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
287 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
288 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
289 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
290 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
304 OpenRTM_aist.DataPortStatus.PORT_OK)
306 prop.setProperty(
"publisher.push_policy",
"skip")
307 prop.setProperty(
"publisher.skip_count",
"0")
308 prop.setProperty(
"thread_type",
"default")
309 prop.setProperty(
"measurement.exec_time",
"enable")
310 prop.setProperty(
"measurement.exec_count",
"0")
311 prop.setProperty(
"measurement.period_time",
"enable")
312 prop.setProperty(
"measurement.period_count",
"0")
313 retcode = _pn.init(prop)
315 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
319 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
320 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
321 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
322 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
323 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
324 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
325 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
326 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
339 OpenRTM_aist.DataPortStatus.PORT_OK)
341 prop.setProperty(
"publisher.push_policy",
"new")
342 prop.setProperty(
"publisher.skip_count",
"0")
343 prop.setProperty(
"thread_type",
"default")
344 prop.setProperty(
"measurement.exec_time",
"enable")
345 prop.setProperty(
"measurement.exec_count",
"0")
346 prop.setProperty(
"measurement.period_time",
"enable")
347 prop.setProperty(
"measurement.period_count",
"0")
348 retcode = _pn.init(prop)
350 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
354 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
355 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
356 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
357 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
358 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
359 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
360 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
361 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
368 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_OK,0),
369 OpenRTM_aist.DataPortStatus.PORT_OK)
370 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_FULL,0),
371 OpenRTM_aist.DataPortStatus.BUFFER_FULL)
372 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.TIMEOUT,0),
373 OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT)
374 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET,0),
375 OpenRTM_aist.DataPortStatus.PRECONDITION_NOT_MET)
376 self.assertEqual(_pn.convertReturn(100,0),
377 OpenRTM_aist.DataPortStatus.PORT_ERROR)
382 if __name__ ==
'__main__':
def convertReturnCode(self, ret)
Return codes conversionReturnCode convertReturnCode(OpenRTM::PortStatus ret)
def test_setConsumer(self)
The Properties class represents a persistent set of properties.
def convertReturnCode(self, ret)
def test_activate_deactivate_isActive(self)
def get_m_put_data_len(self)
InPortCorbaCdrConsumer class.
def test_convertReturn(self)