InPort.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
16 
17 from omniORB import *
18 from omniORB import any
19 import sys
20 import copy
21 import time
22 
23 import OpenRTM_aist
24 
25 
59  """
60  """
61 
62 
63 
64 
101  def __init__(self, name, value, buffer=None,
102  read_block=False, write_block=False,
103  read_timeout=0, write_timeout = 0):
104  OpenRTM_aist.InPortBase.__init__(self, name, OpenRTM_aist.toTypename(value))
105  self._name = name
106  self._value = value
107  self._OnRead = None
108  self._OnReadConvert = None
109 
110 
111  def __del__(self, InPortBase=OpenRTM_aist.InPortBase):
112  InPortBase.__del__(self)
113  return
114 
115 
130  def name(self):
131  return self._name
132 
133 
134 
151  def isNew(self):
152  self._rtcout.RTC_TRACE("isNew()")
153 
154  if len(self._connectors) == 0:
155  self._rtcout.RTC_DEBUG("no connectors")
156  return False
157 
158  r = self._connectors[0].getBuffer().readable()
159  if r > 0:
160  self._rtcout.RTC_DEBUG("isNew() = True, readable data: %d",r)
161  return True
162 
163  self._rtcout.RTC_DEBUG("isNew() = False, no readable data")
164  return False
165 
166 
167 
191  def isEmpty(self):
192  self._rtcout.RTC_TRACE("isEmpty()")
193 
194  if len(self._connectors) == 0:
195  self._rtcout.RTC_DEBUG("no connectors")
196  return True
197 
198  r = self._connectors[0].getBuffer().readable()
199  if r == 0:
200  self._rtcout.RTC_DEBUG("isEmpty() = true, buffer is empty")
201  return True
202 
203  self._rtcout.RTC_DEBUG("isEmpty() = false, data exists in the buffer")
204  return False
205 
206 
207 
279  def read(self):
280  self._rtcout.RTC_TRACE("DataType read()")
281 
282  if self._OnRead is not None:
283  self._OnRead()
284  self._rtcout.RTC_TRACE("OnRead called")
285 
286  if len(self._connectors) == 0:
287  self._rtcout.RTC_DEBUG("no connectors")
288  return self._value
289 
290  _val = copy.deepcopy(self._value)
291  cdr = [_val]
292  ret = self._connectors[0].read(cdr)
293 
294 
295  if ret == OpenRTM_aist.DataPortStatus.PORT_OK:
296  self._rtcout.RTC_DEBUG("data read succeeded")
297  self._value = cdr[0]
298 
299  if self._OnReadConvert is not None:
300  self._value = self._OnReadConvert(self._value)
301  self._rtcout.RTC_DEBUG("OnReadConvert called")
302  return self._value
303  return self._value
304 
305  elif ret == OpenRTM_aist.DataPortStatus.BUFFER_EMPTY:
306  self._rtcout.RTC_WARN("buffer empty")
307  return self._value
308 
309  elif ret == OpenRTM_aist.DataPortStatus.BUFFER_TIMEOUT:
310  self._rtcout.RTC_WARN("buffer read timeout")
311  return self._value
312 
313  self._rtcout.RTC_ERROR("unknown retern value from buffer.read()")
314  return self._value
315 
316 
317 
334  def update(self):
335  self.read()
336 
337 
338 
352  def setOnRead(self, on_read):
353  self._OnRead = on_read
354 
355 
356 
371  def setOnReadConvert(self, on_rconvert):
372  self._OnReadConvert = on_rconvert
def update(self)
Read into bound T-type data from current InPort.
Definition: InPort.py:334
def __del__(self, InPortBase=OpenRTM_aist.InPortBase)
Definition: InPort.py:111
def isNew(self)
bool isNew()
Definition: InPort.py:151
def isEmpty(self)
Check whether the data is newest.
Definition: InPort.py:191
def read(self)
Readout the value from DataPort.
Definition: InPort.py:279
def setOnRead(self, on_read)
Definition: InPort.py:352
InPort template class.
Definition: InPort.py:58
def name(self)
const char* name()
Definition: InPort.py:130
def __init__(self, name, value, buffer=None, read_block=False, write_block=False, read_timeout=0, write_timeout=0)
A constructor.
Definition: InPort.py:103
def setOnReadConvert(self, on_rconvert)
Definition: InPort.py:371


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06