DataObjectLockFree.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Mon Jan 19 14:11:26 CET 2004 DataObjectLockFree.hpp
3 
4  DataObjectLockFree.hpp - description
5  -------------------
6  begin : Mon January 19 2004
7  copyright : (C) 2004 Peter Soetens
8  email : peter.soetens@mech.kuleuven.ac.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 #ifndef CORELIB_DATAOBJECT_LOCK_FREE_HPP
39 #define CORELIB_DATAOBJECT_LOCK_FREE_HPP
40 
41 
42 #include "../os/oro_arch.h"
43 #include "DataObjectInterface.hpp"
44 #include "../Logger.hpp"
45 #include "../types/Types.hpp"
46 #include "../internal/DataSourceTypeInfo.hpp"
47 
48 namespace RTT
49 { namespace base {
50 
79  template<class T>
81  : public DataObjectInterface<T>
82  {
83  public:
87 
88  typedef typename DataObjectBase::Options Options;
89 
93  const unsigned int MAX_THREADS;
94 
95  private:
99  const unsigned int BUF_LEN; // = MAX_THREADS+2
100 
108  struct DataBuf {
110  : data(), status(NoData), next()
111  {
114  }
115  value_t data;
119  };
120 
121  typedef DataBuf* volatile VolPtrType;
123  typedef DataBuf* PtrType;
124 
125  VolPtrType read_ptr;
126  VolPtrType write_ptr;
127 
132 
134 
135  public:
136 
141  DataObjectLockFree( const Options &options = Options() )
142  : MAX_THREADS(options.max_threads()), BUF_LEN( options.max_threads() + 2),
143  read_ptr(0),
144  write_ptr(0),
145  initialized(false)
146  {
147  data = new DataBuf[BUF_LEN];
148  read_ptr = &data[0];
149  write_ptr = &data[1];
150  }
151 
158  DataObjectLockFree( param_t initial_value, const Options &options = Options() )
159  : MAX_THREADS(options.max_threads()), BUF_LEN( options.max_threads() + 2),
160  read_ptr(0),
161  write_ptr(0),
162  initialized(false)
163  {
164  data = new DataBuf[BUF_LEN];
165  read_ptr = &data[0];
166  write_ptr = &data[1];
167  data_sample(initial_value);
168  }
169 
171  delete[] data;
172  }
173 
181  virtual value_t Get() const {
182  value_t cache = value_t();
183  Get(cache);
184  return cache;
185  }
186 
197  virtual FlowStatus Get( reference_t pull, bool copy_old_data, bool copy_sample ) const
198  {
199  if (!initialized && !copy_sample) {
200  return NoData;
201  }
202 
203  PtrType reading;
204  // loop to combine Read/Modify of counter
205  // This avoids a race condition where read_ptr
206  // could become write_ptr ( then we would read corrupted data).
207  do {
208  reading = read_ptr; // copy buffer location
209  oro_atomic_inc(&reading->read_counter); // lock buffer, no more writes
210  // XXX smp_mb
211  if ( reading != read_ptr ) // if read_ptr changed,
212  oro_atomic_dec(&reading->read_counter); // better to start over.
213  else
214  break;
215  } while ( true );
216  // from here on we are sure that 'reading'
217  // is a valid buffer to read from.
218 
219  // compare-and-swap FlowStatus field to make sure that only one reader
220  // returns NewData
221  FlowStatus result;
222  do {
223  result = reading->status;
224  } while((result != NoData) && !os::CAS(&reading->status, result, OldData));
225 
226  if ((result == NewData) ||
227  ((result == OldData) && copy_old_data) || copy_sample) {
228  pull = reading->data; // takes some time
229  }
230 
231  // XXX smp_mb
232  oro_atomic_dec(&reading->read_counter); // release buffer
233  return result;
234  }
235 
245  virtual FlowStatus Get( reference_t pull, bool copy_old_data = true ) const
246  {
247  return Get( pull, copy_old_data, /* copy_sample = */ false );
248  }
249 
255  virtual bool Set( param_t push )
256  {
257  if (!initialized) {
258  log(Error) << "You set a lock-free data object of type " << internal::DataSourceTypeInfo<T>::getType() << " without initializing it with a data sample. "
259  << "This might not be real-time safe." << endlog();
260  data_sample(value_t(), true);
261  }
262 
263  PtrType writing = write_ptr; // copy buffer location
264  if (!oro_atomic_inc_and_test(&writing->write_lock)) {
265  // abort, another thread already successfully locked this buffer element
266  oro_atomic_dec(&writing->write_lock);
267  return false;
268  }
269 
270  // Additional check that resolves the following race condition:
271  // - writer A copies the write_ptr and acquires the lock writing->write_lock
272  // - writer B copies the write_ptr
273  // - writer A continues to update and increments the write_ptr
274  // - writer A releases the lock writing->write_lock
275  // - writer B acquires the lock successfully, but for the same buffer element that already
276  // has been written by writer A and can potentially be accessed by readers now!
277  if ( writing != write_ptr ) {
278  // abort, another thread already updated the write_ptr, which could imply that read_ptr == writing now
279  oro_atomic_dec(&writing->write_lock);
280  return false;
281  }
282  // from here on we are sure that 'writing'
283  // is a valid buffer to write to and we
284  // have exclusive access
285 
286  // copy sample
287  writing->data = push;
288  writing->status = NewData;
289 
290  // if next field is occupied (by read_ptr or counter),
291  // go to next and check again...
292  PtrType next_write_ptr = writing->next;
293  while ( oro_atomic_read( &next_write_ptr->read_counter ) != 0 ||
294  next_write_ptr == read_ptr )
295  {
296  next_write_ptr = next_write_ptr->next;
297  if (next_write_ptr == writing) {
298  oro_atomic_dec(&writing->write_lock);
299  return false; // nothing found, too many readers !
300  }
301  }
302 
303  // we will be able to move, so replace read_ptr
304  read_ptr = writing;
305  write_ptr = next_write_ptr; // we checked this in the while loop
306  oro_atomic_dec(&writing->write_lock);
307  return true;
308  }
309 
310  virtual bool data_sample( param_t sample, bool reset = true ) {
311  if (!initialized || reset) {
312  // prepare the buffer.
313  for (unsigned int i = 0; i < BUF_LEN; ++i) {
314  data[i].data = sample;
315  data[i].status = NoData;
316  data[i].next = &data[i+1];
317  }
318  data[BUF_LEN-1].next = &data[0];
319  initialized = true;
320  return true;
321  } else {
322  return initialized;
323  }
324  }
325 
329  virtual value_t data_sample() const {
330  value_t sample;
331  (void) Get(sample, /* copy_old_data = */ true, /* copy_sample = */ true);
332  return sample;
333  }
334 
335  // This is actually a copy of Get(), but it only sets the status to NoData once a valid buffer has been found.
336  // Subsequent read() calls will read from the same buffer and will return NoData until a new sample has been written.
337  virtual void clear() {
338  if (!initialized) return;
339 
340  PtrType reading;
341  // loop to combine Read/Modify of counter
342  // This avoids a race condition where read_ptr
343  // could become write_ptr ( then we would read corrupted data).
344  do {
345  reading = read_ptr; // copy buffer location
346  oro_atomic_inc(&reading->read_counter); // lock buffer, no more writes
347  // XXX smp_mb
348  if ( reading != read_ptr ) // if read_ptr changed,
349  oro_atomic_dec(&reading->read_counter); // better to start over.
350  else
351  break;
352  } while ( true );
353  // from here on we are sure that 'reading'
354  // is a valid buffer to read from.
355 
356  // compare-and-swap FlowStatus field to avoid the race condition
357  // where a reader replaces it by OldData
358  FlowStatus result;
359  do {
360  result = reading->status;
361  } while(!os::CAS(&reading->status, result, NoData));
362 
363  // XXX smp_mb
364  oro_atomic_dec(&reading->read_counter); // release buffer
365  }
366  };
367 }}
368 
369 #endif
virtual value_t Get() const
DataObjectInterface< T >::value_t value_t
virtual bool Set(param_t push)
virtual FlowStatus Get(reference_t pull, bool copy_old_data, bool copy_sample) const
virtual value_t data_sample() const
FlowStatus
Definition: FlowStatus.hpp:56
int oro_atomic_inc_and_test(oro_atomic_t *a)
virtual bool data_sample(param_t sample, bool reset=true)
DataObjectLockFree(param_t initial_value, const Options &options=Options())
int oro_atomic_read(oro_atomic_t *a)
bool CAS(volatile T *addr, const V &expected, const W &value)
Definition: CAS.hpp:54
virtual FlowStatus Get(reference_t pull, bool copy_old_data=true) const
void oro_atomic_inc(oro_atomic_t *a)
This DataObject is a Lock-Free implementation, such that reads and writes can happen concurrently wit...
DataObjectInterface< T >::reference_t reference_t
DataObjectBase::Options Options
boost::call_traits< T >::param_type param_t
DataObjectInterface< T >::param_t param_t
const unsigned int MAX_THREADS
The maximum number of threads.
A DataObjectInterface implements multi-threaded read/write solutions.
void oro_atomic_set(oro_atomic_t *a, int n)
static const std::string & getType()
boost::call_traits< T >::reference reference_t
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
static Logger & log()
Definition: Logger.hpp:350
void oro_atomic_dec(oro_atomic_t *a)
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
DataObjectLockFree(const Options &options=Options())


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:23