71 self.
_rtcout = OpenRTM_aist.Manager.instance().getLogbuf(
"PublisherNew")
97 self.
_rtcout.RTC_TRACE(
"~PublisherNew()")
100 self.
_task.finalize()
102 OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self.
_task)
104 self.
_rtcout.RTC_PARANOID(
"task deleted.")
121 push_policy = prop.getProperty(
"publisher.push_policy",
"new")
122 self.
_rtcout.RTC_DEBUG(
"push_policy: %s", push_policy)
124 push_policy = OpenRTM_aist.normalize([push_policy])
126 if push_policy ==
"all":
129 elif push_policy ==
"fifo":
132 elif push_policy ==
"skip":
135 elif push_policy ==
"new":
139 self.
_rtcout.RTC_ERROR(
"invalid push_policy value: %s", push_policy)
142 skip_count = prop.getProperty(
"publisher.skip_count",
"0")
143 self.
_rtcout.RTC_DEBUG(
"skip_count: %s", skip_count)
146 ret = OpenRTM_aist.stringTo(skipn, skip_count)
150 self.
_rtcout.RTC_ERROR(
"invalid skip_count value: %s", skip_count)
154 self.
_rtcout.RTC_ERROR(
"invalid skip_count value: %d", self.
_skipn)
168 factory = OpenRTM_aist.PeriodicTaskFactory.instance()
170 th = factory.getIdentifiers()
171 self.
_rtcout.RTC_DEBUG(
"available task types: %s", OpenRTM_aist.flatten(th))
173 self.
_task = factory.createObject(prop.getProperty(
"thread_type",
"default"))
176 self.
_rtcout.RTC_ERROR(
"Task creation failed: %s",
177 prop.getProperty(
"thread_type",
"default"))
180 self.
_rtcout.RTC_PARANOID(
"Task creation succeeded.")
182 mprop = prop.getNode(
"measurement")
186 self.
_task.setPeriod(0.0)
187 self.
_task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty(
"exec_time"),
188 "enable",
"disable",
True))
190 if OpenRTM_aist.stringTo(ecount, mprop.getProperty(
"exec_count")):
191 self.
_task.executionMeasureCount(ecount[0])
193 self.
_task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty(
"period_time"),
194 "enable",
"disable",
True))
196 if OpenRTM_aist.stringTo(pcount, mprop.getProperty(
"period_count")):
197 self.
_task.periodicMeasureCount(pcount[0])
258 self.
_rtcout.RTC_TRACE(
"init()")
289 self.
_rtcout.RTC_TRACE(
"setConsumer()")
292 self.
_rtcout.RTC_ERROR(
"setConsumer(consumer = 0): invalid argument.")
325 self.
_rtcout.RTC_TRACE(
"setBuffer()")
328 self.
_rtcout.RTC_ERROR(
"setBuffer(buffer == 0): invalid argument")
370 self.
_rtcout.RTC_TRACE(
"setListener()")
373 self.
_rtcout.RTC_ERROR(
"setListeners(listeners == 0): invalid argument")
460 self.
_rtcout.RTC_PARANOID(
"write()")
466 self.
_rtcout.RTC_DEBUG(
"write(): connection lost.")
470 self.
_rtcout.RTC_DEBUG(
"write(): InPort buffer is full.")
482 self.
_rtcout.RTC_DEBUG(
"%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
616 self.
_rtcout.RTC_TRACE(
"pushAll()")
619 while self.
_buffer.readable() > 0:
627 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
635 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
645 self.
_rtcout.RTC_TRACE(
"pushFifo()")
655 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
663 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
673 self.
_rtcout.RTC_TRACE(
"pushSkip()")
677 loopcnt = preskip/(self.
_skipn+1)
680 for i
in range(loopcnt):
681 self.
_buffer.advanceRptr(postskip)
688 self.
_buffer.advanceRptr(-postskip)
689 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
693 postskip = self.
_skipn + 1
711 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
721 self.
_rtcout.RTC_TRACE(
"pushNew()")
732 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
741 self.
_rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
815 if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
818 elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
821 elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
825 elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
828 elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
832 elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
898 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self.
_profile, data)
913 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self.
_profile, data)
928 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self.
_profile, data)
943 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE].notify(self.
_profile, data)
958 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self.
_profile, data)
973 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self.
_profile, data)
988 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self.
_profile, data)
1003 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self.
_profile, data)
1018 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self.
_profile, data)
1033 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self.
_profile, data)
1048 self.
_listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self.
_profile)
1054 OpenRTM_aist.PublisherFactory.instance().addFactory(
"new",
1056 OpenRTM_aist.Delete)
def onReceiverTimeout(self, data)
Notify an ON_RECEIVER_TIMEOUT event to listeners.
def svc(self)
Thread execution function.
def pushFifo(self)
push "fifo" policy
def onBufferWrite(self, data)
Notify an ON_BUFFER_WRITE event to listeners.
def init(self, prop)
Initialization.
def pushSkip(self)
push "skip" policy
def activate(self)
activation
def write(self, data, sec, usec)
Write data.
def onBufferWriteOverwrite(self, data)
Notify an ON_BUFFER_OVERWRITE event to listeners.
def convertReturn(self, status, data)
Convertion from BufferStatus to DataPortStatus.
def invokeListener(self, status, data)
Call listeners according to the DataPortStatus.
def setPushPolicy(self, prop)
Setting PushPolicyvoid PublisherNew::setPushPolicy(const coil::Properties& prop)
def setConsumer(self, consumer)
Store InPort consumer.
def pushNew(self)
push "new" policy
def createTask(self, prop)
Setting Taskbool PublisherNew::createTask(const coil::Properties& prop)
def onBufferFull(self, data)
Notify an ON_BUFFER_FULL event to listeners.
def deactivate(self)
deactivation
def onReceiverError(self, data)
Notify an ON_RECEIVER_ERROR event to listeners.
def pushAll(self)
push all policy
def onBufferWriteTimeout(self, data)
Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners.
def onSenderError(self)
Notify an ON_SENDER_ERROR event to listeners.
def onSend(self, data)
Notify an ON_SEND event to listners.
int PORT_OK
DataPortStatus return codes.
def onBufferRead(self, data)
Notify an ON_BUFFER_READ event to listeners.
def setBuffer(self, buffer)
Setting buffer pointer.
def onReceived(self, data)
Notify an ON_RECEIVED event to listeners.
def isActive(self)
If publisher is active state.
def onReceiverFull(self, data)
Notify an ON_RECEIVER_FULL event to listeners.
def __init__(self)
Constructor.
def __del__(self)
Destructor.
def setListener(self, info, listeners)
Set the listener.