19 from omniORB
import any
73 self.
_rtcout = OpenRTM_aist.Manager.instance().getLogbuf(
"PublisherPeriodic")
101 self.
_rtcout.RTC_TRACE(
"~PublisherPeriodic()")
104 self.
_task.finalize()
105 self.
_rtcout.RTC_PARANOID(
"task finalized.")
107 OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self.
_task)
109 self.
_rtcout.RTC_PARANOID(
"task deleted.")
126 push_policy = prop.getProperty(
"publisher.push_policy",
"new")
127 self.
_rtcout.RTC_DEBUG(
"push_policy: %s", push_policy)
129 push_policy = OpenRTM_aist.normalize([push_policy])
131 if push_policy ==
"all":
134 elif push_policy ==
"fifo":
137 elif push_policy ==
"skip":
140 elif push_policy ==
"new":
144 self.
_rtcout.RTC_ERROR(
"invalid push_policy value: %s", push_policy)
147 skip_count = prop.getProperty(
"publisher.skip_count",
"0")
148 self.
_rtcout.RTC_DEBUG(
"skip_count: %s", skip_count)
151 ret = OpenRTM_aist.stringTo(skipn, skip_count)
155 self.
_rtcout.RTC_ERROR(
"invalid skip_count value: %s", skip_count)
159 self.
_rtcout.RTC_ERROR(
"invalid skip_count value: %d", self.
_skipn)
173 factory = OpenRTM_aist.PeriodicTaskFactory.instance()
175 th = factory.getIdentifiers()
176 self.
_rtcout.RTC_DEBUG(
"available task types: %s", OpenRTM_aist.flatten(th))
178 self.
_task = factory.createObject(prop.getProperty(
"thread_type",
"default"))
180 self.
_rtcout.RTC_ERROR(
"Task creation failed: %s",
181 prop.getProperty(
"thread_type",
"default"))
184 self.
_rtcout.RTC_PARANOID(
"Task creation succeeded.")
190 rate = prop.getProperty(
"publisher.push_rate")
196 self.
_rtcout.RTC_DEBUG(
"Task period %f [Hz]", hz)
200 self.
_task.setPeriod(1.0/hz)
203 mprop = prop.getNode(
"measurement")
205 self.
_task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty(
"exec_time"),
206 "enable",
"disable",
True))
209 if OpenRTM_aist.stringTo(ecount, mprop.getProperty(
"exec_count")):
210 self.
_task.executionMeasureCount(ecount[0])
212 self.
_task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty(
"period_time"),
213 "enable",
"disable",
True))
216 if OpenRTM_aist.stringTo(pcount, mprop.getProperty(
"period_count")):
217 self.
_task.periodicMeasureCount(pcount[0])
289 self.
_rtcout.RTC_TRACE(
"init()")
321 self.
_rtcout.RTC_TRACE(
"setConsumer()")
324 self.
_rtcout.RTC_ERROR(
"setConsumer(consumer = 0): invalid argument.")
357 self.
_rtcout.RTC_TRACE(
"setBuffer()")
360 self.
_rtcout.RTC_ERROR(
"setBuffer(buffer == 0): invalid argument")
403 self.
_rtcout.RTC_TRACE(
"setListeners()")
406 self.
_rtcout.RTC_ERROR(
"setListeners(listeners == 0): invalid argument")
493 self.
_rtcout.RTC_PARANOID(
"write()")
499 self.
_rtcout.RTC_DEBUG(
"write(): connection lost.")
503 self.
_rtcout.RTC_DEBUG(
"write(): InPort buffer is full.")
509 self.
_rtcout.RTC_DEBUG(
"%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
648 self.
_rtcout.RTC_TRACE(
"pushAll()")
656 while self.
_buffer.readable() > 0:
664 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
678 self.
_rtcout.RTC_TRACE(
"pushFifo()")
692 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
706 self.
_rtcout.RTC_TRACE(
"pushSkip()")
715 loopcnt = preskip / (self.
_skipn + 1)
717 for i
in range(loopcnt):
718 self.
_buffer.advanceRptr(postskip)
725 self.
_buffer.advanceRptr(-postskip)
726 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
729 postskip = self.
_skipn + 1
742 self.
_rtcout.RTC_TRACE(
"pushNew()")
763 self.
_rtcout.RTC_DEBUG(
"%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
830 if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
833 elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
836 elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
840 elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
843 elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
847 elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
915 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self.
_profile, data)
930 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self.
_profile, data)
945 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self.
_profile, data)
960 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self.
_profile, data)
975 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self.
_profile, data)
990 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self.
_profile, data)
1005 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self.
_profile, data)
1020 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self.
_profile, data)
1035 self.
_listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self.
_profile, data)
1049 self.
_listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self.
_profile)
1062 self.
_listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY].notify(self.
_profile)
1075 self.
_listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self.
_profile)
1089 self.
_rtcout.RTC_DEBUG(
"buffer empty")
1099 OpenRTM_aist.PublisherFactory.instance().addFactory(
"periodic",
1101 OpenRTM_aist.Delete)
def onReceiverError(self, data)
Notify an ON_RECEIVER_ERROR event to listeners.
def pushSkip(self)
push "skip" policy
def pushFifo(self)
push "fifo" policy
def init(self, prop)
Initialization.
def deactivate(self)
deactivation
def onReceived(self, data)
Notify an ON_RECEIVED event to listeners.
def __del__(self)
Destructor.
def activate(self)
activation
def bufferIsEmpty(self)
Whether a buffer is empty.
def PublisherPeriodicInit()
def invokeListener(self, status, data)
Call listeners according to the DataPortStatus.
def setListener(self, info, listeners)
Set the listener.
def onReceiverFull(self, data)
Notify an ON_RECEIVER_FULL event to listeners.
def write(self, data, sec, usec)
Write data.
def onBufferWriteTimeout(self, data)
Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners.
def onSend(self, data)
Notify an ON_SEND event to listeners.
def svc(self)
Thread execution function. A task execution function to be executed by coil::PeriodicTask.
def pushNew(self)
push "new" policy
def isActive(self)
Whether the publisher is in the active state.
def onSenderError(self)
Notify an ON_SENDER_ERROR event to listeners. inline void onSenderError()
def onReceiverTimeout(self, data)
Notify an ON_RECEIVER_TIMEOUT event to listeners.
def onBufferEmpty(self)
Notify an ON_BUFFER_EMPTY event to listeners. inline void onBufferEmpty()
def onBufferRead(self, data)
Notify an ON_BUFFER_READ event to listeners.
int PORT_OK
DataPortStatus return codes.
def setConsumer(self, consumer)
Store InPort consumer.
def setBuffer(self, buffer)
Setting buffer pointer.
def convertReturn(self, status, data)
Conversion from BufferStatus to DataPortStatus.
def onBufferWrite(self, data)
Notify an ON_BUFFER_WRITE event to listeners.
def setPushPolicy(self, prop)
Setting PushPolicy. void PublisherNew::setPushPolicy(const coil::Properties& prop)
def pushAll(self)
push "all" policy
def __init__(self)
Constructor.
def onBufferFull(self, data)
Notify an ON_BUFFER_FULL event to listeners.
def createTask(self, prop)
Setting Task. bool PublisherNew::createTask(const coil::Properties& prop)
def onSenderEmpty(self)
Notify an ON_SENDER_EMPTY event to listeners. inline void onSenderEmpty()