data_lockfree.hpp
Go to the documentation of this file.
00001 #ifndef ROSRTT_DATA_LOCK_FREE_HPP
00002 #define ROSRTT_DATA_LOCK_FREE_HPP
00003 
00004 #include "oro_arch.h"
00005 
00006 namespace hpcl_rtt
00007 {
00008 template<class T>
00009 class DataObjectLockFree
00010 {
00011 public:
00015   typedef T DataType;
00016   typedef typename boost::shared_ptr<DataObjectLockFree<T> > shared_ptr;
00017 
00023   const unsigned int MAX_THREADS; // = 2
00024 private:
00028   const unsigned int BUF_LEN; // = MAX_THREADS+2
00029 
00037   struct DataBuf 
00038   {
00039     DataBuf() : data(), counter(), next()
00040     {
00041       oro_atomic_set(&counter, 0);
00042     }
00043     DataType data; mutable oro_atomic_t counter; DataBuf* next;
00044   };
00045 
00046   typedef DataBuf* volatile VolPtrType;
00047   typedef DataBuf  ValueType;
00048   typedef DataBuf* PtrType;
00049 
00050   VolPtrType read_ptr;
00051   VolPtrType write_ptr;
00052 
00056   DataBuf* data;
00057 public:
00058 
00065   DataObjectLockFree( const T& initial_value = T(), unsigned int max_threads = 2 )
00066       : MAX_THREADS(max_threads), BUF_LEN( max_threads + 2),
00067         read_ptr(0),
00068         write_ptr(0)
00069   {
00070         data = new DataBuf[BUF_LEN];
00071         read_ptr = &data[0];
00072         write_ptr = &data[1];
00073     data_sample(initial_value);
00074   }
00075 
00076   ~DataObjectLockFree() 
00077   {
00078     delete[] data;
00079   }
00080 
00088   virtual DataType Get() const {DataType cache; Get(cache); return cache; }
00089 
00097   virtual void Get( DataType& pull ) const
00098   {
00099     PtrType reading;
00100     // loop to combine Read/Modify of counter
00101     // This avoids a race condition where read_ptr
00102     // could become write_ptr ( then we would read corrupted data).
00103     do {
00104       reading = read_ptr;            // copy buffer location
00105       oro_atomic_inc(&reading->counter); // lock buffer, no more writes
00106       // XXX smp_mb
00107       if ( reading != read_ptr )     // if read_ptr changed,
00108         oro_atomic_dec(&reading->counter); // better to start over.
00109       else
00110         break;
00111     } while ( true );
00112     // from here on we are sure that 'reading'
00113     // is a valid buffer to read from.
00114     pull = reading->data;               // takes some time
00115     // XXX smp_mb
00116     oro_atomic_dec(&reading->counter);       // release buffer
00117   }
00118 
00124   virtual void Set( const DataType& push )
00125   {
00134     // writeout in any case
00135     write_ptr->data = push;
00136     PtrType wrote_ptr = write_ptr;
00137     // if next field is occupied (by read_ptr or counter),
00138     // go to next and check again...
00139     while ( oro_atomic_read( &write_ptr->next->counter ) != 0 || write_ptr->next == read_ptr )
00140     {
00141       write_ptr = write_ptr->next;
00142       if (write_ptr == wrote_ptr)
00143         return; // nothing found, to many readers !
00144     }
00145 
00146     // we will be able to move, so replace read_ptr
00147     read_ptr  = wrote_ptr;
00148     write_ptr = write_ptr->next; // we checked this in the while loop
00149   }
00150 
00151   virtual void data_sample( const DataType& sample ) 
00152   {
00153     // prepare the buffer.
00154     for (unsigned int i = 0; i < BUF_LEN-1; ++i) {
00155       data[i].data = sample;
00156       data[i].next = &data[i+1];
00157     }
00158     data[BUF_LEN-1].data = sample;
00159     data[BUF_LEN-1].next = &data[0];
00160   }
00161 }; 
00162 
00163 }
00164 #endif


hpcl_rtt
Author(s): sukha
autogenerated on Thu Aug 27 2015 16:43:53