PublisherNew.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
17 
18 import threading
19 
20 import OpenRTM_aist
21 
22 
23 
47  """
48  """
49 
50  # Policy
51  ALL = 0
52  FIFO = 1
53  SKIP = 2
54  NEW = 3
55 
56 
70  def __init__(self):
71  self._rtcout = OpenRTM_aist.Manager.instance().getLogbuf("PublisherNew")
72  self._consumer = None
73  self._buffer = None
74  self._task = None
75  self._retcode = self.PORT_OK
76  self._retmutex = threading.RLock()
77  self._pushPolicy = self.NEW
78  self._skipn = 0
79  self._active = False
80  self._leftskip = 0
81  self._profile = None
82  self._listeners = None
83 
84 
96  def __del__(self):
97  self._rtcout.RTC_TRACE("~PublisherNew()")
98  if self._task:
99  self._task.resume()
100  self._task.finalize()
101 
102  OpenRTM_aist.PeriodicTaskFactory.instance().deleteObject(self._task)
103  del self._task
104  self._rtcout.RTC_PARANOID("task deleted.")
105 
106  # "consumer" should be deleted in the Connector
107  self._consumer = 0
108  # "buffer" should be deleted in the Connector
109  self._buffer = 0
110  return
111 
112 
120  def setPushPolicy(self, prop):
121  push_policy = prop.getProperty("publisher.push_policy","new")
122  self._rtcout.RTC_DEBUG("push_policy: %s", push_policy)
123 
124  push_policy = OpenRTM_aist.normalize([push_policy])
125 
126  if push_policy == "all":
127  self._pushPolicy = self.ALL
128 
129  elif push_policy == "fifo":
130  self._pushPolicy = self.FIFO
131 
132  elif push_policy == "skip":
133  self._pushPolicy = self.SKIP
134 
135  elif push_policy == "new":
136  self._pushPolicy = self.NEW
137 
138  else:
139  self._rtcout.RTC_ERROR("invalid push_policy value: %s", push_policy)
140  self._pushPolicy = self.NEW
141 
142  skip_count = prop.getProperty("publisher.skip_count","0")
143  self._rtcout.RTC_DEBUG("skip_count: %s", skip_count)
144 
145  skipn = [self._skipn]
146  ret = OpenRTM_aist.stringTo(skipn, skip_count)
147  if ret:
148  self._skipn = skipn[0]
149  else:
150  self._rtcout.RTC_ERROR("invalid skip_count value: %s", skip_count)
151  self._skipn = 0
152 
153  if self._skipn < 0:
154  self._rtcout.RTC_ERROR("invalid skip_count value: %d", self._skipn)
155  self._skipn = 0
156 
157  return
158 
159 
167  def createTask(self, prop):
168  factory = OpenRTM_aist.PeriodicTaskFactory.instance()
169 
170  th = factory.getIdentifiers()
171  self._rtcout.RTC_DEBUG("available task types: %s", OpenRTM_aist.flatten(th))
172 
173  self._task = factory.createObject(prop.getProperty("thread_type", "default"))
174 
175  if not self._task:
176  self._rtcout.RTC_ERROR("Task creation failed: %s",
177  prop.getProperty("thread_type", "default"))
178  return self.INVALID_ARGS
179 
180  self._rtcout.RTC_PARANOID("Task creation succeeded.")
181 
182  mprop = prop.getNode("measurement")
183 
184  # setting task function
185  self._task.setTask(self.svc)
186  self._task.setPeriod(0.0)
187  self._task.executionMeasure(OpenRTM_aist.toBool(mprop.getProperty("exec_time"),
188  "enable", "disable", True))
189  ecount = [0]
190  if OpenRTM_aist.stringTo(ecount, mprop.getProperty("exec_count")):
191  self._task.executionMeasureCount(ecount[0])
192 
193  self._task.periodicMeasure(OpenRTM_aist.toBool(mprop.getProperty("period_time"),
194  "enable", "disable", True))
195  pcount = [0]
196  if OpenRTM_aist.stringTo(pcount, mprop.getProperty("period_count")):
197  self._task.periodicMeasureCount(pcount[0])
198 
199  self._task.suspend()
200  self._task.activate()
201  self._task.suspend()
202 
203  return self.PORT_OK
204 
205 
257  def init(self, prop):
258  self._rtcout.RTC_TRACE("init()")
259  self.setPushPolicy(prop)
260  return self.createTask(prop)
261 
262 
288  def setConsumer(self, consumer):
289  self._rtcout.RTC_TRACE("setConsumer()")
290 
291  if not consumer:
292  self._rtcout.RTC_ERROR("setConsumer(consumer = 0): invalid argument.")
293  return self.INVALID_ARGS
294 
295  self._consumer = consumer
296  return self.PORT_OK
297 
298 
324  def setBuffer(self, buffer):
325  self._rtcout.RTC_TRACE("setBuffer()")
326 
327  if not buffer:
328  self._rtcout.RTC_ERROR("setBuffer(buffer == 0): invalid argument")
329  return self.INVALID_ARGS
330 
331  self._buffer = buffer
332  return self.PORT_OK
333 
334 
369  def setListener(self, info, listeners):
370  self._rtcout.RTC_TRACE("setListener()")
371 
372  if not listeners:
373  self._rtcout.RTC_ERROR("setListeners(listeners == 0): invalid argument")
374  return self.INVALID_ARGS
375 
376  self._profile = info
377  self._listeners = listeners
378 
379  return self.PORT_OK
380 
381 
459  def write(self, data, sec, usec):
460  self._rtcout.RTC_PARANOID("write()")
461 
462  if not self._consumer or not self._buffer or not self._listeners:
463  return self.PRECONDITION_NOT_MET
464 
465  if self._retcode == self.CONNECTION_LOST:
466  self._rtcout.RTC_DEBUG("write(): connection lost.")
467  return self._retcode
468 
469  if self._retcode == self.SEND_FULL:
470  self._rtcout.RTC_DEBUG("write(): InPort buffer is full.")
471  ret = self._buffer.write(data, sec, usec)
472  self._task.signal()
473  return self.BUFFER_FULL
474 
475  # why?
476  assert(self._buffer != 0)
477 
478  self.onBufferWrite(data)
479  ret = self._buffer.write(data, sec, usec)
480 
481  self._task.signal()
482  self._rtcout.RTC_DEBUG("%s = write()", OpenRTM_aist.DataPortStatus.toString(ret))
483 
484  return self.convertReturn(ret, data)
485 
486 
514  def isActive(self):
515  return self._active
516 
517 
543  def activate(self):
544  self._active = True
545  return self.PORT_OK
546 
547 
573  def deactivate(self):
574  self._active = False;
575  return self.PORT_OK
576 
577 
591  def svc(self):
592  guard = OpenRTM_aist.ScopedLock(self._retmutex)
593 
594  if self._pushPolicy == self.ALL:
595  self._retcode = self.pushAll()
596  return 0
597  elif self._pushPolicy == self.FIFO:
598  self._retcode = self.pushFifo()
599  return 0
600  elif self._pushPolicy == self.SKIP:
601  self._retcode = self.pushSkip()
602  return 0
603  elif self._pushPolicy == self.NEW:
604  self._retcode = self.pushNew()
605  return 0
606  else:
607  self._retcode = self.pushNew()
608 
609  return 0
610 
611 
615  def pushAll(self):
616  self._rtcout.RTC_TRACE("pushAll()")
617  try:
618 
619  while self._buffer.readable() > 0:
620  cdr = self._buffer.get()
621  self.onBufferRead(cdr)
622 
623  self.onSend(cdr)
624  ret = self._consumer.put(cdr)
625 
626  if ret != self.PORT_OK:
627  self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
628  return self.invokeListener(ret, cdr)
629  self.onReceived(cdr)
630 
631  self._buffer.advanceRptr()
632 
633  return self.PORT_OK
634  except:
635  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
636  return self.CONNECTION_LOST
637 
638  return self.PORT_ERROR
639 
640 
644  def pushFifo(self):
645  self._rtcout.RTC_TRACE("pushFifo()")
646 
647  try:
648  cdr = self._buffer.get()
649  self.onBufferRead(cdr)
650 
651  self.onSend(cdr)
652  ret = self._consumer.put(cdr)
653 
654  if ret != self.PORT_OK:
655  self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
656  return self.invokeListener(ret, cdr)
657  self.onReceived(cdr)
658 
659  self._buffer.advanceRptr()
660 
661  return self.PORT_OK
662  except:
663  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
664  return self.CONNECTION_LOST
665 
666  return self.PORT_ERROR
667 
668 
672  def pushSkip(self):
673  self._rtcout.RTC_TRACE("pushSkip()")
674  try:
675  ret = self.PORT_OK
676  preskip = self._buffer.readable() + self._leftskip
677  loopcnt = preskip/(self._skipn+1)
678  postskip = self._skipn - self._leftskip
679 
680  for i in range(loopcnt):
681  self._buffer.advanceRptr(postskip)
682  cdr = self._buffer.get()
683  self.onBufferRead(cdr)
684 
685  self.onSend(cdr)
686  ret = self._consumer.put(cdr)
687  if ret != self.PORT_OK:
688  self._buffer.advanceRptr(-postskip)
689  self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
690  return self.invokeListener(ret, cdr)
691 
692  self.onReceived(cdr)
693  postskip = self._skipn + 1
694 
695  self._buffer.advanceRptr(self._buffer.readable())
696 
697  if loopcnt == 0:
698  # Not put
699  self._leftskip = preskip % (self._skipn + 1)
700  else:
701  if self._retcode != self.PORT_OK:
702  # put Error after
703  self._leftskip = 0
704  else:
705  # put OK after
706  self._leftskip = preskip % (self._skipn + 1)
707 
708  return ret
709 
710  except:
711  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
712  return self.CONNECTION_LOST
713 
714  return self.PORT_ERROR
715 
716 
720  def pushNew(self):
721  self._rtcout.RTC_TRACE("pushNew()")
722  try:
723  self._buffer.advanceRptr(self._buffer.readable() - 1)
724 
725  cdr = self._buffer.get()
726  self.onBufferRead(cdr)
727 
728  self.onSend(cdr)
729  ret = self._consumer.put(cdr)
730 
731  if ret != self.PORT_OK:
732  self._rtcout.RTC_DEBUG("%s = consumer.put()", OpenRTM_aist.DataPortStatus.toString(ret))
733  return self.invokeListener(ret, cdr)
734 
735  self.onReceived(cdr)
736  self._buffer.advanceRptr()
737 
738  return self.PORT_OK
739 
740  except:
741  self._rtcout.RTC_ERROR(OpenRTM_aist.Logger.print_exception())
742  return self.CONNECTION_LOST
743 
744  return self.PORT_ERROR
745 
746 
804  def convertReturn(self, status, data):
805 
815  if status == OpenRTM_aist.BufferStatus.BUFFER_OK:
816  return self.PORT_OK
817 
818  elif status == OpenRTM_aist.BufferStatus.BUFFER_ERROR:
819  return self.BUFFER_ERROR
820 
821  elif status == OpenRTM_aist.BufferStatus.BUFFER_FULL:
822  self.onBufferFull(data)
823  return self.BUFFER_FULL
824 
825  elif status == OpenRTM_aist.BufferStatus.NOT_SUPPORTED:
826  return self.PORT_ERROR
827 
828  elif status == OpenRTM_aist.BufferStatus.TIMEOUT:
829  self.onBufferWriteTimeout(data)
830  return self.BUFFER_TIMEOUT
831 
832  elif status == OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET:
833  return self.PRECONDITION_NOT_MET
834 
835  else:
836  return self.PORT_ERROR
837 
838 
858  def invokeListener(self, status, data):
859  # ret:
860  # PORT_OK, PORT_ERROR, SEND_FULL, SEND_TIMEOUT, CONNECTION_LOST,
861  # UNKNOWN_ERROR
862  if status == self.PORT_ERROR:
863  self.onReceiverError(data)
864  return self.PORT_ERROR
865 
866  elif status == self.SEND_FULL:
867  self.onReceiverFull(data)
868  return self.SEND_FULL
869 
870  elif status == self.SEND_TIMEOUT:
871  self.onReceiverTimeout(data)
872  return self.SEND_TIMEOUT
873 
874  elif status == self.CONNECTION_LOST:
875  self.onReceiverError(data)
876  return self.CONNECTION_LOST
877 
878  elif status == self.UNKNOWN_ERROR:
879  self.onReceiverError(data)
880  return self.UNKNOWN_ERROR
881 
882  else:
883  self.onReceiverError(data)
884  return self.PORT_ERROR
885 
886 
896  def onBufferWrite(self, data):
897  if self._listeners is not None and self._profile is not None:
898  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE].notify(self._profile, data)
899  return
900 
901 
911  def onBufferFull(self, data):
912  if self._listeners is not None and self._profile is not None:
913  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_FULL].notify(self._profile, data)
914  return
915 
916 
926  def onBufferWriteTimeout(self, data):
927  if self._listeners is not None and self._profile is not None:
928  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_WRITE_TIMEOUT].notify(self._profile, data)
929  return
930 
931 
941  def onBufferWriteOverwrite(self, data):
942  if self._listeners is not None and self._profile is not None:
943  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_OVERWRITE].notify(self._profile, data)
944  return
945 
946 
956  def onBufferRead(self, data):
957  if self._listeners is not None and self._profile is not None:
958  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_BUFFER_READ].notify(self._profile, data)
959  return
960 
961 
971  def onSend(self, data):
972  if self._listeners is not None and self._profile is not None:
973  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_SEND].notify(self._profile, data)
974  return
975 
976 
986  def onReceived(self, data):
987  if self._listeners is not None and self._profile is not None:
988  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVED].notify(self._profile, data)
989  return
990 
991 
1001  def onReceiverFull(self, data):
1002  if self._listeners is not None and self._profile is not None:
1003  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_FULL].notify(self._profile, data)
1004  return
1005 
1006 
1016  def onReceiverTimeout(self, data):
1017  if self._listeners is not None and self._profile is not None:
1018  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_TIMEOUT].notify(self._profile, data)
1019  return
1020 
1021 
1031  def onReceiverError(self, data):
1032  if self._listeners is not None and self._profile is not None:
1033  self._listeners.connectorData_[OpenRTM_aist.ConnectorDataListenerType.ON_RECEIVER_ERROR].notify(self._profile, data)
1034  return
1035 
1036 
1046  def onSenderError(self):
1047  if self._listeners is not None and self._profile is not None:
1048  self._listeners.connector_[OpenRTM_aist.ConnectorListenerType.ON_SENDER_ERROR].notify(self._profile)
1049  return
1050 
1051 
1052 
1054  OpenRTM_aist.PublisherFactory.instance().addFactory("new",
1056  OpenRTM_aist.Delete)
def onReceiverTimeout(self, data)
Notify an ON_RECEIVER_TIMEOUT event to listeners.
def svc(self)
Thread execution function.
def pushFifo(self)
push "fifo" policy
def onBufferWrite(self, data)
Notify an ON_BUFFER_WRITE event to listeners.
def init(self, prop)
Initialization.
def pushSkip(self)
push "skip" policy
def write(self, data, sec, usec)
Write data.
def onBufferWriteOverwrite(self, data)
Notify an ON_BUFFER_OVERWRITE event to listeners.
def convertReturn(self, status, data)
Convertion from BufferStatus to DataPortStatus.
def invokeListener(self, status, data)
Call listeners according to the DataPortStatus.
def setPushPolicy(self, prop)
Setting PushPolicyvoid PublisherNew::setPushPolicy(const coil::Properties& prop)
def setConsumer(self, consumer)
Store InPort consumer.
def pushNew(self)
push "new" policy
def createTask(self, prop)
Setting Taskbool PublisherNew::createTask(const coil::Properties& prop)
def onBufferFull(self, data)
Notify an ON_BUFFER_FULL event to listeners.
def onReceiverError(self, data)
Notify an ON_RECEIVER_ERROR event to listeners.
def pushAll(self)
push all policy
def onBufferWriteTimeout(self, data)
Notify an ON_BUFFER_WRITE_TIMEOUT event to listeners.
def onSenderError(self)
Notify an ON_SENDER_ERROR event to listeners.
def onSend(self, data)
Notify an ON_SEND event to listners.
int PORT_OK
DataPortStatus return codes.
def onBufferRead(self, data)
Notify an ON_BUFFER_READ event to listeners.
def setBuffer(self, buffer)
Setting buffer pointer.
def onReceived(self, data)
Notify an ON_RECEIVED event to listeners.
def isActive(self)
If publisher is active state.
def onReceiverFull(self, data)
Notify an ON_RECEIVER_FULL event to listeners.
def setListener(self, info, listeners)
Set the listener.


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06