InPort.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
16 
17 from omniORB import *
18 from omniORB import any
19 import sys
20 import copy
21 import time
22 
23 import OpenRTM_aist
24 
25 
59  """
60  """
61 
62 
63 
64 
101  def __init__(self, name, value, buffer=None, read_block=False, write_block=False,
102  read_timeout=0, write_timeout = 0):
103  OpenRTM_aist.InPortBase.__init__(self, name, OpenRTM_aist.toTypename(value))
104  self._name = name
105  self._value = value
106  self._OnRead = None
107  self._OnReadConvert = None
109 
110  def __del__(self, InPortBase=OpenRTM_aist.InPortBase):
111  InPortBase.__del__(self)
112  return
113 
114 
129  def name(self):
130  return self._name
131 
132 
133 
150  def isNew(self):
151  self._rtcout.RTC_TRACE("isNew()")
152 
153  if len(self._connectors) == 0:
154  self._rtcout.RTC_DEBUG("no connectors")
155  return False
156 
157  r = self._connectors[0].getBuffer().readable()
158  if r > 0:
159  self._rtcout.RTC_DEBUG("isNew() = True, readable data: %d",r)
160  return True
161 
162  self._rtcout.RTC_DEBUG("isNew() = False, no readable data")
163  return False
164 
165 
166 
190  def isEmpty(self):
191  self._rtcout.RTC_TRACE("isEmpty()")
192 
193  if len(self._connectors) == 0:
194  self._rtcout.RTC_DEBUG("no connectors")
195  return True
196 
197  r = self._connectors[0].getBuffer().readable()
198  if r == 0:
199  self._rtcout.RTC_DEBUG("isEmpty() = true, buffer is empty")
200  return True
201 
202  self._rtcout.RTC_DEBUG("isEmpty() = false, data exists in the buffer")
203  return False
204 
205 
206 
278  def read(self):
279  self._rtcout.RTC_TRACE("DataType read()")
280 
281  if self._OnRead is not None:
282  self._OnRead()
283  self._rtcout.RTC_TRACE("OnRead called")
284 
285  if len(self._connectors) == 0:
286  self._rtcout.RTC_DEBUG("no connectors")
287  return self._value
288 
289  _val = copy.deepcopy(self._value)
290  cdr = [_val]
291  ret = self._connectors[0].read(cdr)
292 
293 
294  if ret == OpenRTM_aist.DataPortStatus.PORT_OK:
295  self._rtcout.RTC_DEBUG("data read succeeded")
296  self._value = cdr[0]
297 
298  if self._OnReadConvert is not None:
299  self._value = self._OnReadConvert(self._value)
300  self._rtcout.RTC_DEBUG("OnReadConvert called")
301  return self._value
302  return self._value
303 
304  elif ret == OpenRTM_aist.DataPortStatus.BUFFER_EMPTY:
305  self._rtcout.RTC_WARN("buffer empty")
306  return self._value
307 
308  elif ret == OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT:
309  self._rtcout.RTC_WARN("buffer read timeout")
310  return self._value
311 
312  self._rtcout.RTC_ERROR("unknown retern value from buffer.read()")
313  return self._value
314 
315 
316 
333  def update(self):
334  self.read()
335 
336 
337 
351  def setOnRead(self, on_read):
352  self._OnRead = on_read
353 
354 
355 
370  def setOnReadConvert(self, on_rconvert):
371  self._OnReadConvert = on_rconvert
372 
def update(self)
Read into bound T-type data from current InPort.
Definition: InPort.py:334
def __del__(self, InPortBase=OpenRTM_aist.InPortBase)
Definition: InPort.py:111
def isNew(self)
bool isNew()
Definition: InPort.py:151
def isEmpty(self)
Check whether the data is newest.
Definition: InPort.py:191
def read(self)
Readout the value from DataPort.
Definition: InPort.py:279
def setOnRead(self, on_read)
Definition: InPort.py:352
InPort template class.
Definition: InPort.py:58
def name(self)
const char* name()
Definition: InPort.py:130
def __init__(self, name, value, buffer=None, read_block=False, write_block=False, read_timeout=0, write_timeout=0)
A constructor.
Definition: InPort.py:103
def setOnReadConvert(self, on_rconvert)
Definition: InPort.py:371


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Thu Jun 6 2019 19:11:34