RingBuffer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: euc-jp -*-
3 
4 
17 
18 import threading
19 import OpenRTM_aist
20 
21 
22 
42  """
43  """
44 
45  RINGBUFFER_DEFAULT_LENGTH = 8
46 
47 
71  def __init__(self, length=RINGBUFFER_DEFAULT_LENGTH):
72  self._overwrite = True
73  self._readback = True
74  self._timedwrite = False
75  self._timedread = False
78  self._length = length
79  self._wpos = 0
80  self._rpos = 0
81  self._fillcount = 0
82  self._wcount = 0
83  self._buffer = [None for i in range(self._length)]
84  self._pos_mutex = threading.RLock()
85  self._full_mutex = threading.RLock()
86  self._empty_mutex = threading.RLock()
87  self._full_cond = threading.Condition(self._full_mutex)
88  self._empty_cond = threading.Condition(self._empty_mutex)
89 
90 
91  self.reset()
92 
93 
94 
134  def init(self, prop):
135  self.__initLength(prop)
136  self.__initWritePolicy(prop)
137  self.__initReadPolicy(prop)
138 
139 
140 
158  def length(self, n = None):
159  if n is None:
160  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
161  return self._length
162 
163  if n < 1:
164  return OpenRTM_aist.BufferStatus.NOT_SUPPORTED
165 
166  self._buffer = [None for i in range(n)]
167  self._length = n
168  self.reset()
169  return OpenRTM_aist.BufferStatus.BUFFER_OK
170 
171 
172 
195  def reset(self):
196  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
197  self._fillcount = 0
198  self._wcount = 0
199  self._wpos = 0
200  self._rpos = 0
201  return OpenRTM_aist.BufferStatus.BUFFER_OK
202 
203 
204 
225  def wptr(self, n = 0):
226  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
227  return self._buffer[(self._wpos + n + self._length) % self._length]
228 
229 
230 
254  def advanceWptr(self, n = 1):
255  # n > 0 :
256  # n satisfies n <= writable elements
257  # n <= m_length - m_fillcout
258  # n < 0 : -n = n'
259  # n satisfies n'<= readable elements
260  # n'<= m_fillcount
261  # n >= - m_fillcount
262  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
263  if (n > 0 and n > (self._length - self._fillcount)) or \
264  (n < 0 and n < (-self._fillcount)):
265  return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
266 
267  self._wpos = (self._wpos + n + self._length) % self._length
268  self._fillcount += n
269  return OpenRTM_aist.BufferStatus.BUFFER_OK
270 
271 
272 
290 
291  #
292  # @param value Target data to write.
293  #
294  # @return BUFFER_OK: Successful
295  # BUFFER_ERROR: Failed
296  #
297  # @endif
298  #
299  # ReturnCode put(const DataType& value)
300  def put(self, value):
301  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
302  self._buffer[self._wpos] = value
303  return OpenRTM_aist.BufferStatus.BUFFER_OK
304 
305 
348  def write(self, value, sec = -1, nsec = 0):
349  try:
350  self._full_cond.acquire()
351  if self.full():
352  timedwrite = self._timedwrite # default is False
353  overwrite = self._overwrite # default is True
354 
355  if not (sec < 0): # if second arg is set -> block mode
356  timedwrite = True
357  overwrite = False
358 
359  if overwrite and not timedwrite: # "overwrite" mode
360  self.advanceRptr()
361 
362  elif not overwrite and not timedwrite: # "do_nothiong" mode
363  self._full_cond.release()
364  return OpenRTM_aist.BufferStatus.BUFFER_FULL
365 
366  elif not overwrite and timedwrite: # "block" mode
367 
368  if sec < 0:
369  sec = self._wtimeout.sec()
370  nsec = self._wtimeout.usec() * 1000
371 
372  # true: signaled, false: timeout
373  if not self._full_cond.wait(sec + (nsec/1000000000.0)):
374  self._full_cond.release()
375  return OpenRTM_aist.BufferStatus.TIMEOUT
376 
377  else: # unknown condition
378  self._full_cond.release()
379  return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
380 
381  self._full_cond.release()
382 
383  self.put(value)
384 
385  self._empty_cond.acquire()
386  empty = self.empty()
387  if empty:
388  self.advanceWptr(1)
389  self._empty_cond.notify()
390  else:
391  self.advanceWptr(1)
392  self._empty_cond.release()
393 
394  return OpenRTM_aist.BufferStatus.BUFFER_OK
395  except:
396  return OpenRTM_aist.BufferStatus.BUFFER_OK
397 
398 
399 
421  def writable(self):
422  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
423  return self._length - self._fillcount
424 
425 
426 
446  def full(self):
447  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
448  return self._length == self._fillcount
449 
450 
451 
470  def rptr(self, n = 0):
471  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
472  return self._buffer[(self._rpos + n + self._length) % self._length]
473 
474 
475 
497  def advanceRptr(self, n = 1):
498  # n > 0 :
499  # n satisfies n <= readable elements
500  # n <= m_fillcout
501  # n < 0 : -n = n'
502  # n satisfies n'<= m_length - m_fillcount
503  # n >= m_fillcount - m_length
504  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
505  if (n > 0 and n > self._fillcount) or \
506  (n < 0 and n < (self._fillcount - self._length)):
507  return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
508 
509  self._rpos = (self._rpos + n + self._length) % self._length
510  self._fillcount -= n
511  return OpenRTM_aist.BufferStatus.BUFFER_OK
512 
513 
514 
515 
540  def get(self, value=None):
541  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
542  if value is None:
543  return self._buffer[self._rpos]
544 
545  value[0] = self._buffer[self._rpos]
546  return OpenRTM_aist.BufferStatus.BUFFER_OK
547 
548 
549 
592  def read(self, value, sec = -1, nsec = 0):
593  self._empty_cond.acquire()
594 
595  if self.empty():
596  timedread = self._timedread
597  readback = self._readback
598 
599  if not (sec < 0): # if second arg is set -> block mode
600  timedread = True
601  readback = False
602  sec = self._rtimeout.sec()
603  nsec = self._rtimeout.usec() * 1000
604 
605  if readback and not timedread: # "readback" mode
606  if not self._wcount > 0:
607  self._empty_cond.release()
608  return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
609  self.advanceRptr(-1)
610 
611  elif not readback and not timedread: # "do_nothiong" mode
612  self._empty_cond.release()
613  return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
614 
615  elif not readback and timedread: # "block" mode
616  if sec < 0:
617  sec = self._rtimeout.sec()
618  nsec = self._rtimeout.usec() * 1000
619  # true: signaled, false: timeout
620  if not self._empty_cond.wait(sec + (nsec/1000000000.0)):
621  self._empty_cond.release()
622  return OpenRTM_aist.BufferStatus.TIMEOUT
623 
624  else: # unknown condition
625  self._empty_cond.release()
626  return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
627 
628  self._empty_cond.release()
629 
630  val = self.get()
631 
632  if len(value) > 0:
633  value[0] = val
634  else:
635  value.append(val)
636 
637  self._full_cond.acquire()
638  full_ = self.full()
639 
640  if full_:
641  self.advanceRptr()
642  self._full_cond.notify()
643  else:
644  self.advanceRptr()
645 
646  self._full_cond.release()
647 
648 
649  return OpenRTM_aist.BufferStatus.BUFFER_OK
650 
651 
652 
677  def readable(self):
678  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
679  return self._fillcount
680 
681 
682 
702  def empty(self):
703  guard = OpenRTM_aist.ScopedLock(self._pos_mutex)
704  return self._fillcount == 0
705 
706 
707 
708  def __initLength(self, prop):
709  if prop.getProperty("length"):
710  n = [0]
711  if OpenRTM_aist.stringTo(n, prop.getProperty("length")):
712  n = n[0]
713  if n > 0:
714  self.length(n)
715 
716 
717 
718  def __initWritePolicy(self, prop):
719  policy = OpenRTM_aist.normalize([prop.getProperty("write.full_policy")])
720 
721  if policy == "overwrite":
722  self._overwrite = True
723  self._timedwrite = False
724 
725  elif policy == "do_nothing":
726  self._overwrite = False
727  self._timedwrite = False
728 
729  elif policy == "block":
730  self._overwrite = False
731  self._timedwrite = True
732 
733  tm = [0.0]
734  if OpenRTM_aist.stringTo(tm, prop.getProperty("write.timeout")):
735  tm = tm[0]
736  if not (tm < 0):
737  self._wtimeout.set_time(tm)
738 
739 
740 
741  def __initReadPolicy(self, prop):
742  policy = prop.getProperty("read.empty_policy")
743 
744  if policy == "readback":
745  self._readback = True
746  self._timedread = False
747 
748  elif policy == "do_nothing":
749  self._readback = False
750  self._timedread = False
751 
752  elif policy == "block":
753  self._readback = False
754  self._timedread = True
755  tm = [0.0]
756  if OpenRTM_aist.stringTo(tm, prop.getProperty("read.timeout")):
757  self._rtimeout.set_time(tm[0])
def advanceRptr(self, n=1)
Get the buffer length.
Definition: BufferBase.py:326
BufferBase abstract class.
Definition: BufferBase.py:51
def length(self, n=None)
Get the buffer length.
Definition: RingBuffer.py:158
def full(self)
Check on whether the buffer is full.
Definition: RingBuffer.py:446
def __init__(self, length=RINGBUFFER_DEFAULT_LENGTH)
Constructor.
Definition: RingBuffer.py:71
def read(self, value, sec=-1, nsec=0)
Readout data from the buffer.
Definition: RingBuffer.py:592
def rptr(self, n=0)
Get the buffer element at the read pointer.
Definition: RingBuffer.py:470
def reset(self)
Reset the buffer pointers and counters.
Definition: RingBuffer.py:195
def empty(self)
Check on whether the buffer is empty.
Definition: BufferBase.py:418
def init(self, prop)
void init(const coil::Properties& prop)
Definition: RingBuffer.py:134
def readable(self)
Get the number of readable elements.
Definition: RingBuffer.py:677
def writable(self)
Get the number of writable elements.
Definition: RingBuffer.py:421
def write(self, value, sec=-1, nsec=0)
Write data into the buffer.
Definition: RingBuffer.py:348
def wptr(self, n=0)
Get the buffer element at the write pointer.
Definition: RingBuffer.py:225
def __initLength(self, prop)
void initLength(const coil::Properties& prop)
Definition: RingBuffer.py:708
def get(self)
Get data from the buffer.
Definition: BufferBase.py:347
def advanceWptr(self, n=1)
Advance the write pointer.
Definition: RingBuffer.py:254
def get(self, value=None)
Read data from the buffer.
Definition: RingBuffer.py:540
def __initReadPolicy(self, prop)
void initReadPolicy(const coil::Properties& prop)
Definition: RingBuffer.py:741
def advanceWptr(self, n=1)
Get the buffer length.
Definition: BufferBase.py:189
def __initWritePolicy(self, prop)
void initWritePolicy(const coil::Properties& prop)
Definition: RingBuffer.py:718
def put(self, data)
Write data into the buffer.
Definition: BufferBase.py:209
def length(self)
Get the buffer length.
Definition: BufferBase.py:117
def reset(self)
Get the buffer length.
Definition: BufferBase.py:142
def put(self, value)
Write data into the buffer.
Definition: RingBuffer.py:300
def empty(self)
Check on whether the buffer is empty.
Definition: RingBuffer.py:702
def full(self)
Check on whether the buffer is full.
Definition: BufferBase.py:280
def advanceRptr(self, n=1)
Advance the read pointer.
Definition: RingBuffer.py:497


openrtm_aist_python
Author(s): Shinji Kurihara
autogenerated on Mon Feb 28 2022 23:01:06