00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_CORELIB_ATOMIC_MWSR_QUEUE_HPP
00040 #define ORO_CORELIB_ATOMIC_MWSR_QUEUE_HPP
00041
00042 #include "../os/CAS.hpp"
00043 #include <utility>
00044
00045 namespace RTT
00046 {
00047 namespace internal
00048 {
00058 template<class T>
00059 class AtomicMWSRQueue
00060 {
00061
00062 const int _size;
00063 typedef T C;
00064 typedef volatile C* CachePtrType;
00065 typedef C* volatile CacheObjType;
00066 typedef C ValueType;
00067 typedef C* PtrType;
00068
00076 union SIndexes
00077 {
00078 unsigned long _value;
00079 unsigned short _index[2];
00080 };
00081
00086 CachePtrType _buf;
00087
00092 volatile SIndexes _indxes;
00093
00098 CachePtrType advance_w()
00099 {
00100 SIndexes oldval, newval;
00101 do
00102 {
00103 oldval._value = _indxes._value;
00104 newval._value = oldval._value;
00105
00106 if ((newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1))
00107 {
00108 return 0;
00109 }
00110 newval._index[0]++;
00111 if (newval._index[0] >= _size)
00112 newval._index[0] = 0;
00113
00114 } while (!os::CAS(&_indxes._value, oldval._value, newval._value));
00115
00116
00117
00118
00119
00120
00121 return &_buf[oldval._index[0]];
00122 }
00123
00128 bool advance_r(T& result)
00129 {
00130 SIndexes oldval, newval;
00131
00132 oldval._value = _indxes._value;
00133 result = _buf[oldval._index[1]];
00134
00135 if ( !result )
00136 return false;
00137
00138 _buf[oldval._index[1]] = 0;
00139
00140
00141 do
00142 {
00143
00144
00145 oldval._value = _indxes._value;
00146 newval._value = oldval._value;
00147 ++newval._index[1];
00148 if (newval._index[1] >= _size)
00149 newval._index[1] = 0;
00150
00151
00152
00153 } while (!os::CAS(&_indxes._value, oldval._value, newval._value));
00154
00155 return true;
00156 }
00157
00158
00159 AtomicMWSRQueue(const AtomicMWSRQueue<T>&);
00160 public:
00161 typedef unsigned int size_type;
00162
00167 AtomicMWSRQueue(unsigned int size) :
00168 _size(size + 1)
00169 {
00170 _buf = new C[_size];
00171 this->clear();
00172 }
00173
00174 ~AtomicMWSRQueue()
00175 {
00176 delete[] _buf;
00177 }
00178
00183 bool isFull() const
00184 {
00185
00186
00187
00188 SIndexes val;
00189 val._value = _indxes._value;
00190 return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1;
00191 }
00192
00197 bool isEmpty() const
00198 {
00199
00200 SIndexes val;
00201 val._value = _indxes._value;
00202 return val._index[0] == val._index[1];
00203 }
00204
00208 size_type capacity() const
00209 {
00210 return _size - 1;
00211 }
00212
00216 size_type size() const
00217 {
00218 SIndexes val;
00219 val._value = _indxes._value;
00220 int c = (val._index[0] - val._index[1]);
00221 return c >= 0 ? c : c + _size;
00222 }
00223
00229 bool enqueue(const T& value)
00230 {
00231 if (value == 0)
00232 return false;
00233 CachePtrType loc = advance_w();
00234 if (loc == 0)
00235 return false;
00236 *loc = value;
00237 return true;
00238 }
00239
00247 bool dequeue(T& result)
00248 {
00249 T tmpresult;
00250 if (advance_r(tmpresult) ) {
00251 result = tmpresult;
00252 return true;
00253 }
00254 return false;
00255 }
00256
00260 const T front() const
00261 {
00262 return _buf[_indxes._index[1]];
00263 }
00264
00268 void clear()
00269 {
00270 for (int i = 0; i != _size; ++i)
00271 {
00272 _buf[i] = 0;
00273 }
00274 _indxes._value = 0;
00275 }
00276
00277 };
00278
00279 }
00280 }
00281 #endif