multi_queue.h
Go to the documentation of this file.
00001 // 
00002 // Copyright (c) 2010-2012, Benjamin Kaufmann
00003 // 
00004 // This file is part of Clasp. See http://www.cs.uni-potsdam.de/clasp/ 
00005 // 
00006 // Clasp is free software; you can redistribute it and/or modify
00007 // it under the terms of the GNU General Public License as published by
00008 // the Free Software Foundation; either version 2 of the License, or
00009 // (at your option) any later version.
00010 // 
00011 // Clasp is distributed in the hope that it will be useful,
00012 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00013 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014 // GNU General Public License for more details.
00015 // 
00016 // You should have received a copy of the GNU General Public License
00017 // along with Clasp; if not, write to the Free Software
00018 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019 //
00020 #ifndef CLASP_MULIT_QUEUE_H_INCLUDED
00021 #define CLASP_MULIT_QUEUE_H_INCLUDED
00022 
00023 #include <clasp/util/platform.h>
00024 #include <clasp/util/atomic.h>
00025 
00026 namespace Clasp { namespace mt { namespace Detail {
00027 struct NodeBase {
00028         typedef Clasp::atomic<NodeBase*> AtomicPtr;
00029         typedef Clasp::atomic<int>       AtomicInt;
00030         explicit NodeBase(uint32 rc) { next = 0; refs = rc; }
00031         AtomicPtr next;
00032         AtomicInt refs;
00033 };
00034 
00035 template <class T>
00036 struct Node : public NodeBase {
00037         Node(uint32 rc, const T& d) : NodeBase(rc), data(d) {}
00038         T data;
00039 };
00040 struct DefaultDeleter { 
00041         template <class T> 
00042         void operator()(T& obj) const {
00043                 (void)obj;
00044                 obj.~T();
00045         } 
00046 };
00047 }
00048 
00050 
00056 template <class T, class Deleter = Detail::DefaultDeleter>
00057 class MultiQueue {
00058 protected:
00059         typedef Detail::Node<T>   Node;
00060         typedef Detail::NodeBase  NodeBase;
00061 public:
00062         typedef Detail::NodeBase* ThreadId;
00064         explicit MultiQueue(uint32 m, const Deleter& d = Deleter()) : head_(m+1), maxQ_(m), deleter_(d) {
00065                 tail_         = &head_;
00066         }
00067         uint32 maxThreads() const { return maxQ_; }
00068         void reserve(uint32 c) {
00069                 for (uint32 i = 0; i != c; ++i) {
00070                         void* m = ::operator new(sizeof(Node));
00071                         freeList_.push(new (m) NodeBase(0));
00072                 }
00073         }
00075         ~MultiQueue() {
00076                 for (NodeBase* x = head_.next; x ; ) {
00077                         Node* n = static_cast<Node*>(x);
00078                         x = x->next;
00079                         deleter_(n->data);
00080                         freeList_.push(n);
00081                 }
00082         }
00084 
00088         ThreadId addThread() {
00089                 --head_.refs;
00090                 assert(head_.refs > 0);
00091                 return &head_;
00092         }
00093         bool hasItems(ThreadId& cId) const { return cId != tail_; }
00094         
00096 
00100         bool tryConsume(ThreadId& cId, T& out) {
00101                 if (cId != tail_) {
00102                         NodeBase* n = cId;
00103                         cId         = cId->next;
00104                         assert(cId != 0 && "MultiQueue is corrupted!");
00105                         release(n);
00106                         out = static_cast<Node*>(cId)->data;
00107                         return true;
00108                 }
00109                 return false;
00110         }
00112 
00115         void pop(ThreadId& cId) {
00116                 assert(hasItems(cId) && "Cannot pop from empty queue!");
00117                 NodeBase* n = cId;
00118                 cId         = cId->next;
00119                 release(n);
00120         }
00121 protected:
00123 
00127         void unsafePublish(const T& in, const ThreadId&) {
00128                 Node* n     = freeList_.allocate(in, maxQ_);
00129                 publishRelaxed(n);
00130         }
00131 
00133         void publish(const T& in, const ThreadId&) {
00134                 Node* newNode = freeList_.allocate(in, maxQ_);
00135                 NodeBase* assumedTail, *assumedNext;
00136                 do {
00137                         assumedTail = tail_;
00138                         assumedNext = assumedTail->next;
00139                         if (assumedTail != tail_) { 
00140                                 // tail has changed - try again
00141                                 continue; 
00142                         }
00143                         if (assumedNext != 0) {
00144                                 // someone has added a new node but has not yet
00145                                 // moved the tail - assist him and start over
00146                                 tail_.compare_and_swap(assumedNext, assumedTail); 
00147                                 continue;
00148                         }
00149                 } while (assumedTail->next.compare_and_swap(newNode, 0) != 0);
00150                 // Now that we managed to link a new node to what we think is the current tail
00151                 // we try to update the tail. If the tail is still what we think it is, 
00152                 // it is moved - otherwise some other thread already did that for us.
00153                 tail_.compare_and_swap(newNode, assumedTail);
00154         }
00155 
00157         void publishRelaxed(NodeBase* n) {
00158                 tail_->next = n;
00159                 tail_       = n;
00160         }
00161         uint32 maxQ() const { return maxQ_; }
00162         Node*  allocate(uint32 maxR, const T& in) {
00163                 return freeList_.allocate(in, maxR);
00164         }
00165 private:
00166         MultiQueue(const MultiQueue&);
00167         MultiQueue& operator=(const MultiQueue&);
00168         // Stack of free nodes
00169         struct FreeList {
00170                 FreeList() { top = 0; }
00171                 ~FreeList(){
00172                         for (NodeBase* n = top; n != 0; ) {
00173                                 NodeBase* t = n;
00174                                 n = n->next;
00175                                 ::operator delete(t);
00176                         }
00177                 }
00178                 void push(NodeBase* n) {
00179                         NodeBase* assumedTop;
00180                         do {
00181                                 assumedTop = top;
00182                                 n->next    = assumedTop;
00183                         } while (top.compare_and_swap(n, assumedTop) != assumedTop);
00184                 }
00185                 NodeBase* tryPop() {
00186                         NodeBase* n = 0, *next = 0;
00187                         do {
00188                                 n = top;
00189                                 if (!n) return 0;
00190                                 // NOTE: 
00191                                 // If the queue is used correctly, n is
00192                                 // safe and n->next is ABA-safe at this point.
00193                                 // The ref-counting in the queue makes sure
00194                                 // that a node (here n) cannot be added back
00195                                 // to the free list while another thread 
00196                                 // is still in tryPop() - that thread had
00197                                 // not yet the chance to decrease the node's
00198                                 // ref count.
00199                                 next = n->next;
00200                         } while (top.compare_and_swap(next, n) != n);
00201                         return n;
00202                 }
00203                 Node* allocate(const T& in, uint32 maxRef) {
00204                         if (NodeBase* n = tryPop()) {
00205                                 return new (n) Node(maxRef, in);
00206                         }
00207                         else { 
00208                                 void* mem = ::operator new(sizeof(Node));
00209                                 return new (mem) Node(maxRef, in);
00210                         }
00211                 }
00212                 NodeBase::AtomicPtr top;
00213         };
00214         
00215         void release(NodeBase* n) {
00216                 if (n != &head_ && --n->refs == 0) {
00217                         head_.next = n->next;
00218                         deleter_(static_cast<Node*>(n)->data);
00219                         freeList_.push(n);
00220                 }
00221         }
00222         NodeBase            head_;
00223         NodeBase::AtomicPtr tail_;
00224         FreeList            freeList_;
00225         const uint32        maxQ_;
00226         Deleter             deleter_;
00227 };
00228 
00229 } } // end namespace Clasp::mt
00230 #endif


clasp
Author(s): Benjamin Kaufmann
autogenerated on Thu Aug 27 2015 12:41:39