19 sys.path.insert(1,
"../")
24 from PublisherPeriodic
import *
30 prop.setProperty(
"write.full_policy",
"do_nothing")
38 if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
41 elif ret == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
42 return self.PORT_ERROR
44 elif ret == OpenRTM_aist.BufferStatus.BUFFER_FULL:
47 elif ret == OpenRTM_aist.BufferStatus.TIMEOUT:
48 return self.SEND_TIMEOUT
50 elif ret == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
51 return self.UNKNOWN_ERROR
54 return self.UNKNOWN_ERROR
60 if self._buffer.full(): 61 return OpenRTM_aist.DataPortStatus.BUFFER_FULL 63 ret = self._buffer.write(data) 64 if ret == OpenRTM_aist.BufferStatus.BUFFER_OK: 65 return OpenRTM_aist.DataPortStatus.PORT_OK 66 elif ret == OpenRTM_aist.BufferStatus.BUFFER_ERROR: 67 return OpenRTM_aist.DataPortStatus.PORT_ERROR 68 elif ret == OpenRTM_aist.BufferStatus.BUFFER_FULL: 69 return OpenRTM_aist.DataPortStatus.BUFFER_FULL 70 elif ret == OpenRTM_aist.BufferStatus.BUFFER_EMPTY: 71 return OpenRTM_aist.DataPortStatus.BUFFER_EMPTY 72 elif ret == OpenRTM_aist.BufferStatus.TIMEOUT: 73 return OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT 75 return OpenRTM_aist.DataPortStatus.UNKNOWN_ERROR 80 self._buffer.read(cdr)
84 ic = self._buffer.readable()
97 OpenRTM_aist.Manager.instance().shutdownManager()
105 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, ret)
110 prop.setProperty(
"publisher.push_policy",
"new")
111 prop.setProperty(
"thread_type",
"bar")
112 prop.setProperty(
"measurement.exec_time",
"default")
113 prop.setProperty(
"measurement.period_count",
"1")
117 self.assertEqual(OpenRTM_aist.DataPortStatus.INVALID_ARGS, ret)
122 prop.setProperty(
"publisher.push_policy",
"all")
123 prop.setProperty(
"publisher.skip_count",
"0")
124 prop.setProperty(
"thread_type",
"default")
125 prop.setProperty(
"measurement.exec_time",
"enable")
126 prop.setProperty(
"measurement.exec_count",
"0")
127 prop.setProperty(
"measurement.period_time",
"enable")
128 prop.setProperty(
"measurement.period_count",
"0")
129 retcode = _pn.init(prop)
130 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
134 prop.setProperty(
"publisher.push_policy",
"fifo")
135 prop.setProperty(
"publisher.skip_count",
"1")
136 prop.setProperty(
"thread_type",
"default")
137 prop.setProperty(
"measurement.exec_time",
"disable")
138 prop.setProperty(
"measurement.exec_count",
"1")
139 prop.setProperty(
"measurement.period_time",
"disable")
140 prop.setProperty(
"measurement.period_count",
"0")
141 retcode = _pn.init(prop)
142 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
146 prop.setProperty(
"publisher.push_policy",
"fifo")
147 prop.setProperty(
"publisher.skip_count",
"1")
148 prop.setProperty(
"thread_type",
"default")
149 prop.setProperty(
"measurement.exec_time",
"disable")
150 prop.setProperty(
"measurement.exec_count",
"1")
151 prop.setProperty(
"measurement.period_time",
"disable")
152 prop.setProperty(
"measurement.period_count",
"1")
153 retcode = _pn.init(prop)
154 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
158 prop.setProperty(
"publisher.push_policy",
"skip")
159 prop.setProperty(
"publisher.skip_count",
"-1")
160 prop.setProperty(
"thread_type",
"default")
161 prop.setProperty(
"measurement.exec_time",
"bar")
162 prop.setProperty(
"measurement.exec_count",
"-1")
163 prop.setProperty(
"measurement.period_time",
"bar")
164 prop.setProperty(
"measurement.period_count",
"-1")
165 retcode = _pn.init(prop)
166 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
170 prop.setProperty(
"publisher.push_policy",
"new")
171 prop.setProperty(
"publisher.skip_count",
"1")
172 prop.setProperty(
"thread_type",
"default")
173 prop.setProperty(
"measurement.exec_time",
"enable")
174 prop.setProperty(
"measurement.exec_count",
"1")
175 prop.setProperty(
"measurement.period_time",
"enable")
176 prop.setProperty(
"measurement.period_count",
"1")
177 retcode = _pn.init(prop)
178 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
182 prop.setProperty(
"publisher.push_policy",
"bar")
183 prop.setProperty(
"publisher.skip_count",
"0")
184 prop.setProperty(
"thread_type",
"default")
185 prop.setProperty(
"measurement.exec_time",
"enable")
186 prop.setProperty(
"measurement.exec_count",
"0")
187 prop.setProperty(
"measurement.period_time",
"enable")
188 prop.setProperty(
"measurement.period_count",
"0")
189 retcode = _pn.init(prop)
190 self.assertEqual(OpenRTM_aist.DataPortStatus.PORT_OK, retcode)
197 self.assertEqual(_pn.setConsumer(
None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
198 self.assertEqual(_pn.setConsumer(
ConsumerMock()),OpenRTM_aist.DataPortStatus.PORT_OK)
204 self.assertEqual(_pn.setBuffer(
None),OpenRTM_aist.DataPortStatus.INVALID_ARGS)
212 retcode = _pn.init(prop)
221 retcode = _pn.init(prop)
222 self.assertEqual(_pn.isActive(),
False)
224 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
226 self.assertEqual(_pn.activate(),OpenRTM_aist.DataPortStatus.PORT_OK)
227 self.assertEqual(_pn.isActive(),
True)
228 self.assertEqual(_pn.deactivate(),OpenRTM_aist.DataPortStatus.PORT_OK)
229 self.assertEqual(_pn.isActive(),
False)
241 OpenRTM_aist.DataPortStatus.PORT_OK)
243 prop.setProperty(
"publisher.push_policy",
"all")
244 prop.setProperty(
"publisher.skip_count",
"0")
245 prop.setProperty(
"thread_type",
"default")
246 prop.setProperty(
"measurement.exec_time",
"enable")
247 prop.setProperty(
"measurement.exec_count",
"0")
248 prop.setProperty(
"measurement.period_time",
"enable")
249 prop.setProperty(
"measurement.period_count",
"0")
250 retcode = _pn.init(prop)
252 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
256 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
257 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
258 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
259 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
260 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
261 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
262 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
263 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
276 OpenRTM_aist.DataPortStatus.PORT_OK)
278 prop.setProperty(
"publisher.push_policy",
"fifo")
279 prop.setProperty(
"publisher.skip_count",
"0")
280 prop.setProperty(
"thread_type",
"default")
281 prop.setProperty(
"measurement.exec_time",
"enable")
282 prop.setProperty(
"measurement.exec_count",
"0")
283 prop.setProperty(
"measurement.period_time",
"enable")
284 prop.setProperty(
"measurement.period_count",
"0")
285 retcode = _pn.init(prop)
287 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
291 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
293 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
295 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
297 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
299 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
301 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
303 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
305 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
320 OpenRTM_aist.DataPortStatus.PORT_OK)
322 prop.setProperty(
"publisher.push_policy",
"skip")
323 prop.setProperty(
"publisher.skip_count",
"0")
324 prop.setProperty(
"thread_type",
"default")
325 prop.setProperty(
"measurement.exec_time",
"enable")
326 prop.setProperty(
"measurement.exec_count",
"0")
327 prop.setProperty(
"measurement.period_time",
"enable")
328 prop.setProperty(
"measurement.period_count",
"0")
329 retcode = _pn.init(prop)
331 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
335 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
336 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
337 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
338 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
339 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
340 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
341 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
342 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
355 OpenRTM_aist.DataPortStatus.PORT_OK)
357 prop.setProperty(
"publisher.push_policy",
"new")
358 prop.setProperty(
"publisher.skip_count",
"0")
359 prop.setProperty(
"thread_type",
"default")
360 prop.setProperty(
"measurement.exec_time",
"enable")
361 prop.setProperty(
"measurement.exec_count",
"0")
362 prop.setProperty(
"measurement.period_time",
"enable")
363 prop.setProperty(
"measurement.period_count",
"0")
364 retcode = _pn.init(prop)
366 self.assertEqual(_pn.setConsumer(cons),OpenRTM_aist.DataPortStatus.PORT_OK)
370 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
371 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
372 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
373 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
374 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
375 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
376 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
378 self.assertEqual(_pn.write(123,0,0),OpenRTM_aist.DataPortStatus.PORT_OK)
386 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_OK,0),
387 OpenRTM_aist.DataPortStatus.PORT_OK)
388 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.BUFFER_EMPTY,0),
389 OpenRTM_aist.DataPortStatus.PORT_ERROR)
390 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.TIMEOUT,0),
391 OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT)
392 self.assertEqual(_pn.convertReturn(OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET,0),
393 OpenRTM_aist.DataPortStatus.PRECONDITION_NOT_MET)
394 self.assertEqual(_pn.convertReturn(100,0),
395 OpenRTM_aist.DataPortStatus.PORT_ERROR)
400 if __name__ ==
'__main__':
def convertReturnCode(self, ret)
Return codes conversionReturnCode convertReturnCode(OpenRTM::PortStatus ret)
def test_activate_deactivate_isActive(self)
def convertReturnCode(self, ret)
The Properties class represents a persistent set of properties.
def test_setConsumer(self)
InPortCorbaCdrConsumer class.
def get_m_put_data_len(self)
def test_convertReturn(self)