Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef CLASP_MULIT_QUEUE_H_INCLUDED
00021 #define CLASP_MULIT_QUEUE_H_INCLUDED
00022
00023 #include <clasp/util/platform.h>
00024 #include <clasp/util/atomic.h>
00025
00026 namespace Clasp { namespace mt { namespace Detail {
00027 struct NodeBase {
00028 typedef Clasp::atomic<NodeBase*> AtomicPtr;
00029 typedef Clasp::atomic<int> AtomicInt;
00030 explicit NodeBase(uint32 rc) { next = 0; refs = rc; }
00031 AtomicPtr next;
00032 AtomicInt refs;
00033 };
00034
00035 template <class T>
00036 struct Node : public NodeBase {
00037 Node(uint32 rc, const T& d) : NodeBase(rc), data(d) {}
00038 T data;
00039 };
//! Default clean-up policy: runs the destructor of the given object in place.
/*!
 * Only the destructor is invoked; the storage the object lives in is not freed here.
 */
struct DefaultDeleter {
    template <class T>
    void operator()(T& obj) const {
        // Mark the parameter as used before the (possibly trivial) destructor call.
        static_cast<void>(obj);
        obj.~T();
    }
};
00047 }
00048
00050
00056 template <class T, class Deleter = Detail::DefaultDeleter>
00057 class MultiQueue {
00058 protected:
00059 typedef Detail::Node<T> Node;
00060 typedef Detail::NodeBase NodeBase;
00061 public:
00062 typedef Detail::NodeBase* ThreadId;
00064 explicit MultiQueue(uint32 m, const Deleter& d = Deleter()) : head_(m+1), maxQ_(m), deleter_(d) {
00065 tail_ = &head_;
00066 }
00067 uint32 maxThreads() const { return maxQ_; }
00068 void reserve(uint32 c) {
00069 for (uint32 i = 0; i != c; ++i) {
00070 void* m = ::operator new(sizeof(Node));
00071 freeList_.push(new (m) NodeBase(0));
00072 }
00073 }
00075 ~MultiQueue() {
00076 for (NodeBase* x = head_.next; x ; ) {
00077 Node* n = static_cast<Node*>(x);
00078 x = x->next;
00079 deleter_(n->data);
00080 freeList_.push(n);
00081 }
00082 }
00084
00088 ThreadId addThread() {
00089 --head_.refs;
00090 assert(head_.refs > 0);
00091 return &head_;
00092 }
00093 bool hasItems(ThreadId& cId) const { return cId != tail_; }
00094
00096
00100 bool tryConsume(ThreadId& cId, T& out) {
00101 if (cId != tail_) {
00102 NodeBase* n = cId;
00103 cId = cId->next;
00104 assert(cId != 0 && "MultiQueue is corrupted!");
00105 release(n);
00106 out = static_cast<Node*>(cId)->data;
00107 return true;
00108 }
00109 return false;
00110 }
00112
00115 void pop(ThreadId& cId) {
00116 assert(hasItems(cId) && "Cannot pop from empty queue!");
00117 NodeBase* n = cId;
00118 cId = cId->next;
00119 release(n);
00120 }
00121 protected:
00123
00127 void unsafePublish(const T& in, const ThreadId&) {
00128 Node* n = freeList_.allocate(in, maxQ_);
00129 publishRelaxed(n);
00130 }
00131
00133 void publish(const T& in, const ThreadId&) {
00134 Node* newNode = freeList_.allocate(in, maxQ_);
00135 NodeBase* assumedTail, *assumedNext;
00136 do {
00137 assumedTail = tail_;
00138 assumedNext = assumedTail->next;
00139 if (assumedTail != tail_) {
00140
00141 continue;
00142 }
00143 if (assumedNext != 0) {
00144
00145
00146 tail_.compare_and_swap(assumedNext, assumedTail);
00147 continue;
00148 }
00149 } while (assumedTail->next.compare_and_swap(newNode, 0) != 0);
00150
00151
00152
00153 tail_.compare_and_swap(newNode, assumedTail);
00154 }
00155
00157 void publishRelaxed(NodeBase* n) {
00158 tail_->next = n;
00159 tail_ = n;
00160 }
00161 uint32 maxQ() const { return maxQ_; }
00162 Node* allocate(uint32 maxR, const T& in) {
00163 return freeList_.allocate(in, maxR);
00164 }
00165 private:
00166 MultiQueue(const MultiQueue&);
00167 MultiQueue& operator=(const MultiQueue&);
00168
00169 struct FreeList {
00170 FreeList() { top = 0; }
00171 ~FreeList(){
00172 for (NodeBase* n = top; n != 0; ) {
00173 NodeBase* t = n;
00174 n = n->next;
00175 ::operator delete(t);
00176 }
00177 }
00178 void push(NodeBase* n) {
00179 NodeBase* assumedTop;
00180 do {
00181 assumedTop = top;
00182 n->next = assumedTop;
00183 } while (top.compare_and_swap(n, assumedTop) != assumedTop);
00184 }
00185 NodeBase* tryPop() {
00186 NodeBase* n = 0, *next = 0;
00187 do {
00188 n = top;
00189 if (!n) return 0;
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199 next = n->next;
00200 } while (top.compare_and_swap(next, n) != n);
00201 return n;
00202 }
00203 Node* allocate(const T& in, uint32 maxRef) {
00204 if (NodeBase* n = tryPop()) {
00205 return new (n) Node(maxRef, in);
00206 }
00207 else {
00208 void* mem = ::operator new(sizeof(Node));
00209 return new (mem) Node(maxRef, in);
00210 }
00211 }
00212 NodeBase::AtomicPtr top;
00213 };
00214
00215 void release(NodeBase* n) {
00216 if (n != &head_ && --n->refs == 0) {
00217 head_.next = n->next;
00218 deleter_(static_cast<Node*>(n)->data);
00219 freeList_.push(n);
00220 }
00221 }
00222 NodeBase head_;
00223 NodeBase::AtomicPtr tail_;
00224 FreeList freeList_;
00225 const uint32 maxQ_;
00226 Deleter deleter_;
00227 };
00228
00229 } }
00230 #endif