45 RINGBUFFER_DEFAULT_LENGTH = 8
71 def __init__(self, length=RINGBUFFER_DEFAULT_LENGTH):
164 return OpenRTM_aist.BufferStatus.NOT_SUPPORTED
166 self._buffer = [None for i in range(n)]
169 return OpenRTM_aist.BufferStatus.BUFFER_OK
201 return OpenRTM_aist.BufferStatus.BUFFER_OK
265 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
269 return OpenRTM_aist.BufferStatus.BUFFER_OK
303 return OpenRTM_aist.BufferStatus.BUFFER_OK
348 def write(self, value, sec = -1, nsec = 0):
350 self._full_cond.acquire()
359 if overwrite and not timedwrite:
362 elif not overwrite and not timedwrite:
363 self._full_cond.release()
364 return OpenRTM_aist.BufferStatus.BUFFER_FULL
366 elif not overwrite and timedwrite:
369 sec = self._wtimeout.sec()
370 nsec = self._wtimeout.usec() * 1000
373 if not self._full_cond.wait(sec + (nsec/1000000000.0)):
374 self._full_cond.release()
375 return OpenRTM_aist.BufferStatus.TIMEOUT
378 self._full_cond.release()
379 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
381 self._full_cond.release()
385 self._empty_cond.acquire()
389 self._empty_cond.notify()
392 self._empty_cond.release()
394 return OpenRTM_aist.BufferStatus.BUFFER_OK
396 return OpenRTM_aist.BufferStatus.BUFFER_OK
507 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
511 return OpenRTM_aist.BufferStatus.BUFFER_OK
540 def get(self, value=None):
546 return OpenRTM_aist.BufferStatus.BUFFER_OK
592 def read(self, value, sec = -1, nsec = 0):
593 self._empty_cond.acquire()
602 sec = self._rtimeout.sec()
603 nsec = self._rtimeout.usec() * 1000
605 if readback and not timedread:
607 self._empty_cond.release()
608 return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
611 elif not readback and not timedread:
612 self._empty_cond.release()
613 return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
615 elif not readback and timedread:
617 sec = self._rtimeout.sec()
618 nsec = self._rtimeout.usec() * 1000
620 if not self._empty_cond.wait(sec + (nsec/1000000000.0)):
621 self._empty_cond.release()
622 return OpenRTM_aist.BufferStatus.TIMEOUT
625 self._empty_cond.release()
626 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
628 self._empty_cond.release()
637 self._full_cond.acquire()
642 self._full_cond.notify()
646 self._full_cond.release()
649 return OpenRTM_aist.BufferStatus.BUFFER_OK
709 if prop.getProperty("length"):
711 if OpenRTM_aist.stringTo(n, prop.getProperty("length")):
719 policy = OpenRTM_aist.normalize([prop.getProperty("write.full_policy")])
721 if policy == "overwrite":
725 elif policy == "do_nothing":
729 elif policy == "block":
734 if OpenRTM_aist.stringTo(tm, prop.getProperty("write.timeout")):
737 self._wtimeout.set_time(tm)
742 policy = prop.getProperty("read.empty_policy")
744 if policy == "readback":
748 elif policy == "do_nothing":
752 elif policy == "block":
756 if OpenRTM_aist.stringTo(tm, prop.getProperty("read.timeout")):
757 self._rtimeout.set_time(tm[0])
def advanceRptr(self, n=1)
Get the buffer length.
BufferBase abstract class.
def length(self, n=None)
Get the buffer length.
def full(self)
Check on whether the buffer is full.
def __init__(self, length=RINGBUFFER_DEFAULT_LENGTH)
Constructor.
def read(self, value, sec=-1, nsec=0)
Readout data from the buffer.
def rptr(self, n=0)
Get the buffer length.
def reset(self)
Get the buffer length.
def empty(self)
Check on whether the buffer is empty.
def init(self, prop)
void init(const coil::Properties& prop)
def readable(self)
Write data into the buffer.
def writable(self)
Write data into the buffer.
def write(self, value, sec=-1, nsec=0)
Write data into the buffer.
def wptr(self, n=0)
Get the buffer length.
def __initLength(self, prop)
void initLength(const coil::Properties& prop)
def get(self)
Get data from the buffer.
def advanceWptr(self, n=1)
Get the buffer length.
def get(self, value=None)
Write data into the buffer.
def __initReadPolicy(self, prop)
void initReadPolicy(const coil::Properties& prop)
def advanceWptr(self, n=1)
Get the buffer length.
def __initWritePolicy(self, prop)
void initWritePolicy(const coil::Properties& prop)
def put(self, data)
Write data into the buffer.
def length(self)
Get the buffer length.
def reset(self)
Get the buffer length.
def put(self, value)
Write data into the buffer.
def empty(self)
Check on whether the buffer is empty.
def full(self)
Check on whether the buffer is full.
def advanceRptr(self, n=1)
Get the buffer length.