00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_LIST_LOCK_FREE_HPP
00040 #define ORO_LIST_LOCK_FREE_HPP
00041
00042 #include <vector>
00043 #include "../os/oro_arch.h"
00044 #include "../os/CAS.hpp"
00045 #include <boost/intrusive_ptr.hpp>
00046 #include "../rtt-config.h"
00047
00048 #ifdef ORO_PRAGMA_INTERFACE
00049 #pragma interface
00050 #endif
00051
00052 namespace RTT
00053 {
00054 namespace internal {
00055 struct RTT_API IntrusiveStorage
00056 {
00057 oro_atomic_t ref;
00058 IntrusiveStorage();
00059 virtual ~IntrusiveStorage();
00060 };
00061
00062 void RTT_API intrusive_ptr_add_ref(RTT::internal::IntrusiveStorage* p );
00063 void RTT_API intrusive_ptr_release(RTT::internal::IntrusiveStorage* p );
00064
00065 }
00066 }
00067
00068
00069 namespace RTT
00070 { namespace internal {
00071
00084 template< class T>
00085 class ListLockFree
00086 {
00087 public:
00093 const unsigned int MAX_THREADS;
00094
00095 typedef T value_t;
00096 private:
00097 typedef std::vector<value_t> BufferType;
00098 typedef typename BufferType::iterator Iterator;
00099 typedef typename BufferType::const_iterator CIterator;
00100 struct Item {
00101 Item() {
00102
00103 oro_atomic_set(&count,-1);
00104 }
00105 mutable oro_atomic_t count;
00106 BufferType data;
00107 };
00108
00109 struct StorageImpl : public IntrusiveStorage
00110 {
00111 Item* items;
00112 StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00113 }
00114 ~StorageImpl() {
00115 delete[] items;
00116 }
00117 Item& operator[](int i) {
00118 return items[i];
00119 }
00120 };
00121
00126 typedef boost::intrusive_ptr<StorageImpl> Storage;
00127
00128 Storage newStorage(size_t alloc, size_t items, bool init = true)
00129 {
00130 Storage st( new StorageImpl(alloc) );
00131 for (unsigned int i=0; i < alloc; ++i) {
00132 (*st)[i].data.reserve( items );
00133 }
00134
00135 if (init) {
00136 active = &(*st)[0];
00137 oro_atomic_inc( &active->count );
00138 }
00139
00140 return st;
00141 }
00142
00143 Storage bufs;
00144 Item* volatile active;
00145 Item* volatile blankp;
00146
00147
00148
00149
00150 inline size_t BufNum() const {
00151 return MAX_THREADS * 2;
00152 }
00153
00154 size_t required;
00155 public:
00164 ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00165 : MAX_THREADS( threads ), blankp(0), required(lsize)
00166 {
00167 const unsigned int BUF_NUM = BufNum();
00168 bufs = newStorage( BUF_NUM, lsize );
00169 }
00170
00171 ~ListLockFree() {
00172 }
00173
00178 size_t capacity() const
00179 {
00180 size_t res;
00181 Storage st;
00182 Item* orig = lockAndGetActive(st);
00183 res = orig->data.capacity();
00184 oro_atomic_dec( &orig->count );
00185 return res;
00186 }
00187
00192 size_t size() const
00193 {
00194 size_t res;
00195 Storage st;
00196 Item* orig = lockAndGetActive(st);
00197 res = orig->data.size();
00198 oro_atomic_dec( &orig->count );
00199 return res;
00200 }
00201
00206 bool empty() const
00207 {
00208 bool res;
00209 Storage st;
00210 Item* orig = lockAndGetActive(st);
00211 res = orig->data.empty();
00212 oro_atomic_dec( &orig->count );
00213 return res;
00214 }
00215
00224 void grow(size_t items = 1) {
00225 required += items;
00226 if (required > this->capacity()) {
00227 this->reserve( required*2 );
00228 }
00229 }
00238 void shrink(size_t items = 1) {
00239 required -= items;
00240 }
00241
00252 void reserve(size_t lsize)
00253 {
00254 if (lsize <= this->capacity() )
00255 return;
00256
00257 const unsigned int BUF_NUM = BufNum();
00258 Storage res( newStorage(BUF_NUM, lsize, false) );
00259
00260
00261 Item* nextbuf = &(*res)[0];
00262 oro_atomic_inc( &nextbuf->count );
00263
00264
00265 Item* orig = 0;
00266
00267
00268
00269 Storage save = bufs;
00270
00271
00272
00273
00274
00275 bufs = res;
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287 do {
00288 if (orig)
00289 oro_atomic_dec(&orig->count);
00290 orig = lockAndGetActive();
00291 nextbuf->data.clear();
00292 Iterator it( orig->data.begin() );
00293 while ( it != orig->data.end() ) {
00294 nextbuf->data.push_back( *it );
00295 ++it;
00296 }
00297
00298
00299
00300
00301 } while ( os::CAS(&active, orig, nextbuf ) == false);
00302
00303
00304 assert( pointsTo( active, bufs ) );
00305
00306 oro_atomic_dec( &orig->count );
00307 oro_atomic_dec( &orig->count );
00308 }
00309
00316 void clear()
00317 {
00318 Storage bufptr;
00319 Item* orig(0);
00320 Item* nextbuf(0);
00321 int items = 0;
00322 do {
00323 if (orig) {
00324 oro_atomic_dec(&orig->count);
00325 oro_atomic_dec(&nextbuf->count);
00326 }
00327 orig = lockAndGetActive(bufptr);
00328 items = orig->data.size();
00329 nextbuf = findEmptyBuf(bufptr);
00330 nextbuf->data.clear();
00331 } while ( os::CAS(&active, orig, nextbuf ) == false );
00332 oro_atomic_dec( &orig->count );
00333 oro_atomic_dec( &orig->count );
00334 }
00335
00345 bool append( value_t item )
00346 {
00347 Item* orig=0;
00348 Storage bufptr;
00349 Item* usingbuf(0);
00350 do {
00351 if (orig) {
00352 oro_atomic_dec(&orig->count);
00353 oro_atomic_dec(&usingbuf->count);
00354 }
00355 orig = lockAndGetActive( bufptr );
00356 if ( orig->data.size() == orig->data.capacity() ) {
00357 oro_atomic_dec( &orig->count );
00358 return false;
00359 }
00360 usingbuf = findEmptyBuf( bufptr );
00361 usingbuf->data = orig->data;
00362 usingbuf->data.push_back( item );
00363 } while ( os::CAS(&active, orig, usingbuf ) ==false);
00364 oro_atomic_dec( &orig->count );
00365 oro_atomic_dec( &orig->count );
00366 return true;
00367 }
00368
00374 value_t front() const
00375 {
00376 Storage bufptr;
00377 Item* orig = lockAndGetActive(bufptr);
00378 value_t ret(orig->data.front());
00379 oro_atomic_dec( &orig->count );
00380 return ret;
00381 }
00382
00386 value_t back() const
00387 {
00388 Storage bufptr;
00389 Item* orig = lockAndGetActive(bufptr);
00390 value_t ret(orig->data.back());
00391 oro_atomic_dec( &orig->count );
00392 return ret;
00393 }
00394
00402 size_t append(const std::vector<T>& items)
00403 {
00404 Item* usingbuf(0);
00405 Item* orig=0;
00406 int towrite = items.size();
00407 Storage bufptr;
00408 do {
00409 if (orig) {
00410 oro_atomic_dec(&orig->count);
00411 oro_atomic_dec(&usingbuf->count);
00412 }
00413
00414 orig = lockAndGetActive( bufptr );
00415 int maxwrite = orig->data.capacity() - orig->data.size();
00416 if ( maxwrite == 0 ) {
00417 oro_atomic_dec( &orig->count );
00418 return 0;
00419 }
00420 if ( towrite > maxwrite )
00421 towrite = maxwrite;
00422 usingbuf = findEmptyBuf( bufptr );
00423 usingbuf->data = orig->data;
00424 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
00425 } while ( os::CAS(&active, orig, usingbuf ) ==false );
00426 oro_atomic_dec( &orig->count );
00427 oro_atomic_dec( &orig->count );
00428 return towrite;
00429 }
00430
00431
00439 bool erase( value_t item )
00440 {
00441 Item* orig=0;
00442 Item* nextbuf(0);
00443 Storage bufptr;
00444 do {
00445 if (orig) {
00446 oro_atomic_dec(&orig->count);
00447 oro_atomic_dec(&nextbuf->count);
00448 }
00449 orig = lockAndGetActive( bufptr );
00450
00451 nextbuf = findEmptyBuf( bufptr );
00452 Iterator it( orig->data.begin() );
00453 while (it != orig->data.end() && !( *it == item ) ) {
00454 nextbuf->data.push_back( *it );
00455 ++it;
00456 }
00457 if ( it == orig->data.end() ) {
00458 oro_atomic_dec( &orig->count );
00459 oro_atomic_dec( &nextbuf->count );
00460 return false;
00461 }
00462 ++it;
00463 while ( it != orig->data.end() ) {
00464 nextbuf->data.push_back( *it );
00465 ++it;
00466 }
00467 } while ( os::CAS(&active, orig, nextbuf ) ==false );
00468 oro_atomic_dec( &orig->count );
00469 oro_atomic_dec( &orig->count );
00470 return true;
00471 }
00472
00480 template<typename Pred>
00481 bool delete_if(Pred pred)
00482 {
00483 Item* orig=0;
00484 Item* nextbuf(0);
00485 bool removed_sth = false;
00486 Storage bufptr;
00487 do {
00488 removed_sth = false;
00489 if (orig) {
00490 oro_atomic_dec(&orig->count);
00491 oro_atomic_dec(&nextbuf->count);
00492 }
00493 orig = lockAndGetActive( bufptr );
00494
00495 nextbuf = findEmptyBuf( bufptr );
00496
00497 Iterator it(orig->data.begin());
00498 while (it != orig->data.end()) {
00499 if (!pred(*it))
00500 nextbuf->data.push_back( *it );
00501 else
00502 removed_sth = true;
00503
00504 ++it;
00505 }
00506
00507 if (!removed_sth) {
00508 oro_atomic_dec( &orig->count );
00509 oro_atomic_dec( &nextbuf->count );
00510 return false;
00511 }
00512 } while ( os::CAS(&active, orig, nextbuf ) == false );
00513 oro_atomic_dec( &orig->count );
00514 oro_atomic_dec( &orig->count );
00515 return true;
00516 }
00517
00518
00524 template<class Function>
00525 void apply(Function func )
00526 {
00527 Storage st;
00528 Item* orig = lockAndGetActive(st);
00529 Iterator it( orig->data.begin() );
00530 while ( it != orig->data.end() ) {
00531 func( *it );
00532 ++it;
00533 }
00534 oro_atomic_dec( &orig->count );
00535 }
00536
00552 template<class Function>
00553 void apply_and_blank(Function func, value_t blank )
00554 {
00555 Storage st;
00556 Item* orig = lockAndGetActive(st);
00557 Item* newp = findEmptyBuf(st);
00558 Iterator it( orig->data.begin() );
00559
00560 while ( it != orig->data.end() ) {
00561 newp->data.push_back( *it );
00562 ++it;
00563 }
00564 blankp = newp;
00565 it = blankp->data.begin();
00566
00567 while ( it != blankp->data.end() ) {
00568
00569
00570 value_t a = *it;
00571 if ( !(a == blank) )
00572 func( a );
00573 ++it;
00574 }
00575 blankp = 0;
00576
00577 oro_atomic_dec( &orig->count );
00578 oro_atomic_dec( &newp->count );
00579 }
00580
00598 bool erase_and_blank(value_t item, value_t blank )
00599 {
00600 Storage st;
00601 bool res = this->erase(item);
00602 Item* orig = lockAndGetBlank(st);
00603 if (orig) {
00604 Iterator it( orig->data.begin() );
00605
00606 while ( *it != item ) {
00607 ++it;
00608 if (it == orig->data.end() ) {
00609 oro_atomic_dec( &orig->count );
00610 return res;
00611 }
00612 }
00613 (*it) = blank;
00614 oro_atomic_dec( &orig->count );
00615 }
00616 return res;
00617 }
00618
00626 template<class Function>
00627 value_t find_if( Function func, value_t blank = value_t() )
00628 {
00629 Storage st;
00630 Item* orig = lockAndGetActive(st);
00631 Iterator it( orig->data.begin() );
00632 while ( it != orig->data.end() ) {
00633 if (func( *it ) == true ) {
00634 oro_atomic_dec( &orig->count );
00635 return *it;
00636 }
00637 ++it;
00638 }
00639 oro_atomic_dec( &orig->count );
00640 return blank;
00641 }
00642 private:
00650 Item* findEmptyBuf(Storage& bufptr) {
00651
00652
00653 Item* start = &(*bufptr)[0];
00654 while( true ) {
00655 if ( oro_atomic_inc_and_test( &start->count ) )
00656 break;
00657 oro_atomic_dec( &start->count );
00658 ++start;
00659 if (start == &(*bufptr)[0] + BufNum() )
00660 start = &(*bufptr)[0];
00661 }
00662 assert( pointsTo(start, bufptr) );
00663 start->data.clear();
00664 return start;
00665 }
00666
00671 Item* lockAndGetActive(Storage& bufptr) const {
00672
00673
00674 Item* orig=0;
00675 do {
00676 if (orig)
00677 oro_atomic_dec( &orig->count );
00678 bufptr = bufs;
00679 orig = active;
00680
00681 if ( pointsTo(orig, bufptr) )
00682 oro_atomic_inc( &orig->count );
00683 else {
00684 orig = 0;
00685 }
00686
00687
00688
00689 } while ( active != orig );
00690 assert( pointsTo(orig, bufptr) );
00691 return orig;
00692 }
00693
00699 Item* lockAndGetActive() const {
00700
00701 Item* orig=0;
00702 do {
00703 if (orig)
00704 oro_atomic_dec( &orig->count );
00705 orig = active;
00706 oro_atomic_inc( &orig->count );
00707
00708
00709
00710 } while ( active != orig );
00711 return orig;
00712 }
00713
00717 Item* lockAndGetBlank(Storage& bufptr) const {
00718 Item* orig=0;
00719 do {
00720 if (orig)
00721 oro_atomic_dec( &orig->count );
00722 bufptr = bufs;
00723 orig = blankp;
00724 if (orig == 0)
00725 return 0;
00726
00727 if ( pointsTo(orig, bufptr) )
00728 oro_atomic_inc( &orig->count );
00729 else {
00730 orig = 0;
00731 }
00732
00733
00734
00735 } while ( blankp != orig );
00736 assert( pointsTo(orig, bufptr) );
00737 return orig;
00738 }
00739
00740 inline bool pointsTo( Item* p, const Storage& bf ) const {
00741 return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
00742 }
00743
00744 };
00745 }
00746 }
00747
00748 #endif