22 from omniORB
import any
122 def __init__(self, info, provider, listeners, buffer = 0):
123 OpenRTM_aist.InPortConnector.__init__(self, info, buffer)
138 self.
_buffer.init(info.properties.getNode(
"buffer"))
200 self.
_rtcout.RTC_TRACE(
"read()")
212 if type(data) == list:
219 if ret == OpenRTM_aist.BufferStatus.BUFFER_OK:
222 elif ret == OpenRTM_aist.BufferStatus.BUFFER_EMPTY:
225 elif ret == OpenRTM_aist.BufferStatus.TIMEOUT:
228 elif ret == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
255 self.
_rtcout.RTC_TRACE(
"disconnect()")
259 cfactory = OpenRTM_aist.InPortProviderFactory.instance()
266 bfactory = OpenRTM_aist.CdrBufferFactory.instance()
267 bfactory.deleteObject(self.
_buffer)
331 buf_type = profile.properties.getProperty(
"buffer_type",
"ring_buffer")
332 return OpenRTM_aist.CdrBufferFactory.instance().createObject(buf_type)
369 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
374 _data = cdrUnmarshal(any.to_any(self.
_dataType).typecode(),data,self.
_endian)
377 self.
_rtcout.RTC_ERROR(
"unknown endian from connector")
378 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
392 self.
_listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_CONNECT].notify(self.
_profile)
404 self.
_listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_DISCONNECT].notify(self.
_profile)
def deactivate(self)
Connector deactivation.
def onDisconnect(self)
Invoke callback when connection is destroyed void onDisconnect()
def __init__(self, info, provider, listeners, buffer=0)
InPortPushConnector(ConnectorInfo info, InPortProvider* provider, ConnectorListeners listeners...
InPortPushConnector class.
def onConnect(self)
Invoke callback when connection is established void onConnect()
def __del__(self)
Destructor.
def disconnect(self)
disconnect
int PORT_OK
DataPortStatus return codes.
InPortConnector base class.
def createBuffer(self, profile)
create buffer
def write(self, data)
Writing data.
def activate(self)
Connector activation.
def read(self, data)
Reading data.