00001 #ifndef ROSRTT_DATA_LOCK_FREE_HPP 00002 #define ROSRTT_DATA_LOCK_FREE_HPP 00003 00004 #include "oro_arch.h" 00005 00006 namespace hpcl_rtt 00007 { 00008 template<class T> 00009 class DataObjectLockFree 00010 { 00011 public: 00015 typedef T DataType; 00016 typedef typename boost::shared_ptr<DataObjectLockFree<T> > shared_ptr; 00017 00023 const unsigned int MAX_THREADS; // = 2 00024 private: 00028 const unsigned int BUF_LEN; // = MAX_THREADS+2 00029 00037 struct DataBuf 00038 { 00039 DataBuf() : data(), counter(), next() 00040 { 00041 oro_atomic_set(&counter, 0); 00042 } 00043 DataType data; mutable oro_atomic_t counter; DataBuf* next; 00044 }; 00045 00046 typedef DataBuf* volatile VolPtrType; 00047 typedef DataBuf ValueType; 00048 typedef DataBuf* PtrType; 00049 00050 VolPtrType read_ptr; 00051 VolPtrType write_ptr; 00052 00056 DataBuf* data; 00057 public: 00058 00065 DataObjectLockFree( const T& initial_value = T(), unsigned int max_threads = 2 ) 00066 : MAX_THREADS(max_threads), BUF_LEN( max_threads + 2), 00067 read_ptr(0), 00068 write_ptr(0) 00069 { 00070 data = new DataBuf[BUF_LEN]; 00071 read_ptr = &data[0]; 00072 write_ptr = &data[1]; 00073 data_sample(initial_value); 00074 } 00075 00076 ~DataObjectLockFree() 00077 { 00078 delete[] data; 00079 } 00080 00088 virtual DataType Get() const {DataType cache; Get(cache); return cache; } 00089 00097 virtual void Get( DataType& pull ) const 00098 { 00099 PtrType reading; 00100 // loop to combine Read/Modify of counter 00101 // This avoids a race condition where read_ptr 00102 // could become write_ptr ( then we would read corrupted data). 00103 do { 00104 reading = read_ptr; // copy buffer location 00105 oro_atomic_inc(&reading->counter); // lock buffer, no more writes 00106 // XXX smp_mb 00107 if ( reading != read_ptr ) // if read_ptr changed, 00108 oro_atomic_dec(&reading->counter); // better to start over. 00109 else 00110 break; 00111 } while ( true ); 00112 // from here on we are sure that 'reading' 00113 // is a valid buffer to read from. 00114 pull = reading->data; // takes some time 00115 // XXX smp_mb 00116 oro_atomic_dec(&reading->counter); // release buffer 00117 } 00118 00124 virtual void Set( const DataType& push ) 00125 { 00134 // writeout in any case 00135 write_ptr->data = push; 00136 PtrType wrote_ptr = write_ptr; 00137 // if next field is occupied (by read_ptr or counter), 00138 // go to next and check again... 00139 while ( oro_atomic_read( &write_ptr->next->counter ) != 0 || write_ptr->next == read_ptr ) 00140 { 00141 write_ptr = write_ptr->next; 00142 if (write_ptr == wrote_ptr) 00143 return; // nothing found, to many readers ! 00144 } 00145 00146 // we will be able to move, so replace read_ptr 00147 read_ptr = wrote_ptr; 00148 write_ptr = write_ptr->next; // we checked this in the while loop 00149 } 00150 00151 virtual void data_sample( const DataType& sample ) 00152 { 00153 // prepare the buffer. 00154 for (unsigned int i = 0; i < BUF_LEN-1; ++i) { 00155 data[i].data = sample; 00156 data[i].next = &data[i+1]; 00157 } 00158 data[BUF_LEN-1].data = sample; 00159 data[BUF_LEN-1].next = &data[0]; 00160 } 00161 }; 00162 00163 } 00164 #endif