71 self.
_rtcout = OpenRTM_aist.Manager.instance().getLogbuf(
"PublisherNew")
97 self._rtcout.RTC_TRACE(
"~PublisherNew()")
100 self._task.finalize()
102 OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self.
_task)
104 self._rtcout.RTC_PARANOID(
"task deleted.")
121 push_policy = prop.getProperty(
"publisher.push_policy",
"new")
122 self._rtcout.RTC_DEBUG(
"push_policy: %s", push_policy)
124 push_policy = OpenRTM_aist.normalize([push_policy])
126 if push_policy ==
"all":
129 elif push_policy ==
"fifo":
132 elif push_policy ==
"skip":
135 elif push_policy ==
"new":
139 self._rtcout.RTC_ERROR(
"invalid push_policy value: %s", push_policy)
142 skip_count = prop.getProperty(
"publisher.skip_count",
"0")
143 self._rtcout.RTC_DEBUG(
"skip_count: %s", skip_count)
146 ret = OpenRTM_aist.stringTo(skipn, skip_count)
150 self._rtcout.RTC_ERROR(
"invalid skip_count value: %s", skip_count)
154 self._rtcout.RTC_ERROR(
"invalid skip_count value: %d", self.
_skipn)
168 factory = OpenRTM_aist.PeriodicTaskFactory.instance()
170 th = factory.getIdentifiers()
171 self._rtcout.RTC_DEBUG(
"available task types: %s", OpenRTM_aist.flatten(th))
173 self.
_task = factory.createObject(prop.getProperty(
"thread_type",
"default"))
176 self._rtcout.RTC_ERROR(
"Task creation failed: %s",
177 prop.getProperty(
"thread_type",
"default"))
180 self._rtcout.RTC_PARANOID(
"Task creation succeeded.")
182 mprop = prop.getNode(
"measurement")
185 self._task.setTask(self.
svc)
186 self._task.setPeriod(0.0)
187 self._task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty(
"exec_time"),
188 "enable",
"disable",
True))
190 if OpenRTM_aist.stringTo(ecount, mprop.getProperty(
"exec_count")):
191 self._task.executionMeasureCount(ecount[0])
193 self._task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty(
"period_time"),
194 "enable",
"disable",
True))
196 if OpenRTM_aist.stringTo(pcount, mprop.getProperty(
"period_count")):
197 self._task.periodicMeasureCount(pcount[0])
200 self._task.activate()
258 self._rtcout.RTC_TRACE(
"init()")
289 self._rtcout.RTC_TRACE(
"setConsumer()")
292 self._rtcout.RTC_ERROR(
"setConsumer(consumer = 0): invalid argument.")
325 self._rtcout.RTC_TRACE(
"setBuffer()")
328 self._rtcout.RTC_ERROR(
"setBuffer(buffer == 0): invalid argument")
370 self._rtcout.RTC_TRACE(
"setListener()")
373 self._rtcout.RTC_ERROR(
"setListeners(listeners == 0): invalid argument")
460 self._rtcout.RTC_PARANOID(
"write()")
466 self._rtcout.RTC_DEBUG(
"write(): connection lost.")
470 self._rtcout.RTC_DEBUG(
"write(): InPort buffer is full.")
471 ret = self._buffer.write(data, sec, usec)
479 ret = self._buffer.write(data, sec, usec)
482 self._rtcout.RTC_DEBUG(
"%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
616 self._rtcout.RTC_TRACE(
"pushAll()")
619 while self._buffer.readable() > 0:
620 cdr = self._buffer.get()
624 ret = self._consumer.put(cdr)
627 self._rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
631 self._buffer.advanceRptr()
635 self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
645 self._rtcout.RTC_TRACE(
"pushFifo()")
648 cdr = self._buffer.get()
652 ret = self._consumer.put(cdr)
655 self._rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
659 self._buffer.advanceRptr()
663 self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
673 self._rtcout.RTC_TRACE(
"pushSkip()")
676 preskip = self._buffer.readable() + self.
_leftskip 677 loopcnt = preskip/(self.
_skipn+1)
680 for i
in range(loopcnt):
681 self._buffer.advanceRptr(postskip)
682 cdr = self._buffer.get()
686 ret = self._consumer.put(cdr)
688 self._buffer.advanceRptr(-postskip)
689 self._rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
693 postskip = self.
_skipn + 1
695 self._buffer.advanceRptr(self._buffer.readable())
711 self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
721 self._rtcout.RTC_TRACE(
"pushNew()")
723 self._buffer.advanceRptr(self._buffer.readable() - 1)
725 cdr = self._buffer.get()
729 ret = self._consumer.put(cdr)
732 self._rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
736 self._buffer.advanceRptr()
741 self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
815 if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
818 elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
821 elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
825 elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
828 elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
832 elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
898 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self.
_profile, data)
913 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self.
_profile, data)
928 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self.
_profile, data)
943 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE].notify(self.
_profile, data)
958 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self.
_profile, data)
973 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self.
_profile, data)
988 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self.
_profile, data)
1003 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self.
_profile, data)
1018 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self.
_profile, data)
1033 self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self.
_profile, data)
1048 self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self.
_profile)
1054 OpenRTM_aist.PublisherFactory.instance().addFactory(
"new",
1056 OpenRTM_aist.Delete)
def onReceiverTimeout(self, data)
Notify an ON_RECEIVER_TIMEOUT event to listeners.
def svc(self)
Thread execution function.
def pushFifo(self)
push "fifo" policy
def onBufferWrite(self, data)
Notify an ON_BUFFER_WRITE event to listeners.
def init(self, prop)
Initialization.
def pushSkip(self)
push "skip" policy
def activate(self)
activation
def write(self, data, sec, usec)
Write data.
def onBufferWriteOverwrite(self, data)
Notify an ON_BUFFER_OVERWRITE event to listeners.
def convertReturn(self, status, data)
Conversion from BufferStatus to DataPortStatus.
def invokeListener(self, status, data)
Call listeners according to the DataPortStatus.
def setPushPolicy(self, prop)
Setting PushPolicy. void PublisherNew::setPushPolicy(const coil::Properties& prop)
def setConsumer(self, consumer)
Store InPort consumer.
def pushNew(self)
push "new" policy
def createTask(self, prop)
Setting Task. bool PublisherNew::createTask(const coil::Properties& prop)
def onBufferFull(self, data)
Notify an ON_BUFFER_FULL event to listeners.
def deactivate(self)
deactivation
def onReceiverError(self, data)
Notify an ON_RECEIVER_ERROR event to listeners.
def pushAll(self)
push "all" policy
def onBufferWriteTimeout(self, data)
Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners.
def onSenderError(self)
Notify an ON_SENDER_ERROR event to listeners.
def onSend(self, data)
Notify an ON_SEND event to listeners.
int PORT_OK
DataPortStatus return codes.
def onBufferRead(self, data)
Notify an ON_BUFFER_READ event to listeners.
def setBuffer(self, buffer)
Setting buffer pointer.
def onReceived(self, data)
Notify an ON_RECEIVED event to listeners.
def isActive(self)
Whether the publisher is in the active state.
def onReceiverFull(self, data)
Notify an ON_RECEIVER_FULL event to listeners.
def __init__(self)
Constructor.
def __del__(self)
Destructor.
def setListener(self, info, listeners)
Set the listener.