AtomicMWMRQueue.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: The SourceWorks Tue Sep 7 00:55:18 CEST 2010 AtomicQueue.hpp
3 
4  AtomicQueue.hpp - description
5  -------------------
6  begin : Tue September 07 2010
7  copyright : (C) 2010 The SourceWorks
8  email : peter@thesourceworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_CORELIB_ATOMIC_MWMR_QUEUE_HPP
40 #define ORO_CORELIB_ATOMIC_MWMR_QUEUE_HPP
41 
#include "AtomicQueue.hpp"
#include "../os/CAS.hpp"
#include <cassert>
#include <utility>
45 
46 namespace RTT
47 {
48  namespace internal {
/**
 * Fixed-size circular queue of values of type T.
 *
 * A slot holding zero is treated as empty, so zero itself cannot be stored
 * (enqueue() rejects it). Index updates and slot updates are both done
 * through os::CAS.
 * NOTE(review): the name suggests multiple-writer/multiple-reader use;
 * the thread-safety guarantees cannot be confirmed from this file alone.
 */
70  template<class T>
71  class AtomicMWMRQueue : public AtomicQueue<T>
72  {
// Number of slots in _buf; the constructor sets it to the requested size + 1.
73  const int _size;
// Short alias for the element type.
74  typedef T C;
// Pointer to a volatile element; used for the slot array and slot addresses.
75  typedef volatile C* CachePtrType;
// The three typedefs below are not used in the visible part of this file.
76  typedef C* volatile CacheObjType;
77  typedef C ValueType;
78  typedef C* PtrType;
79 
// Both queue indices overlaid on one word, so that they can be read and
// swapped together with a single os::CAS on _value.
80  union SIndexes
81  {
// Whole-word view used for snapshots and for the CAS.
// NOTE(review): unsigned long may be wider than two unsigned shorts;
// the code only ever copies or zeroes the full word.
82  unsigned long _value;
// _index[0] is advanced by propose_w() (write index),
// _index[1] is advanced by propose_r() (read index).
83  unsigned short _index[2];
84  };
85 
// Slot array of _size elements, allocated in the constructor.
90  CachePtrType _buf;
91 
// Current write/read indices of the circular buffer.
96  volatile SIndexes _indxes;
97 
105  CachePtrType recover_r() const
106  {
107  // The implementation starts from the read pointer,
108  // and wraps around until all fields were scanned.
109  // As such, the out-of-order elements will at least
110  // be returned in their relative order.
111  SIndexes start;
112  start._value = _indxes._value;
113  unsigned short r = start._index[1];
114  while( r != _size) {
115  if (_buf[r])
116  return &_buf[r];
117  ++r;
118  }
119  r = 0;
120  while( r != start._index[1]) {
121  if (_buf[r])
122  return &_buf[r];
123  ++r;
124  }
125  return 0;
126  }
127 
132  CachePtrType propose_w()
133  {
134  SIndexes oldval, newval;
135  do {
136  oldval._value = _indxes._value; /*Points to a free writable pointer.*/
137  newval._value = oldval._value; /*Points to the next writable pointer.*/
138  // check for full on a *Copy* of oldval:
139  if ( (newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1) )
140  {
141  // note: in case of high contention, there might be existing empty fields
142  // in _buf that aren't used.
143  return 0;
144  }
145  ++newval._index[0];
146  if ( newval._index[0] == _size )
147  newval._index[0] = 0;
148  // if ptr is unchanged, replace it with newval.
149  } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
150 
151  // the returned field may contain data, in that case, the caller needs to retry.
152  return &_buf[ oldval._index[0] ];
153  }
158  CachePtrType propose_r()
159  {
160  SIndexes oldval, newval;
161  do {
162  oldval._value = _indxes._value;
163  newval._value = oldval._value;
164  // check for empty on a *Copy* of oldval:
165  if ( newval._index[0] == newval._index[1] )
166  {
167  // seldom: R and W are indicating empty, but 'lost' fields
168  // are to be picked up. Return these
169  // that would have been read eventually after some writes.
170  return recover_r();
171  }
172  ++newval._index[1];
173  if ( newval._index[1] == _size )
174  newval._index[1] = 0;
175 
176  } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
177  // the returned field may contain *no* data, in that case, the caller needs to retry.
178  // as such r will advance until it hits a data sample or write pointer.
179  return &_buf[oldval._index[1] ];
180  }
181 
182  // non-copyable !
184  public:
186 
191  AtomicMWMRQueue( unsigned int size )
192  : _size(size+1)
193  {
194  _buf= new C[_size];
195  this->clear();
196  }
197 
199  {
200  delete[] _buf;
201  }
202 
207  bool isFull() const
208  {
209  // two cases where the queue is full :
210  // if wptr is one behind rptr or if wptr is at end
211  // and rptr at beginning.
212  SIndexes val;
213  val._value = _indxes._value;
214  return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1;
215  }
216 
221  bool isEmpty() const
222  {
223  // empty if nothing to read.
224  SIndexes val;
225  val._value = _indxes._value;
226  return val._index[0] == val._index[1] && recover_r() == 0;
227  }
228 
232  size_type capacity() const
233  {
234  return _size -1;
235  }
236 
242  size_type size() const
243  {
244  int c = 0, ret = 0;
245  while (c != _size ) {
246  if (_buf[c++] )
247  ++ret;
248  }
249  return ret;
250  //int c = (_indxes._index[0] - _indxes._index[1]);
251  //return c >= 0 ? c : c + _size;
252  }
253 
259  bool enqueue(const T& value)
260  {
261  if ( value == 0 )
262  return false;
263  CachePtrType loc;
264  C null = 0;
265  do {
266  loc = propose_w();
267  if ( loc == 0 )
268  return false; //full
269  // if loc contains a zero, write it, otherwise, re-try.
270  } while( !os::CAS(loc, null, value));
271  return true;
272  }
273 
279  bool dequeue( T& result )
280  {
281  CachePtrType loc;
282  C null = 0;
283  do {
284  loc = propose_r();
285  if ( loc == 0 )
286  return false; // empty
287  result = *loc;
288  // if loc still contains result, clear it, otherwise, re-try.
289  } while( result == 0 || !os::CAS(loc, result, null) );
290  assert(result);
291  return true;
292  }
293 
297  const T front() const
298  {
299  return _buf[_indxes._index[1] ];
300  }
301 
305  void clear()
306  {
307  for(int i = 0 ; i != _size; ++i) {
308  _buf[i] = 0;
309  }
310  _indxes._value = 0;
311  }
312 
313  };
314 
315 }}
316 
317 #endif
bool CAS(volatile T *addr, const V &expected, const W &value)
Definition: CAS.hpp:54
CachePtrType recover_r() const
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
AtomicQueue< T >::size_type size_type
AtomicMWMRQueue(const AtomicQueue< T > &)


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:20