/***************************************************************************
  tag: Peter Soetens  Wed Jan 18 14:11:39 CET 2006  ListLockFree.hpp

                        ListLockFree.hpp -  description
                           -------------------
    begin                : Wed January 18 2006
    copyright            : (C) 2006 Peter Soetens
    email                : peter.soetens@mech.kuleuven.be

 ***************************************************************************
 *   This library is free software; you can redistribute it and/or         *
 *   modify it under the terms of the GNU General Public                   *
 *   License as published by the Free Software Foundation;                 *
 *   version 2 of the License.                                             *
 *                                                                         *
 *   As a special exception, you may use this file as part of a free       *
 *   software library without restriction.  Specifically, if other files   *
 *   instantiate templates or use macros or inline functions from this     *
 *   file, or you compile this file and link it with other files to        *
 *   produce an executable, this file does not by itself cause the         *
 *   resulting executable to be covered by the GNU General Public          *
 *   License.  This exception does not however invalidate any other        *
 *   reasons why the executable file might be covered by the GNU General   *
 *   Public License.                                                       *
 *                                                                         *
 *   This library is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
 *   Lesser General Public License for more details.                       *
 *                                                                         *
 *   You should have received a copy of the GNU General Public             *
 *   License along with this library; if not, write to the Free Software   *
 *   Foundation, Inc., 59 Temple Place,                                    *
 *   Suite 330, Boston, MA  02111-1307  USA                                *
 *                                                                         *
 ***************************************************************************/


#ifndef ORO_LIST_LOCK_FREE_HPP
#define ORO_LIST_LOCK_FREE_HPP

#include <vector>
#include <cassert>
#include "../os/oro_arch.h"
#include "../os/CAS.hpp"
#include <boost/intrusive_ptr.hpp>
#include "../rtt-config.h"

#ifdef ORO_PRAGMA_INTERFACE
#pragma interface
#endif

namespace RTT
{
    namespace internal {
        struct RTT_API IntrusiveStorage
        {
            oro_atomic_t ref;
            IntrusiveStorage();
            virtual ~IntrusiveStorage();
        };

        void RTT_API intrusive_ptr_add_ref(RTT::internal::IntrusiveStorage* p );
        void RTT_API intrusive_ptr_release(RTT::internal::IntrusiveStorage* p );
    }
}


namespace RTT
{ namespace internal {

    template< class T >
    class ListLockFree
    {
    public:
        const unsigned int MAX_THREADS;

        typedef T value_t;
    private:
        typedef std::vector<value_t> BufferType;
        typedef typename BufferType::iterator Iterator;
        typedef typename BufferType::const_iterator CIterator;

        struct Item {
            Item() {
                //ORO_ATOMIC_INIT(count);
                oro_atomic_set(&count, -1);
            }
            mutable oro_atomic_t count;  // refcount
            BufferType data;
        };

        struct StorageImpl : public IntrusiveStorage
        {
            Item* items;
            StorageImpl(size_t alloc) : items( new Item[alloc] ) {
            }
            ~StorageImpl() {
                delete[] items;
            }
            Item& operator[](int i) {
                return items[i];
            }
        };

        typedef boost::intrusive_ptr<StorageImpl> Storage;

        Storage newStorage(size_t alloc, size_t items, bool init = true)
        {
            Storage st( new StorageImpl(alloc) );
            for (unsigned int i = 0; i < alloc; ++i) {
                (*st)[i].data.reserve( items ); // pre-allocate
            }
            // bootstrap the first list:
            if (init) {
                active = &(*st)[0];
                oro_atomic_inc( &active->count );
            }

            return st;
        }

        Storage bufs;
        Item* volatile active;
        Item* volatile blankp;

        // Each thread holds at most one 'working' buffer and one 'active' buffer
        // lock. Thus we need to allocate twice as many buffers as threads for all
        // the locks to succeed in the worst case.
        inline size_t BufNum() const {
            return MAX_THREADS * 2;
        }

        size_t required;
    public:
        ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
            : MAX_THREADS( threads ), blankp(0), required(lsize)
        {
            const unsigned int BUF_NUM = BufNum();
            bufs = newStorage( BUF_NUM, lsize );
        }

        ~ListLockFree() {
        }

        size_t capacity() const
        {
            size_t res;
            Storage st;
            Item* orig = lockAndGetActive(st);
            res = orig->data.capacity();
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            return res;
        }

        size_t size() const
        {
            size_t res;
            Storage st;
            Item* orig = lockAndGetActive(st);
            res = orig->data.size();
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            return res;
        }

        bool empty() const
        {
            bool res;
            Storage st;
            Item* orig = lockAndGetActive(st);
            res = orig->data.empty();
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            return res;
        }

        void grow(size_t items = 1) {
            required += items;
            if (required > this->capacity()) {
                this->reserve( required * 2 );
            }
        }

        void shrink(size_t items = 1) {
            required -= items;
        }

        void reserve(size_t lsize)
        {
            if (lsize <= this->capacity() )
                return;

            const unsigned int BUF_NUM = BufNum();
            Storage res( newStorage(BUF_NUM, lsize, false) );

            // init the future 'active' buffer.
            Item* nextbuf = &(*res)[0];
            oro_atomic_inc( &nextbuf->count );

            // temporary for the current active buffer.
            Item* orig = 0;

            // Prevent the current bufs from being deleted; it is freed upon return.
            Storage save = bufs;
            // active points at the old storage, bufs points at the new one:
            // first the refcount is added to res, then bufs' pointer is switched
            // to res' pointer and the old value is stored in a temporary. The
            // temporary is then destroyed, which decrements bufs' old reference.
            bufs = res;
            // From now on, any findEmptyBuf will use the new bufs, unless the
            // algorithm was entered before the switch; then it writes its result
            // to the old buffer. If it detects that we updated active, it finds an
            // empty buffer in the new storage. If it gets there before our CAS,
            // our CAS fails and we copy everything again. This retry may be
            // unnecessary if the data is already in the new buffer, but for this
            // corner case we must be sure.

            // copy active into the new buffer:
            do {
                if (orig)
                    oro_atomic_dec(&orig->count);
                orig = lockAndGetActive(); // active is guaranteed to point into a valid buffer (save or bufs)
                nextbuf->data.clear();
                Iterator it( orig->data.begin() );
                while ( it != orig->data.end() ) {
                    nextbuf->data.push_back( *it );
                    ++it;
                }
                // See the explanation above: active could have changed and still
                // point into the old buffer. We could check this with pointer
                // arithmetic, but that would not be efficient.
            } while ( os::CAS(&active, orig, nextbuf ) == false );
            // now active is guaranteed to point into bufs.
            assert( pointsTo( active, bufs ) );

            oro_atomic_dec( &orig->count ); // lockAndGetActive
            oro_atomic_dec( &orig->count ); // ref count
        }

        void clear()
        {
            Storage bufptr;
            Item* orig(0);
            Item* nextbuf(0);
            do {
                if (orig) {
                    oro_atomic_dec(&orig->count);
                    oro_atomic_dec(&nextbuf->count);
                }
                orig = lockAndGetActive(bufptr);
                nextbuf = findEmptyBuf(bufptr); // find unused Item in bufs
                nextbuf->data.clear();
            } while ( os::CAS(&active, orig, nextbuf ) == false );
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            oro_atomic_dec( &orig->count ); // ref count
        }

        bool append( value_t item )
        {
            Item* orig = 0;
            Storage bufptr;
            Item* usingbuf(0);
            do {
                if (orig) {
                    oro_atomic_dec(&orig->count);
                    oro_atomic_dec(&usingbuf->count);
                }
                orig = lockAndGetActive( bufptr );
                if ( orig->data.size() == orig->data.capacity() ) { // check for full
                    oro_atomic_dec( &orig->count );
                    return false;
                }
                usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
                usingbuf->data = orig->data;
                usingbuf->data.push_back( item );
            } while ( os::CAS(&active, orig, usingbuf ) == false );
            oro_atomic_dec( &orig->count ); // lockAndGetActive()
            oro_atomic_dec( &orig->count ); // set list free
            return true;
        }

        value_t front() const
        {
            Storage bufptr;
            Item* orig = lockAndGetActive(bufptr);
            value_t ret(orig->data.front());
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            return ret;
        }

        value_t back() const
        {
            Storage bufptr;
            Item* orig = lockAndGetActive(bufptr);
            value_t ret(orig->data.back());
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            return ret;
        }

        size_t append(const std::vector<T>& items)
        {
            Item* usingbuf(0);
            Item* orig = 0;
            int towrite = items.size();
            Storage bufptr;
            do {
                if (orig) {
                    oro_atomic_dec(&orig->count);
                    oro_atomic_dec(&usingbuf->count);
                }

                orig = lockAndGetActive( bufptr );
                int maxwrite = orig->data.capacity() - orig->data.size();
                if ( maxwrite == 0 ) {
                    oro_atomic_dec( &orig->count ); // lockAndGetActive()
                    return 0;
                }
                if ( towrite > maxwrite )
                    towrite = maxwrite;
                usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
                usingbuf->data = orig->data;
                usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
            } while ( os::CAS(&active, orig, usingbuf ) == false );
            oro_atomic_dec( &orig->count ); // lockAndGetActive()
            oro_atomic_dec( &orig->count ); // set list free
            return towrite;
        }

        bool erase( value_t item )
        {
            Item* orig = 0;
            Item* nextbuf(0);
            Storage bufptr;
            do {
                if (orig) {
                    oro_atomic_dec(&orig->count);
                    oro_atomic_dec(&nextbuf->count);
                }
                orig = lockAndGetActive( bufptr ); // find active in bufptr
                // we do this in the loop because bufs can change.
                nextbuf = findEmptyBuf( bufptr ); // find unused Item in the same storage.
                Iterator it( orig->data.begin() );
                while (it != orig->data.end() && !( *it == item ) ) {
                    nextbuf->data.push_back( *it );
                    ++it;
                }
                if ( it == orig->data.end() ) {
                    oro_atomic_dec( &orig->count );
                    oro_atomic_dec( &nextbuf->count );
                    return false; // item not found.
                }
                ++it; // skip item.
                while ( it != orig->data.end() ) {
                    nextbuf->data.push_back( *it );
                    ++it;
                }
            } while ( os::CAS(&active, orig, nextbuf ) == false );
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            oro_atomic_dec( &orig->count ); // ref count
            return true;
        }

        template<typename Pred>
        bool delete_if(Pred pred)
        {
            Item* orig = 0;
            Item* nextbuf(0);
            bool removed_sth = false;
            Storage bufptr;
            do {
                removed_sth = false;
                if (orig) {
                    oro_atomic_dec(&orig->count);
                    oro_atomic_dec(&nextbuf->count);
                }
                orig = lockAndGetActive( bufptr ); // find active in bufptr
                // we do this in the loop because bufs can change.
                nextbuf = findEmptyBuf( bufptr ); // find unused Item in the same storage.

                Iterator it(orig->data.begin());
                while (it != orig->data.end()) {
                    if (!pred(*it))
                        nextbuf->data.push_back( *it );
                    else
                        removed_sth = true;

                    ++it;
                }

                if (!removed_sth) {
                    oro_atomic_dec( &orig->count );
                    oro_atomic_dec( &nextbuf->count );
                    return false; // no matching item found.
                }
            } while ( os::CAS(&active, orig, nextbuf ) == false );
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            oro_atomic_dec( &orig->count ); // ref count
            return true;
        }

        template<class Function>
        void apply(Function func )
        {
            Storage st;
            Item* orig = lockAndGetActive(st);
            Iterator it( orig->data.begin() );
            while ( it != orig->data.end() ) {
                func( *it );
                ++it;
            }
            oro_atomic_dec( &orig->count ); // lockAndGetActive
        }

        template<class Function>
        void apply_and_blank(Function func, value_t blank )
        {
            Storage st;
            Item* orig = lockAndGetActive(st);
            Item* newp = findEmptyBuf(st);
            Iterator it( orig->data.begin() );
            // first copy the whole list.
            while ( it != orig->data.end() ) {
                newp->data.push_back( *it );
                ++it;
            }
            blankp = newp;
            it = blankp->data.begin();
            // iterate over the copy and skip blanks.
            while ( it != blankp->data.end() ) {
                // XXX Race condition: 'it' can be blanked after
                // the comparison or even during func.
                value_t a = *it;
                if ( !(a == blank) )
                    func( a );
                ++it;
            }
            blankp = 0;

            oro_atomic_dec( &orig->count ); // lockAndGetActive
            oro_atomic_dec( &newp->count ); // findEmptyBuf
        }

        bool erase_and_blank(value_t item, value_t blank )
        {
            Storage st;
            bool res = this->erase(item);
            Item* orig = lockAndGetBlank(st);
            if (orig) {
                Iterator it( orig->data.begin() );
                // the item may not be present in the blank-list at all.
                while ( *it != item ) {
                    ++it;
                    if (it == orig->data.end() ) {
                        oro_atomic_dec( &orig->count ); // lockAndGetBlank
                        return res;
                    }
                }
                (*it) = blank;
                oro_atomic_dec( &orig->count ); // lockAndGetBlank
            }
            return res;
        }

        template<class Function>
        value_t find_if( Function func, value_t blank = value_t() )
        {
            Storage st;
            Item* orig = lockAndGetActive(st);
            Iterator it( orig->data.begin() );
            while ( it != orig->data.end() ) {
                if (func( *it ) == true ) {
                    oro_atomic_dec( &orig->count ); // lockAndGetActive
                    return *it;
                }
                ++it;
            }
            oro_atomic_dec( &orig->count ); // lockAndGetActive
            return blank;
        }

    private:
        Item* findEmptyBuf(Storage& bufptr) {
            // These two functions are copy/pasted from BufferLockFree.
            // If MAX_THREADS is large enough, this will always succeed:
            Item* start = &(*bufptr)[0];
            while ( true ) {
                if ( oro_atomic_inc_and_test( &start->count ) )
                    break;
                oro_atomic_dec( &start->count );
                ++start;
                if (start == &(*bufptr)[0] + BufNum() )
                    start = &(*bufptr)[0]; // in case of races, rewind
            }
            assert( pointsTo(start, bufptr) );
            start->data.clear(); // this calls the destructors of T.
            return start; // unique pointer across all threads
        }

        Item* lockAndGetActive(Storage& bufptr) const {
            // This is a kind-of smart-pointer implementation.
            // We could move it into Item itself and overload operator=.
            Item* orig = 0;
            do {
                if (orig)
                    oro_atomic_dec( &orig->count );
                bufptr = bufs;
                orig = active;
                // also check that orig points into bufptr.
                if ( pointsTo(orig, bufptr) )
                    oro_atomic_inc( &orig->count );
                else {
                    orig = 0;
                }
                // This synchronisation point is 'aggressive' (a _sufficient_ condition):
                // if active is still equal to orig, the increase of orig->count is
                // surely valid, since no contention (change of active) occurred.
            } while ( active != orig );
            assert( pointsTo(orig, bufptr) );
            return orig;
        }

        Item* lockAndGetActive() const {
            // only operates on active's refcount.
            Item* orig = 0;
            do {
                if (orig)
                    oro_atomic_dec( &orig->count );
                orig = active;
                oro_atomic_inc( &orig->count );
                // This synchronisation point is 'aggressive' (a _sufficient_ condition):
                // if active is still equal to orig, the increase of orig->count is
                // surely valid, since no contention (change of active) occurred.
            } while ( active != orig );
            return orig;
        }

        Item* lockAndGetBlank(Storage& bufptr) const {
            Item* orig = 0;
            do {
                if (orig)
                    oro_atomic_dec( &orig->count );
                bufptr = bufs;
                orig = blankp;
                if (orig == 0)
                    return 0; // no blankp.
                // also check that orig points into bufptr.
                if ( pointsTo(orig, bufptr) )
                    oro_atomic_inc( &orig->count );
                else {
                    orig = 0;
                }
                // This synchronisation point is 'aggressive' (a _sufficient_ condition):
                // if blankp is still equal to orig, the increase of orig->count is
                // surely valid, since no contention (change of blankp) occurred.
            } while ( blankp != orig );
            assert( pointsTo(orig, bufptr) );
            return orig;
        }

        inline bool pointsTo( Item* p, const Storage& bf ) const {
            return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
        }

    };
}
}

#endif
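
Below is a minimal usage sketch, added by the editor and not part of the original header. It shows the public API of ListLockFree as declared above: construction with a fixed capacity, append, erase and apply. The include path rtt/internal/ListLockFree.hpp and the print helper are assumptions made for illustration only.

// usage_sketch.cpp (hypothetical example, assumes the RTT headers and library are available)
#include <rtt/internal/ListLockFree.hpp>
#include <iostream>

// free function used as the functor passed to apply()
static void print(double d) {
    std::cout << d << std::endl;
}

int main()
{
    // room for 10 elements, sized for the default maximum number of threads
    RTT::internal::ListLockFree<double> list(10);

    list.append(1.0);    // returns false when the list is full
    list.append(2.0);
    list.erase(1.0);     // copies the list without the item, then publishes it with CAS
    list.apply(&print);  // iterates over a self-consistent snapshot of the list

    return 0;
}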