45 RINGBUFFER_DEFAULT_LENGTH = 8
71 def __init__(self, length=RINGBUFFER_DEFAULT_LENGTH):
164 return OpenRTM_aist.BufferStatus.NOT_SUPPORTED
166 self.
_buffer = [
None for i
in range(n)]
169 return OpenRTM_aist.BufferStatus.BUFFER_OK
201 return OpenRTM_aist.BufferStatus.BUFFER_OK
265 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
269 return OpenRTM_aist.BufferStatus.BUFFER_OK
303 return OpenRTM_aist.BufferStatus.BUFFER_OK
348 def write(self, value, sec = -1, nsec = 0):
359 if overwrite
and not timedwrite:
362 elif not overwrite
and not timedwrite:
364 return OpenRTM_aist.BufferStatus.BUFFER_FULL
366 elif not overwrite
and timedwrite:
373 if not self.
_full_cond.wait(sec + (nsec/1000000000.0)):
375 return OpenRTM_aist.BufferStatus.TIMEOUT
379 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
394 return OpenRTM_aist.BufferStatus.BUFFER_OK
396 return OpenRTM_aist.BufferStatus.BUFFER_OK
507 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
511 return OpenRTM_aist.BufferStatus.BUFFER_OK
540 def get(self, value=None):
546 return OpenRTM_aist.BufferStatus.BUFFER_OK
592 def read(self, value, sec = -1, nsec = 0):
605 if readback
and not timedread:
608 return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
611 elif not readback
and not timedread:
613 return OpenRTM_aist.BufferStatus.BUFFER_EMPTY
615 elif not readback
and timedread:
620 if not self.
_empty_cond.wait(sec + (nsec/1000000000.0)):
622 return OpenRTM_aist.BufferStatus.TIMEOUT
626 return OpenRTM_aist.BufferStatus.PRECONDITION_NOT_MET
649 return OpenRTM_aist.BufferStatus.BUFFER_OK
709 if prop.getProperty(
"length"):
711 if OpenRTM_aist.stringTo(n, prop.getProperty(
"length")):
719 policy = OpenRTM_aist.normalize([prop.getProperty(
"write.full_policy")])
721 if policy ==
"overwrite":
725 elif policy ==
"do_nothing":
729 elif policy ==
"block":
734 if OpenRTM_aist.stringTo(tm, prop.getProperty(
"write.timeout")):
742 policy = prop.getProperty(
"read.empty_policy")
744 if policy ==
"readback":
748 elif policy ==
"do_nothing":
752 elif policy ==
"block":
756 if OpenRTM_aist.stringTo(tm, prop.getProperty(
"read.timeout")):
def advanceRptr(self, n=1)
Get the buffer length.
BufferBase abstract class.
def length(self, n=None)
Get the buffer length.
def full(self)
Check on whether the buffer is full.
def __init__(self, length=RINGBUFFER_DEFAULT_LENGTH)
Constructor.
def read(self, value, sec=-1, nsec=0)
Readout data from the buffer.
def rptr(self, n=0)
Get the buffer length.
def reset(self)
Get the buffer length.
def empty(self)
Check on whether the buffer is empty.
def init(self, prop)
void init(const coil::Properties& prop)
def readable(self)
Write data into the buffer.
def writable(self)
Write data into the buffer.
def write(self, value, sec=-1, nsec=0)
Write data into the buffer.
def wptr(self, n=0)
Get the buffer length.
def __initLength(self, prop)
void initLength(const coil::Properties& prop)
def get(self)
Get data from the buffer.
def advanceWptr(self, n=1)
Get the buffer length.
def get(self, value=None)
Write data into the buffer.
def __initReadPolicy(self, prop)
void initReadPolicy(const coil::Properties& prop)
def advanceWptr(self, n=1)
Get the buffer length.
def __initWritePolicy(self, prop)
void initWritePolicy(const coil::Properties& prop)
def put(self, data)
Write data into the buffer.
def length(self)
Get the buffer length.
def reset(self)
Get the buffer length.
def put(self, value)
Write data into the buffer.
def empty(self)
Check on whether the buffer is empty.
def full(self)
Check on whether the buffer is full.
def advanceRptr(self, n=1)
Get the buffer length.