PublisherPeriodic.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
17 
18 import threading
19 from omniORB import any
20 
21 import OpenRTM_aist
22 
23 
24 
46  """
47  """
48 
49  # Policy
50  ALL = 0
51  FIFO = 1
52  SKIP = 2
53  NEW = 3
54 
55 
72  def __init__(self):
73  self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PublisherPeriodic")
74  self._consumer = None
75  self._buffer = None
76  self._task = None
77  self._retcode = self.PORT_OK
78  self._retmutex = threading.RLock()
79  self._pushPolicy = self.NEW
80  self._skipn = 0
81  self._active = False
82  self._readback = False
83  self._leftskip = 0
84  self._profile = None
85  self._listeners = None
86 
87  return
88 
89 
100  def __del__(self):
101  self._rtcout.RTC_TRACE("~PublisherPeriodic()")
102  if self._task:
103  self._task.resume()
104  self._task.finalize()
105  self._rtcout.RTC_PARANOID("task finalized.")
106 
107  OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self._task)
108  del self._task
109  self._rtcout.RTC_PARANOID("task deleted.")
110 
111  # "consumer" should be deleted in the Connector
112  self._consumer = None
113  # "buffer" should be deleted in the Connector
114  self._buffer = None
115  return
116 
117 
125  def setPushPolicy(self, prop):
126  push_policy = prop.getProperty("publisher.push_policy","new")
127  self._rtcout.RTC_DEBUG("push_policy: %s", push_policy)
128 
129  push_policy = OpenRTM_aist.normalize([push_policy])
130 
131  if push_policy == "all":
132  self._pushPolicy = self.ALL
133 
134  elif push_policy == "fifo":
135  self._pushPolicy = self.FIFO
136 
137  elif push_policy == "skip":
138  self._pushPolicy = self.SKIP
139 
140  elif push_policy == "new":
141  self._pushPolicy = self.NEW
142 
143  else:
144  self._rtcout.RTC_ERROR("invalid push_policy value: %s", push_policy)
145  self._pushPolicy = self.NEW
146 
147  skip_count = prop.getProperty("publisher.skip_count","0")
148  self._rtcout.RTC_DEBUG("skip_count: %s", skip_count)
149 
150  skipn = [self._skipn]
151  ret = OpenRTM_aist.stringTo(skipn, skip_count)
152  if ret:
153  self._skipn = skipn[0]
154  else:
155  self._rtcout.RTC_ERROR("invalid skip_count value: %s", skip_count)
156  self._skipn = 0
157 
158  if self._skipn < 0:
159  self._rtcout.RTC_ERROR("invalid skip_count value: %d", self._skipn)
160  self._skipn = 0
161 
162  return
163 
164 
172  def createTask(self, prop):
173  factory = OpenRTM_aist.PeriodicTaskFactory.instance()
174 
175  th = factory.getIdentifiers()
176  self._rtcout.RTC_DEBUG("available task types: %s", OpenRTM_aist.flatten(th))
177 
178  self._task = factory.createObject(prop.getProperty("thread_type", "default"))
179  if not self._task:
180  self._rtcout.RTC_ERROR("Task creation failed: %s",
181  prop.getProperty("thread_type", "default"))
182  return self.INVALID_ARGS
183 
184  self._rtcout.RTC_PARANOID("Task creation succeeded.")
185 
186  # setting task function
187  self._task.setTask(self.svc)
188 
189  # Task execution rate
190  rate = prop.getProperty("publisher.push_rate")
191 
192  if rate != "":
193  hz = float(rate)
194  if hz == 0:
195  hz = 1000.0
196  self._rtcout.RTC_DEBUG("Task period %f [Hz]", hz)
197  else:
198  hz = 1000.0
199 
200  self._task.setPeriod(1.0/hz)
201 
202  # Measurement setting
203  mprop = prop.getNode("measurement")
204 
205  self._task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty("exec_time"),
206  "enable", "disable", True))
207 
208  ecount = [0]
209  if OpenRTM_aist.stringTo(ecount, mprop.getProperty("exec_count")):
210  self._task.executionMeasureCount(ecount[0])
211 
212  self._task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty("period_time"),
213  "enable", "disable", True))
214 
215  pcount = [0]
216  if OpenRTM_aist.stringTo(pcount, mprop.getProperty("period_count")):
217  self._task.periodicMeasureCount(pcount[0])
218 
219  # Start task in suspended mode
220  self._task.suspend()
221  self._task.activate()
222  self._task.suspend()
223 
224  return self.PORT_OK
225 
226 
288  def init(self, prop):
289  self._rtcout.RTC_TRACE("init()")
290  self.setPushPolicy(prop)
291  return self.createTask(prop)
292 
293 
320  def setConsumer(self, consumer):
321  self._rtcout.RTC_TRACE("setConsumer()")
322 
323  if not consumer:
324  self._rtcout.RTC_ERROR("setConsumer(consumer = 0): invalid argument.")
325  return self.INVALID_ARGS
326 
327  self._consumer = consumer
328  return self.PORT_OK
329 
330 
356  def setBuffer(self, buffer):
357  self._rtcout.RTC_TRACE("setBuffer()")
358 
359  if not buffer:
360  self._rtcout.RTC_ERROR("setBuffer(buffer == 0): invalid argument")
361  return self.INVALID_ARGS
362 
363  self._buffer = buffer
364  return self.PORT_OK
365 
366 
402  def setListener(self, info, listeners):
403  self._rtcout.RTC_TRACE("setListeners()")
404 
405  if not listeners:
406  self._rtcout.RTC_ERROR("setListeners(listeners == 0): invalid argument")
407  return self.INVALID_ARGS
408 
409  self._profile = info
410  self._listeners = listeners
411  return self.PORT_OK
412 
413 
492  def write(self, data, sec, usec):
493  self._rtcout.RTC_PARANOID("write()")
494 
495  if not self._consumer or not self._buffer or not self._listeners:
496  return self.PRECONDITION_NOT_MET
497 
498  if self._retcode == self.CONNECTION_LOST:
499  self._rtcout.RTC_DEBUG("write(): connection lost.")
500  return self._retcode
501 
502  if self._retcode == self.SEND_FULL:
503  self._rtcout.RTC_DEBUG("write(): InPort buffer is full.")
504  self._buffer.write(data,sec,usec)
505  return self.BUFFER_FULL
506 
507  self.onBufferWrite(data)
508  ret = self._buffer.write(data, sec, usec)
509  self._rtcout.RTC_DEBUG("%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
510  self._task.resume()
511  return self.convertReturn(ret, data)
512 
513 
541  def isActive(self):
542  return self._active
543 
544 
570  def activate(self):
571  if not self._task or not self._buffer:
572  return self.PRECONDITION_NOT_MET
573  self._active = True
574  self._task.resume()
575  return self.PORT_OK
576 
577 
603  def deactivate(self):
604  if not self._task:
605  return self.PRECONDITION_NOT_MET
606  self._active = False
607  self._task.suspend()
608  return self.PORT_OK
609 
610 
619  def svc(self):
620  guard = OpenRTM_aist.ScopedLock(self._retmutex)
621 
622  if self._pushPolicy == self.ALL:
623  self._retcode = self.pushAll()
624  return 0
625 
626  elif self._pushPolicy == self.FIFO:
627  self._retcode = self.pushFifo()
628  return 0
629 
630  elif self._pushPolicy == self.SKIP:
631  self._retcode = self.pushSkip()
632  return 0
633 
634  elif self._pushPolicy == self.NEW:
635  self._retcode = self.pushNew()
636  return 0
637 
638  else:
639  self._retcode = self.pushNew()
640 
641  return 0
642 
643 
647  def pushAll(self):
648  self._rtcout.RTC_TRACE("pushAll()")
649 
650  if not self._buffer:
651  return self.PRECONDITION_NOT_MET
652 
653  if self.bufferIsEmpty():
654  return self.BUFFER_EMPTY
655 
656  while self._buffer.readable() > 0:
657  cdr = self._buffer.get()
658  self.onBufferRead(cdr)
659 
660  self.onSend(cdr)
661  ret = self._consumer.put(cdr)
662 
663  if ret != self.PORT_OK:
664  self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
665  return self.invokeListener(ret, cdr)
666 
667  self.onReceived(cdr)
668  self._buffer.advanceRptr()
669 
670  return self.PORT_OK
671 
672 
673 
677  def pushFifo(self):
678  self._rtcout.RTC_TRACE("pushFifo()")
679  if not self._buffer:
680  return self.PRECONDITION_NOT_MET
681 
682  if self.bufferIsEmpty():
683  return self.BUFFER_EMPTY
684 
685  cdr = self._buffer.get()
686  self.onBufferRead(cdr)
687 
688  self.onSend(cdr)
689  ret = self._consumer.put(cdr)
690 
691  if ret != self.PORT_OK:
692  self._rtcout.RTC_DEBUG("%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
693  return self.invokeListener(ret, cdr)
694 
695  self.onReceived(cdr)
696  self._buffer.advanceRptr()
697 
698  return self.PORT_OK
699 
700 
701 
705  def pushSkip(self):
706  self._rtcout.RTC_TRACE("pushSkip()")
707  if not self._buffer:
708  return self.PRECONDITION_NOT_MET
709 
710  if self.bufferIsEmpty():
711  return self.BUFFER_EMPTY
712 
713  ret = self.PORT_OK
714  preskip = self._buffer.readable() + self._leftskip
715  loopcnt = preskip / (self._skipn + 1)
716  postskip = self._skipn - self._leftskip
717  for i in range(loopcnt):
718  self._buffer.advanceRptr(postskip)
719  cdr = self._buffer.get()
720  self.onBufferRead(cdr)
721 
722  self.onSend(cdr)
723  ret = self._consumer.put(cdr)
724  if ret != self.PORT_OK:
725  self._buffer.advanceRptr(-postskip)
726  self._rtcout.RTC_DEBUG("%s = consumer.put()",OpenRTM_aist.DataPortStatus.toString(ret))
727  return self.invokeListener(ret, cdr)
728  self.onReceived(cdr)
729  postskip = self._skipn + 1
730 
731  self._buffer.advanceRptr(self._buffer.readable())
732  self._leftskip = preskip % (self._skipn + 1)
733 
734  return ret
735 
736 
737 
741  def pushNew(self):
742  self._rtcout.RTC_TRACE("pushNew()")
743  if not self._buffer:
744  return self.PRECONDITION_NOT_MET
745 
746  if self.bufferIsEmpty():
747  return self.BUFFER_EMPTY
748 
749  # In case of the periodic/push_new policy, the buffer should
750  # allow readback. But, readback flag should be set as "true"
751  # after written at least one datum into the buffer.
752  self._readback = True
753 
754  self._buffer.advanceRptr(self._buffer.readable() - 1)
755 
756  cdr = self._buffer.get()
757  self.onBufferRead(cdr)
758 
759  self.onSend(cdr)
760  ret = self._consumer.put(cdr)
761 
762  if ret != self.PORT_OK:
763  self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
764  return self.invokeListener(ret, cdr)
765 
766  self.onReceived(cdr)
767 
768  self._buffer.advanceRptr()
769  return self.PORT_OK
770 
771 
829  def convertReturn(self, status, data):
830  if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
831  return self.PORT_OK
832 
833  elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
834  return self.BUFFER_ERROR
835 
836  elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
837  self.onBufferFull(data)
838  return self.BUFFER_FULL
839 
840  elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
841  return self.PORT_ERROR
842 
843  elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
844  self.onBufferWriteTimeout(data)
845  return self.BUFFER_TIMEOUT
846 
847  elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
848  return self.PRECONDITION_NOT_MET
849 
850  else:
851  return self.PORT_ERROR
852 
853  return self.PORT_ERROR
854 
855 
875  def invokeListener(self, status, data):
876  # ret:
877  # PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
878  # UNKNOWN_ERROR
879  if status == self.PORT_ERROR:
880  self.onReceiverError(data)
881  return self.PORT_ERROR
882 
883  elif status == self.SEND_FULL:
884  self.onReceiverFull(data)
885  return self.SEND_FULL
886 
887  elif status == self.SEND_TIMEOUT:
888  self.onReceiverTimeout(data)
889  return self.SEND_TIMEOUT
890 
891  elif status == self.CONNECTION_LOST:
892  self.onReceiverError(data)
893  return self.CONNECTION_LOST
894 
895  elif status == self.UNKNOWN_ERROR:
896  self.onReceiverError(data)
897  return self.UNKNOWN_ERROR
898 
899  else:
900  self.onReceiverError(data)
901  return self.PORT_ERROR
902 
903 
913  def onBufferWrite(self, data):
914  if self._listeners is not None and self._profile is not None:
915  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self._profile, data)
916  return
917 
918 
928  def onBufferFull(self, data):
929  if self._listeners is not None and self._profile is not None:
930  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self._profile, data)
931  return
932 
933 
943  def onBufferWriteTimeout(self, data):
944  if self._listeners is not None and self._profile is not None:
945  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self._profile, data)
946  return
947 
948 
958  def onBufferRead(self, data):
959  if self._listeners is not None and self._profile is not None:
960  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
961  return
962 
963 
973  def onSend(self, data):
974  if self._listeners is not None and self._profile is not None:
975  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self._profile, data)
976  return
977 
978 
988  def onReceived(self, data):
989  if self._listeners is not None and self._profile is not None:
990  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self._profile, data)
991  return
992 
993 
1003  def onReceiverFull(self, data):
1004  if self._listeners is not None and self._profile is not None:
1005  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self._profile, data)
1006  return
1007 
1008 
1018  def onReceiverTimeout(self, data):
1019  if self._listeners is not None and self._profile is not None:
1020  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self._profile, data)
1021  return
1022 
1023 
1033  def onReceiverError(self, data):
1034  if self._listeners is not None and self._profile is not None:
1035  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self._profile, data)
1036  return
1037 
1038 
1039 
1047  def onBufferEmpty(self):
1048  if self._listeners is not None and self._profile is not None:
1049  self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_BUFFER_EMPTY].notify(self._profile)
1050  return
1051 
1052 
1060  def onSenderEmpty(self):
1061  if self._listeners is not None and self._profile is not None:
1062  self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_EMPTY].notify(self._profile)
1063  return
1064 
1065 
1073  def onSenderError(self):
1074  if self._listeners is not None and self._profile is not None:
1075  self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self._profile)
1076  return
1077 
1078 
1079 
1087  def bufferIsEmpty(self):
1088  if self._buffer and self._buffer.empty() and not self._readback:
1089  self._rtcout.RTC_DEBUG("buffer empty")
1090  self.onBufferEmpty()
1091  self.onSenderEmpty()
1092  return True
1093 
1094  return False
1095 
1096 
1097 
1099  OpenRTM_aist.PublisherFactory.instance().addFactory("periodic",
1101  OpenRTM_aist.Delete)
def onReceiverError(self, data)
Notify an ON_RECEIVER_ERROR event to listeners.
def onReceived(self, data)
Notify an ON_RECEIVED event to listeners.
def bufferIsEmpty(self)
Whether a buffer is empty.
def invokeListener(self, status, data)
Call listeners according to the DataPortStatus.
def setListener(self, info, listeners)
Set the listener.
def onReceiverFull(self, data)
Notify an ON_RECEIVER_FULL event to listeners.
def write(self, data, sec, usec)
Write data.
def onBufferWriteTimeout(self, data)
Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners.
def onSend(self, data)
Notify an ON_SEND event to listners.
def svc(self)
Thread execution function A task execution function to be executed by coil::PeriodicTask.
def isActive(self)
If publisher is active state.
def onSenderError(self)
Notify an ON_SENDER_ERROR event to listenersinline void onSenderError()
def onReceiverTimeout(self, data)
Notify an ON_RECEIVER_TIMEOUT event to listeners.
def onBufferEmpty(self)
Notify an ON_BUFFER_EMPTY event to listenersinline void onBufferEmpty()
def onBufferRead(self, data)
Notify an ON_BUFFER_READ event to listeners.
int PORT_OK
DataPortStatus return codes.
def setConsumer(self, consumer)
Store InPort consumer.
def setBuffer(self, buffer)
Setting buffer pointer.
def convertReturn(self, status, data)
Convertion from BufferStatus to DataPortStatus.
def onBufferWrite(self, data)
Notify an ON_BUFFER_WRITE event to listeners.
def setPushPolicy(self, prop)
Setting PushPolicyvoid PublisherNew::setPushPolicy(const coil::Properties& prop)
def onBufferFull(self, data)
Notify an ON_BUFFER_FULL event to listeners.
def createTask(self, prop)
Setting Taskbool PublisherNew::createTask(const coil::Properties& prop)
def onSenderEmpty(self)
Notify an ON_SENDER_EMPTY event to listenersinline void onSenderEmpty()


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06