39 #ifndef ORO_LIST_LOCK_FREE_HPP 40 #define ORO_LIST_LOCK_FREE_HPP 43 #include "../os/oro_arch.h" 44 #include "../os/CAS.hpp" 45 #include <boost/intrusive_ptr.hpp> 46 #include "../rtt-config.h" 48 #ifdef ORO_PRAGMA_INTERFACE 98 typedef typename BufferType::iterator
Iterator;
99 typedef typename BufferType::const_iterator
CIterator;
126 typedef boost::intrusive_ptr<StorageImpl>
Storage;
128 Storage
newStorage(
size_t alloc,
size_t items,
bool init =
true)
130 Storage st(
new StorageImpl(alloc) );
131 for (
unsigned int i=0; i < alloc; ++i) {
132 (*st)[i].data.reserve( items );
151 return MAX_THREADS * 2;
165 : MAX_THREADS(
threads ), blankp(0), required(lsize)
167 const unsigned int BUF_NUM = BufNum();
168 bufs = newStorage( BUF_NUM, lsize );
182 Item* orig = lockAndGetActive(st);
183 res = orig->data.capacity();
196 Item* orig = lockAndGetActive(st);
197 res = orig->data.size();
210 Item* orig = lockAndGetActive(st);
211 res = orig->data.empty();
226 if (required > this->capacity()) {
227 this->reserve( required*2 );
254 if (lsize <= this->capacity() )
257 const unsigned int BUF_NUM = BufNum();
258 Storage res( newStorage(BUF_NUM, lsize,
false) );
261 Item* nextbuf = &(*res)[0];
290 orig = lockAndGetActive();
291 nextbuf->data.clear();
292 Iterator it( orig->data.begin() );
293 while ( it != orig->data.end() ) {
294 nextbuf->data.push_back( *it );
301 }
while (
os::CAS(&active, orig, nextbuf ) ==
false);
304 assert( pointsTo( active, bufs ) );
326 orig = lockAndGetActive(bufptr);
328 nextbuf = findEmptyBuf(bufptr);
329 nextbuf->data.clear();
330 }
while (
os::CAS(&active, orig, nextbuf ) ==
false );
354 orig = lockAndGetActive( bufptr );
355 if ( orig->data.size() == orig->data.capacity() ) {
359 usingbuf = findEmptyBuf( bufptr );
360 usingbuf->data = orig->data;
361 usingbuf->data.push_back( item );
362 }
while (
os::CAS(&active, orig, usingbuf ) ==
false);
376 Item* orig = lockAndGetActive(bufptr);
377 value_t ret(orig->data.front());
388 Item* orig = lockAndGetActive(bufptr);
389 value_t ret(orig->data.back());
401 size_t append(
const std::vector<T>& items)
405 int towrite = items.size();
413 orig = lockAndGetActive( bufptr );
414 int maxwrite = orig->data.capacity() - orig->data.size();
415 if ( maxwrite == 0 ) {
419 if ( towrite > maxwrite )
421 usingbuf = findEmptyBuf( bufptr );
422 usingbuf->data = orig->data;
423 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
424 }
while (
os::CAS(&active, orig, usingbuf ) ==false );
448 orig = lockAndGetActive( bufptr );
450 nextbuf = findEmptyBuf( bufptr );
451 Iterator it( orig->data.begin() );
452 while (it != orig->data.end() && !( *it == item ) ) {
453 nextbuf->data.push_back( *it );
456 if ( it == orig->data.end() ) {
462 while ( it != orig->data.end() ) {
463 nextbuf->data.push_back( *it );
466 }
while (
os::CAS(&active, orig, nextbuf ) ==
false );
479 template<
typename Pred>
484 bool removed_sth =
false;
492 orig = lockAndGetActive( bufptr );
494 nextbuf = findEmptyBuf( bufptr );
496 Iterator it(orig->data.begin());
497 while (it != orig->data.end()) {
499 nextbuf->data.push_back( *it );
511 }
while (
os::CAS(&active, orig, nextbuf ) ==
false );
523 template<
class Function>
527 Item* orig = lockAndGetActive(st);
528 Iterator it( orig->data.begin() );
529 while ( it != orig->data.end() ) {
551 template<
class Function>
555 Item* orig = lockAndGetActive(st);
556 Item* newp = findEmptyBuf(st);
557 Iterator it( orig->data.begin() );
559 while ( it != orig->data.end() ) {
560 newp->data.push_back( *it );
564 it = blankp->data.begin();
566 while ( it != blankp->data.end() ) {
600 bool res = this->erase(item);
601 Item* orig = lockAndGetBlank(st);
603 Iterator it( orig->data.begin() );
605 while ( *it != item ) {
607 if (it == orig->data.end() ) {
625 template<
class Function>
626 value_t
find_if( Function func, value_t blank = value_t() )
629 Item* orig = lockAndGetActive(st);
630 Iterator it( orig->data.begin() );
631 while ( it != orig->data.end() ) {
632 if (func( *it ) == true ) {
652 Item* start = &(*bufptr)[0];
658 if (start == &(*bufptr)[0] + BufNum() )
659 start = &(*bufptr)[0];
661 assert( pointsTo(start, bufptr) );
680 if ( pointsTo(orig, bufptr) )
688 }
while ( active != orig );
689 assert( pointsTo(orig, bufptr) );
709 }
while ( active != orig );
726 if ( pointsTo(orig, bufptr) )
734 }
while ( blankp != orig );
735 assert( pointsTo(orig, bufptr) );
739 inline bool pointsTo( Item* p,
const Storage& bf )
const {
740 return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
const unsigned int MAX_THREADS
The maximum number of threads.
BufferType::const_iterator CIterator
Item * findEmptyBuf(Storage &bufptr)
void reserve(size_t lsize)
size_t append(const std::vector< T > &items)
void RTT_API intrusive_ptr_add_ref(RTT::internal::IntrusiveStorage *p)
int oro_atomic_inc_and_test(oro_atomic_t *a)
Item * lockAndGetActive(Storage &bufptr) const
StorageImpl(size_t alloc)
std::vector< value_t > BufferType
bool erase_and_blank(value_t item, value_t blank)
void RTT_API intrusive_ptr_release(RTT::internal::IntrusiveStorage *p)
BufferType::iterator Iterator
bool CAS(volatile T *addr, const V &expected, const W &value)
void apply(Function func)
void oro_atomic_inc(oro_atomic_t *a)
void shrink(size_t items=1)
void apply_and_blank(Function func, value_t blank)
ListLockFree(unsigned int lsize, unsigned int threads=ORONUM_OS_MAX_THREADS)
value_t find_if(Function func, value_t blank=value_t())
Item * lockAndGetActive() const
void oro_atomic_set(oro_atomic_t *a, int n)
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
boost::intrusive_ptr< StorageImpl > Storage
void grow(size_t items=1)
void oro_atomic_dec(oro_atomic_t *a)
Item * lockAndGetBlank(Storage &bufptr) const
bool append(value_t item)
Storage newStorage(size_t alloc, size_t items, bool init=true)
bool delete_if(Pred pred)
bool pointsTo(Item *p, const Storage &bf) const