00001 /* 00002 * data_lockfree.hpp - micros lock-free structure 00003 * Copyright (C) 2015 Zaile Jiang 00004 * 00005 * This program is free software; you can redistribute it and/or 00006 * modify it under the terms of the GNU General Public License 00007 * as published by the Free Software Foundation; either version 2 00008 * of the License, or (at your option) any later version. 00009 * 00010 * This program is distributed in the hope that it will be useful, 00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 * GNU General Public License for more details. 00014 * 00015 * You should have received a copy of the GNU General Public License 00016 * along with this program; if not, write to the Free Software 00017 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00018 */ 00019 #ifndef MICROSRTT_DATA_LOCK_FREE_HPP 00020 #define MICROSRTT_DATA_LOCK_FREE_HPP 00021 00022 #include "micros_rtt/oro/oro_arch.h" 00023 00024 namespace micros_rtt 00025 { 00026 template<class T> 00027 class DataObjectLockFree 00028 { 00029 public: 00033 typedef T DataType; 00034 typedef typename boost::shared_ptr<DataObjectLockFree<T> > shared_ptr; 00035 00041 const unsigned int MAX_THREADS; // = 2 00042 private: 00046 const unsigned int BUF_LEN; // = MAX_THREADS+2 00047 00055 struct DataBuf 00056 { 00057 DataBuf() : data(), counter(), next() 00058 { 00059 oro_atomic_set(&counter, 0); 00060 } 00061 DataType data; mutable oro_atomic_t counter; DataBuf* next; 00062 }; 00063 00064 typedef DataBuf* volatile VolPtrType; 00065 typedef DataBuf ValueType; 00066 typedef DataBuf* PtrType; 00067 00068 VolPtrType read_ptr; 00069 VolPtrType write_ptr; 00070 00074 DataBuf* data; 00075 public: 00076 00083 DataObjectLockFree( const T& initial_value = T(), unsigned int max_threads = 2 ) 00084 : MAX_THREADS(max_threads), BUF_LEN( max_threads + 2), 00085 read_ptr(0), 00086 write_ptr(0) 00087 { 00088 data = new DataBuf[BUF_LEN]; 00089 read_ptr = &data[0]; 00090 write_ptr = &data[1]; 00091 data_sample(initial_value); 00092 } 00093 00094 ~DataObjectLockFree() 00095 { 00096 delete[] data; 00097 } 00098 00106 virtual DataType Get() const {DataType cache; Get(cache); return cache; } 00107 00115 virtual void Get( DataType& pull ) const 00116 { 00117 PtrType reading; 00118 // loop to combine Read/Modify of counter 00119 // This avoids a race condition where read_ptr 00120 // could become write_ptr ( then we would read corrupted data). 00121 do { 00122 reading = read_ptr; // copy buffer location 00123 oro_atomic_inc(&reading->counter); // lock buffer, no more writes 00124 // XXX smp_mb 00125 if ( reading != read_ptr ) // if read_ptr changed, 00126 oro_atomic_dec(&reading->counter); // better to start over. 00127 else 00128 break; 00129 } while ( true ); 00130 // from here on we are sure that 'reading' 00131 // is a valid buffer to read from. 00132 pull = reading->data; // takes some time 00133 // XXX smp_mb 00134 oro_atomic_dec(&reading->counter); // release buffer 00135 } 00136 00142 virtual void Set( const DataType& push ) 00143 { 00152 // writeout in any case 00153 write_ptr->data = push; 00154 PtrType wrote_ptr = write_ptr; 00155 // if next field is occupied (by read_ptr or counter), 00156 // go to next and check again... 00157 while ( oro_atomic_read( &write_ptr->next->counter ) != 0 || write_ptr->next == read_ptr ) 00158 { 00159 write_ptr = write_ptr->next; 00160 if (write_ptr == wrote_ptr) 00161 return; // nothing found, to many readers ! 00162 } 00163 00164 // we will be able to move, so replace read_ptr 00165 read_ptr = wrote_ptr; 00166 write_ptr = write_ptr->next; // we checked this in the while loop 00167 } 00168 00169 virtual void data_sample( const DataType& sample ) 00170 { 00171 // prepare the buffer. 00172 for (unsigned int i = 0; i < BUF_LEN-1; ++i) { 00173 data[i].data = sample; 00174 data[i].next = &data[i+1]; 00175 } 00176 data[BUF_LEN-1].data = sample; 00177 data[BUF_LEN-1].next = &data[0]; 00178 } 00179 }; 00180 00181 } 00182 #endif