20 from omniORB import any
23 import OpenRTM__POA,OpenRTM
81 OpenRTM_aist.InPortProvider.__init__(self)
95 orb = OpenRTM_aist.Manager.instance().getORB()
98 orb.object_to_string(self.
_objref)))
118 oid = OpenRTM_aist.Manager.instance().getPOA.servant_to_id(self)
119 OpenRTM_aist.Manager.instance().getPOA.deactivate_object(oid)
123 def init(self, prop):
159 self._rtcout.RTC_PARANOID(
"InPortCorbaCdrProvider.put()")
163 return OpenRTM.PORT_ERROR
165 self._rtcout.RTC_PARANOID(
"received data size: %d", len(data))
170 return OpenRTM.PORT_ERROR
172 ret = self._connector.write(data)
177 self._rtcout.RTC_TRACE(OpenRTM_aist.Logger.print_exception())
178 return OpenRTM.UNKNOWN_ERROR
179 return OpenRTM.UNKNOWN_ERROR
183 if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
185 return OpenRTM.PORT_OK
187 elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
189 return OpenRTM.PORT_ERROR
191 elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
194 return OpenRTM.BUFFER_FULL
196 elif status == OpenRTM_aist.BufferStatus.BUFFER_EMPTY:
197 return OpenRTM.BUFFER_EMPTY
199 elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
201 return OpenRTM.PORT_ERROR
203 elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
206 return OpenRTM.BUFFER_TIMEOUT
210 return OpenRTM.UNKNOWN_ERROR
219 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self.
_profile, data)
226 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self.
_profile, data)
233 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self.
_profile, data)
239 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE].notify(self.
_profile, data)
246 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self.
_profile, data)
253 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self.
_profile, data)
260 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self.
_profile, data)
267 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self.
_profile, data)
272 factory = OpenRTM_aist.InPortProviderFactory.instance()
273 factory.addFactory(
"corba_cdr",
276 def setInterfaceType(self, interface_type)
def put(self, data)
[CORBA interface] Write data into the buffer
def onReceiverTimeout(self, data)
inline void onReceiverTimeout(const cdrMemoryStream& data)
def onReceiverFull(self, data)
inline void onReceiverFull(const cdrMemoryStream& data)
def __del__(self)
Destructor.
def convertReturn(self, status, data)
def onBufferFull(self, data)
inline void onBufferFull(const cdrMemoryStream& data)
def InPortCorbaCdrProviderInit()
InPortCorbaCdrProvider class.
def init(self, prop)
virtual void init(coil::Properties& prop);
def onBufferWriteTimeout(self, data)
inline void onBufferWriteTimeout(const cdrMemoryStream& data)
def setBuffer(self, buffer)
virtual void setBuffer(BufferBase<cdrMemoryStream>* buffer);
def onReceived(self, data)
inline void onReceived(const cdrMemoryStream& data)
def onBufferWriteOverwrite(self, data)
inline void onBufferWriteOverwrite(const cdrMemoryStream& data)
def onReceiverError(self, data)
inline void onReceiverError(const cdrMemoryStream& data)
def setListener(self, info, listeners)
def onBufferWrite(self, data)
Connector data listener functions.
def __init__(self)
Constructor.
def newNV(name, value)
Create NameValue.