38 #ifndef ORO_BUFFER_LOCK_FREE_HPP 39 #define ORO_BUFFER_LOCK_FREE_HPP 41 #include "../os/oro_arch.h" 42 #include "../os/Atomic.hpp" 43 #include "../os/CAS.hpp" 45 #include "../internal/AtomicMWSRQueue.hpp" 46 #include "../internal/AtomicMWMRQueue.hpp" 47 #include "../internal/TsPool.hpp" 50 #ifdef ORO_PRAGMA_INTERFACE 101 : MAX_THREADS(options.max_threads())
102 , mcircular(options.circular()), initialized(false)
103 , bufs((!options.circular() && !options.multiple_readers()) ?
106 , mpool(new
internal::
TsPool<Item>(bufsize + options.max_threads()))
117 : MAX_THREADS(options.max_threads())
118 , mcircular(options.circular()), initialized(false)
119 , bufs((!options.circular() && !options.multiple_readers()) ?
122 , mpool(new
internal::
TsPool<Item>(bufsize + options.max_threads()))
138 if (!initialized || reset) {
189 return droppedSamples.
read();
194 if (!mcircular && (
capacity() == (size_type)bufs->
size() )) {
195 droppedSamples.
inc();
202 droppedSamples.
inc();
206 if (bufs->
dequeue( mitem ) == false ) {
207 droppedSamples.
inc();
216 if (bufs->
enqueue( mitem ) == false ) {
222 droppedSamples.
inc();
230 droppedSamples.
inc();
238 }
while ( bufs->
enqueue( mitem ) == false );
244 size_type
Push(
const std::vector<value_t>& items)
247 int towrite = items.size();
248 size_type written = 0;
249 typename std::vector<value_t>::const_iterator it;
250 for( it = items.begin(); it != items.end(); ++it) {
251 if ( this->
Push( *it ) == false ) {
256 droppedSamples.
add(towrite - written);
264 if (bufs->
dequeue( ipop ) == false )
272 size_type
Pop(std::vector<value_t>& items )
277 items.push_back( *ipop );
287 if (bufs->
dequeue( ipop ) == false )
void data_sample(const T &sample)
size_type Push(const std::vector< value_t > &items)
boost::call_traits< T >::reference reference_t
virtual bool data_sample(param_t sample, bool reset=true)
virtual size_type capacity() const =0
boost::call_traits< T >::param_type param_t
size_type Pop(std::vector< value_t > &items)
internal::TsPool< Item > *const mpool
FlowStatus Pop(reference_t item)
virtual bool isEmpty() const =0
virtual bool dequeue(T &result)=0
BufferBase::size_type size_type
internal::AtomicQueue< Item * > *const bufs
BufferInterface< T >::size_type size_type
virtual bool enqueue(const T &value)=0
RTT::os::AtomicInt droppedSamples
BufferLockFree(unsigned int bufsize, param_t initial_value, const Options &options=Options())
const unsigned int MAX_THREADS
The maximum number of threads.
bool deallocate(T *Value)
BufferLockFree(unsigned int bufsize, const Options &options=Options())
virtual value_t data_sample() const
BufferBase::Options Options
virtual size_type size() const =0
size_type capacity() const
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
BufferInterface< T >::param_t param_t
void Release(value_t *item)
virtual bool isFull() const =0
BufferInterface< T >::reference_t reference_t
value_t * PopWithoutRelease()
virtual size_type dropped() const