Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_LIST_LOCK_FREE_HPP
00040 #define ORO_LIST_LOCK_FREE_HPP
00041
00042 #include <vector>
00043 #include "../os/oro_arch.h"
00044 #include "../os/CAS.hpp"
00045 #include <boost/intrusive_ptr.hpp>
00046 #include "../rtt-config.h"
00047
00048 #ifdef ORO_PRAGMA_INTERFACE
00049 #pragma interface
00050 #endif
00051
00052 namespace RTT
00053 {
00054 namespace internal {
00055 struct RTT_API IntrusiveStorage
00056 {
00057 oro_atomic_t ref;
00058 IntrusiveStorage();
00059 virtual ~IntrusiveStorage();
00060 };
00061
00062 void RTT_API intrusive_ptr_add_ref(RTT::internal::IntrusiveStorage* p );
00063 void RTT_API intrusive_ptr_release(RTT::internal::IntrusiveStorage* p );
00064
00065 }
00066 }
00067
00068
00069 namespace RTT
00070 { namespace internal {
00071
00084 template< class T>
00085 class ListLockFree
00086 {
00087 public:
00093 const unsigned int MAX_THREADS;
00094
00095 typedef T value_t;
00096 private:
00097 typedef std::vector<value_t> BufferType;
00098 typedef typename BufferType::iterator Iterator;
00099 typedef typename BufferType::const_iterator CIterator;
00100 struct Item {
00101 Item() {
00102
00103 oro_atomic_set(&count,-1);
00104 }
00105 mutable oro_atomic_t count;
00106 BufferType data;
00107 };
00108
00109 struct StorageImpl : public IntrusiveStorage
00110 {
00111 Item* items;
00112 StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00113 }
00114 ~StorageImpl() {
00115 delete[] items;
00116 }
00117 Item& operator[](int i) {
00118 return items[i];
00119 }
00120 };
00121
00126 typedef boost::intrusive_ptr<StorageImpl> Storage;
00127
00128 Storage newStorage(size_t alloc, size_t items, bool init = true)
00129 {
00130 Storage st( new StorageImpl(alloc) );
00131 for (unsigned int i=0; i < alloc; ++i) {
00132 (*st)[i].data.reserve( items );
00133 }
00134
00135 if (init) {
00136 active = &(*st)[0];
00137 oro_atomic_inc( &active->count );
00138 }
00139
00140 return st;
00141 }
00142
00143 Storage bufs;
00144 Item* volatile active;
00145 Item* volatile blankp;
00146
00147
00148
00149
00150 inline size_t BufNum() const {
00151 return MAX_THREADS * 2;
00152 }
00153
00154 size_t required;
00155 public:
00164 ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00165 : MAX_THREADS( threads ), blankp(0), required(lsize)
00166 {
00167 const unsigned int BUF_NUM = BufNum();
00168 bufs = newStorage( BUF_NUM, lsize );
00169 }
00170
00171 ~ListLockFree() {
00172 }
00173
00178 size_t capacity() const
00179 {
00180 size_t res;
00181 Storage st;
00182 Item* orig = lockAndGetActive(st);
00183 res = orig->data.capacity();
00184 oro_atomic_dec( &orig->count );
00185 return res;
00186 }
00187
00192 size_t size() const
00193 {
00194 size_t res;
00195 Storage st;
00196 Item* orig = lockAndGetActive(st);
00197 res = orig->data.size();
00198 oro_atomic_dec( &orig->count );
00199 return res;
00200 }
00201
00206 bool empty() const
00207 {
00208 bool res;
00209 Storage st;
00210 Item* orig = lockAndGetActive(st);
00211 res = orig->data.empty();
00212 oro_atomic_dec( &orig->count );
00213 return res;
00214 }
00215
00224 void grow(size_t items = 1) {
00225 required += items;
00226 if (required > this->capacity()) {
00227 this->reserve( required*2 );
00228 }
00229 }
00238 void shrink(size_t items = 1) {
00239 required -= items;
00240 }
00241
00252 void reserve(size_t lsize)
00253 {
00254 if (lsize <= this->capacity() )
00255 return;
00256
00257 const unsigned int BUF_NUM = BufNum();
00258 Storage res( newStorage(BUF_NUM, lsize, false) );
00259
00260
00261 Item* nextbuf = &(*res)[0];
00262 oro_atomic_inc( &nextbuf->count );
00263
00264
00265 Item* orig = 0;
00266
00267
00268
00269 Storage save = bufs;
00270
00271
00272
00273
00274
00275 bufs = res;
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287 do {
00288 if (orig)
00289 oro_atomic_dec(&orig->count);
00290 orig = lockAndGetActive();
00291 nextbuf->data.clear();
00292 Iterator it( orig->data.begin() );
00293 while ( it != orig->data.end() ) {
00294 nextbuf->data.push_back( *it );
00295 ++it;
00296 }
00297
00298
00299
00300
00301 } while ( os::CAS(&active, orig, nextbuf ) == false);
00302
00303
00304 assert( pointsTo( active, bufs ) );
00305
00306 oro_atomic_dec( &orig->count );
00307 oro_atomic_dec( &orig->count );
00308 }
00309
00316 void clear()
00317 {
00318 Storage bufptr;
00319 Item* orig(0);
00320 Item* nextbuf(0);
00321 do {
00322 if (orig) {
00323 oro_atomic_dec(&orig->count);
00324 oro_atomic_dec(&nextbuf->count);
00325 }
00326 orig = lockAndGetActive(bufptr);
00327 orig->data.size();
00328 nextbuf = findEmptyBuf(bufptr);
00329 nextbuf->data.clear();
00330 } while ( os::CAS(&active, orig, nextbuf ) == false );
00331 oro_atomic_dec( &orig->count );
00332 oro_atomic_dec( &orig->count );
00333 }
00334
00344 bool append( value_t item )
00345 {
00346 Item* orig=0;
00347 Storage bufptr;
00348 Item* usingbuf(0);
00349 do {
00350 if (orig) {
00351 oro_atomic_dec(&orig->count);
00352 oro_atomic_dec(&usingbuf->count);
00353 }
00354 orig = lockAndGetActive( bufptr );
00355 if ( orig->data.size() == orig->data.capacity() ) {
00356 oro_atomic_dec( &orig->count );
00357 return false;
00358 }
00359 usingbuf = findEmptyBuf( bufptr );
00360 usingbuf->data = orig->data;
00361 usingbuf->data.push_back( item );
00362 } while ( os::CAS(&active, orig, usingbuf ) ==false);
00363 oro_atomic_dec( &orig->count );
00364 oro_atomic_dec( &orig->count );
00365 return true;
00366 }
00367
00373 value_t front() const
00374 {
00375 Storage bufptr;
00376 Item* orig = lockAndGetActive(bufptr);
00377 value_t ret(orig->data.front());
00378 oro_atomic_dec( &orig->count );
00379 return ret;
00380 }
00381
00385 value_t back() const
00386 {
00387 Storage bufptr;
00388 Item* orig = lockAndGetActive(bufptr);
00389 value_t ret(orig->data.back());
00390 oro_atomic_dec( &orig->count );
00391 return ret;
00392 }
00393
00401 size_t append(const std::vector<T>& items)
00402 {
00403 Item* usingbuf(0);
00404 Item* orig=0;
00405 int towrite = items.size();
00406 Storage bufptr;
00407 do {
00408 if (orig) {
00409 oro_atomic_dec(&orig->count);
00410 oro_atomic_dec(&usingbuf->count);
00411 }
00412
00413 orig = lockAndGetActive( bufptr );
00414 int maxwrite = orig->data.capacity() - orig->data.size();
00415 if ( maxwrite == 0 ) {
00416 oro_atomic_dec( &orig->count );
00417 return 0;
00418 }
00419 if ( towrite > maxwrite )
00420 towrite = maxwrite;
00421 usingbuf = findEmptyBuf( bufptr );
00422 usingbuf->data = orig->data;
00423 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
00424 } while ( os::CAS(&active, orig, usingbuf ) ==false );
00425 oro_atomic_dec( &orig->count );
00426 oro_atomic_dec( &orig->count );
00427 return towrite;
00428 }
00429
00430
00438 bool erase( value_t item )
00439 {
00440 Item* orig=0;
00441 Item* nextbuf(0);
00442 Storage bufptr;
00443 do {
00444 if (orig) {
00445 oro_atomic_dec(&orig->count);
00446 oro_atomic_dec(&nextbuf->count);
00447 }
00448 orig = lockAndGetActive( bufptr );
00449
00450 nextbuf = findEmptyBuf( bufptr );
00451 Iterator it( orig->data.begin() );
00452 while (it != orig->data.end() && !( *it == item ) ) {
00453 nextbuf->data.push_back( *it );
00454 ++it;
00455 }
00456 if ( it == orig->data.end() ) {
00457 oro_atomic_dec( &orig->count );
00458 oro_atomic_dec( &nextbuf->count );
00459 return false;
00460 }
00461 ++it;
00462 while ( it != orig->data.end() ) {
00463 nextbuf->data.push_back( *it );
00464 ++it;
00465 }
00466 } while ( os::CAS(&active, orig, nextbuf ) ==false );
00467 oro_atomic_dec( &orig->count );
00468 oro_atomic_dec( &orig->count );
00469 return true;
00470 }
00471
00479 template<typename Pred>
00480 bool delete_if(Pred pred)
00481 {
00482 Item* orig=0;
00483 Item* nextbuf(0);
00484 bool removed_sth = false;
00485 Storage bufptr;
00486 do {
00487 removed_sth = false;
00488 if (orig) {
00489 oro_atomic_dec(&orig->count);
00490 oro_atomic_dec(&nextbuf->count);
00491 }
00492 orig = lockAndGetActive( bufptr );
00493
00494 nextbuf = findEmptyBuf( bufptr );
00495
00496 Iterator it(orig->data.begin());
00497 while (it != orig->data.end()) {
00498 if (!pred(*it))
00499 nextbuf->data.push_back( *it );
00500 else
00501 removed_sth = true;
00502
00503 ++it;
00504 }
00505
00506 if (!removed_sth) {
00507 oro_atomic_dec( &orig->count );
00508 oro_atomic_dec( &nextbuf->count );
00509 return false;
00510 }
00511 } while ( os::CAS(&active, orig, nextbuf ) == false );
00512 oro_atomic_dec( &orig->count );
00513 oro_atomic_dec( &orig->count );
00514 return true;
00515 }
00516
00517
00523 template<class Function>
00524 void apply(Function func )
00525 {
00526 Storage st;
00527 Item* orig = lockAndGetActive(st);
00528 Iterator it( orig->data.begin() );
00529 while ( it != orig->data.end() ) {
00530 func( *it );
00531 ++it;
00532 }
00533 oro_atomic_dec( &orig->count );
00534 }
00535
00551 template<class Function>
00552 void apply_and_blank(Function func, value_t blank )
00553 {
00554 Storage st;
00555 Item* orig = lockAndGetActive(st);
00556 Item* newp = findEmptyBuf(st);
00557 Iterator it( orig->data.begin() );
00558
00559 while ( it != orig->data.end() ) {
00560 newp->data.push_back( *it );
00561 ++it;
00562 }
00563 blankp = newp;
00564 it = blankp->data.begin();
00565
00566 while ( it != blankp->data.end() ) {
00567
00568
00569 value_t a = *it;
00570 if ( !(a == blank) )
00571 func( a );
00572 ++it;
00573 }
00574 blankp = 0;
00575
00576 oro_atomic_dec( &orig->count );
00577 oro_atomic_dec( &newp->count );
00578 }
00579
00597 bool erase_and_blank(value_t item, value_t blank )
00598 {
00599 Storage st;
00600 bool res = this->erase(item);
00601 Item* orig = lockAndGetBlank(st);
00602 if (orig) {
00603 Iterator it( orig->data.begin() );
00604
00605 while ( *it != item ) {
00606 ++it;
00607 if (it == orig->data.end() ) {
00608 oro_atomic_dec( &orig->count );
00609 return res;
00610 }
00611 }
00612 (*it) = blank;
00613 oro_atomic_dec( &orig->count );
00614 }
00615 return res;
00616 }
00617
00625 template<class Function>
00626 value_t find_if( Function func, value_t blank = value_t() )
00627 {
00628 Storage st;
00629 Item* orig = lockAndGetActive(st);
00630 Iterator it( orig->data.begin() );
00631 while ( it != orig->data.end() ) {
00632 if (func( *it ) == true ) {
00633 oro_atomic_dec( &orig->count );
00634 return *it;
00635 }
00636 ++it;
00637 }
00638 oro_atomic_dec( &orig->count );
00639 return blank;
00640 }
00641 private:
00649 Item* findEmptyBuf(Storage& bufptr) {
00650
00651
00652 Item* start = &(*bufptr)[0];
00653 while( true ) {
00654 if ( oro_atomic_inc_and_test( &start->count ) )
00655 break;
00656 oro_atomic_dec( &start->count );
00657 ++start;
00658 if (start == &(*bufptr)[0] + BufNum() )
00659 start = &(*bufptr)[0];
00660 }
00661 assert( pointsTo(start, bufptr) );
00662 start->data.clear();
00663 return start;
00664 }
00665
00670 Item* lockAndGetActive(Storage& bufptr) const {
00671
00672
00673 Item* orig=0;
00674 do {
00675 if (orig)
00676 oro_atomic_dec( &orig->count );
00677 bufptr = bufs;
00678 orig = active;
00679
00680 if ( pointsTo(orig, bufptr) )
00681 oro_atomic_inc( &orig->count );
00682 else {
00683 orig = 0;
00684 }
00685
00686
00687
00688 } while ( active != orig );
00689 assert( pointsTo(orig, bufptr) );
00690 return orig;
00691 }
00692
00698 Item* lockAndGetActive() const {
00699
00700 Item* orig=0;
00701 do {
00702 if (orig)
00703 oro_atomic_dec( &orig->count );
00704 orig = active;
00705 oro_atomic_inc( &orig->count );
00706
00707
00708
00709 } while ( active != orig );
00710 return orig;
00711 }
00712
00716 Item* lockAndGetBlank(Storage& bufptr) const {
00717 Item* orig=0;
00718 do {
00719 if (orig)
00720 oro_atomic_dec( &orig->count );
00721 bufptr = bufs;
00722 orig = blankp;
00723 if (orig == 0)
00724 return 0;
00725
00726 if ( pointsTo(orig, bufptr) )
00727 oro_atomic_inc( &orig->count );
00728 else {
00729 orig = 0;
00730 }
00731
00732
00733
00734 } while ( blankp != orig );
00735 assert( pointsTo(orig, bufptr) );
00736 return orig;
00737 }
00738
00739 inline bool pointsTo( Item* p, const Storage& bf ) const {
00740 return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
00741 }
00742
00743 };
00744 }
00745 }
00746
00747 #endif