$search
00001 /*************************************************************************** 00002 tag: The SourceWorks Tue Sep 7 00:55:18 CEST 2010 AtomicQueue.hpp 00003 00004 AtomicQueue.hpp - description 00005 ------------------- 00006 begin : Tue September 07 2010 00007 copyright : (C) 2010 The SourceWorks 00008 email : peter@thesourceworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef ORO_CORELIB_ATOMIC_QUEUE_HPP 00040 #define ORO_CORELIB_ATOMIC_QUEUE_HPP 00041 00042 #include "../os/CAS.hpp" 00043 #include <utility> 00044 00045 namespace RTT 00046 { 00047 namespace internal { 00069 template<class T> 00070 class AtomicQueue 00071 { 00072 const int _size; 00073 typedef T C; 00074 typedef volatile C* CachePtrType; 00075 typedef C* volatile CacheObjType; 00076 typedef C ValueType; 00077 typedef C* PtrType; 00078 00079 union SIndexes 00080 { 00081 unsigned long _value; 00082 unsigned short _index[2]; 00083 }; 00084 00089 CachePtrType _buf; 00090 00095 volatile SIndexes _indxes; 00096 00104 CachePtrType recover_r() const 00105 { 00106 // The implementation starts from the read pointer, 00107 // and wraps around until all fields were scanned. 00108 // As such, the out-of-order elements will at least 00109 // be returned in their relative order. 00110 SIndexes start; 00111 start._value = _indxes._value; 00112 unsigned short r = start._index[1]; 00113 while( r != _size) { 00114 if (_buf[r]) 00115 return &_buf[r]; 00116 ++r; 00117 } 00118 r = 0; 00119 while( r != start._index[1]) { 00120 if (_buf[r]) 00121 return &_buf[r]; 00122 ++r; 00123 } 00124 return 0; 00125 } 00126 00131 CachePtrType propose_w() 00132 { 00133 SIndexes oldval, newval; 00134 do { 00135 oldval._value = _indxes._value; /*Points to a free writable pointer.*/ 00136 newval._value = oldval._value; /*Points to the next writable pointer.*/ 00137 // check for full on a *Copy* of oldval: 00138 if ( (newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1) ) 00139 { 00140 // note: in case of high contention, there might be existing empty fields 00141 // in _buf that aren't used. 00142 return 0; 00143 } 00144 ++newval._index[0]; 00145 if ( newval._index[0] == _size ) 00146 newval._index[0] = 0; 00147 // if ptr is unchanged, replace it with newval. 00148 } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) ); 00149 00150 // the returned field may contain data, in that case, the caller needs to retry. 00151 return &_buf[ oldval._index[0] ]; 00152 } 00157 CachePtrType propose_r() 00158 { 00159 SIndexes oldval, newval; 00160 do { 00161 oldval._value = _indxes._value; 00162 newval._value = oldval._value; 00163 // check for empty on a *Copy* of oldval: 00164 if ( newval._index[0] == newval._index[1] ) 00165 { 00166 // seldom: R and W are indicating empty, but 'lost' fields 00167 // are to be picked up. Return these 00168 // that would have been read eventually after some writes. 00169 return recover_r(); 00170 } 00171 ++newval._index[1]; 00172 if ( newval._index[1] == _size ) 00173 newval._index[1] = 0; 00174 00175 } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) ); 00176 // the returned field may contain *no* data, in that case, the caller needs to retry. 00177 // as such r will advance until it hits a data sample or write pointer. 00178 return &_buf[oldval._index[1] ]; 00179 } 00180 00181 // non-copyable ! 00182 AtomicQueue( const AtomicQueue<T>& ); 00183 public: 00184 typedef unsigned int size_type; 00185 00190 AtomicQueue( unsigned int size ) 00191 : _size(size+1) 00192 { 00193 _buf= new C[_size]; 00194 this->clear(); 00195 } 00196 00197 ~AtomicQueue() 00198 { 00199 delete[] _buf; 00200 } 00201 00206 bool isFull() const 00207 { 00208 // two cases where the queue is full : 00209 // if wptr is one behind rptr or if wptr is at end 00210 // and rptr at beginning. 00211 SIndexes val; 00212 val._value = _indxes._value; 00213 return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1; 00214 } 00215 00220 bool isEmpty() const 00221 { 00222 // empty if nothing to read. 00223 SIndexes val; 00224 val._value = _indxes._value; 00225 return val._index[0] == val._index[1] && recover_r() == 0; 00226 } 00227 00231 size_type capacity() const 00232 { 00233 return _size -1; 00234 } 00235 00241 size_type size() const 00242 { 00243 int c = 0, ret = 0; 00244 while (c != _size ) { 00245 if (_buf[c++] ) 00246 ++ret; 00247 } 00248 return ret; 00249 //int c = (_indxes._index[0] - _indxes._index[1]); 00250 //return c >= 0 ? c : c + _size; 00251 } 00252 00258 bool enqueue(const T& value) 00259 { 00260 if ( value == 0 ) 00261 return false; 00262 CachePtrType loc; 00263 C null = 0; 00264 do { 00265 loc = propose_w(); 00266 if ( loc == 0 ) 00267 return false; //full 00268 // if loc contains a zero, write it, otherwise, re-try. 00269 } while( !os::CAS(loc, null, value)); 00270 return true; 00271 } 00272 00278 bool dequeue( T& result ) 00279 { 00280 CachePtrType loc; 00281 C null = 0; 00282 do { 00283 loc = propose_r(); 00284 if ( loc == 0 ) 00285 return false; // empty 00286 result = *loc; 00287 // if loc still contains result, clear it, otherwise, re-try. 00288 } while( result == 0 || !os::CAS(loc, result, null) ); 00289 assert(result); 00290 return true; 00291 } 00292 00296 const T front() const 00297 { 00298 return _buf[_indxes._index[1] ]; 00299 } 00300 00304 void clear() 00305 { 00306 for(int i = 0 ; i != _size; ++i) { 00307 _buf[i] = 0; 00308 } 00309 _indxes._value = 0; 00310 } 00311 00312 }; 00313 00314 }} 00315 00316 #endif