BufferLockFree.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Jan 13 10:24:51 CET 2005 BufferLockFree.hpp
3 
4  BufferLockFree.hpp - description
5  -------------------
6  begin : Thu January 13 2005
7  copyright : (C) 2005 Peter Soetens
8  email : peter.soetens@mech.kuleuven.ac.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 #ifndef ORO_BUFFER_LOCK_FREE_HPP
39 #define ORO_BUFFER_LOCK_FREE_HPP
40 
41 #include "../os/oro_arch.h"
42 #include "../os/Atomic.hpp"
43 #include "../os/CAS.hpp"
44 #include "BufferInterface.hpp"
45 #include "../internal/AtomicMWSRQueue.hpp"
46 #include "../internal/AtomicMWMRQueue.hpp"
47 #include "../internal/TsPool.hpp"
48 #include <vector>
49 
50 #ifdef ORO_PRAGMA_INTERFACE
51 #pragma interface
52 #endif
53 
54 namespace RTT
55 { namespace base {
56 
57 
69  template<class T>
71  : public BufferInterface<T>
72  {
73  public:
74  typedef typename BufferBase::Options Options;
78  typedef T value_t;
79 
83  const unsigned int MAX_THREADS;
84 
85  private:
86  typedef value_t Item;
87  const bool mcircular;
89 
92 
94 
95  public:
100  BufferLockFree( unsigned int bufsize, const Options &options = Options() )
101  : MAX_THREADS(options.max_threads())
102  , mcircular(options.circular()), initialized(false)
103  , bufs((!options.circular() && !options.multiple_readers()) ?
104  static_cast<internal::AtomicQueue<Item*> *>(new internal::AtomicMWSRQueue<Item*>(bufsize)) :
105  static_cast<internal::AtomicQueue<Item*> *>(new internal::AtomicMWMRQueue<Item*>(bufsize)))
106  , mpool(new internal::TsPool<Item>(bufsize + options.max_threads()))
107  , droppedSamples(0)
108  {
109  }
110 
116  BufferLockFree( unsigned int bufsize, param_t initial_value, const Options &options = Options() )
117  : MAX_THREADS(options.max_threads())
118  , mcircular(options.circular()), initialized(false)
119  , bufs((!options.circular() && !options.multiple_readers()) ?
120  static_cast<internal::AtomicQueue<Item*> *>(new internal::AtomicMWSRQueue<Item*>(bufsize)) :
121  static_cast<internal::AtomicQueue<Item*> *>(new internal::AtomicMWMRQueue<Item*>(bufsize)))
122  , mpool(new internal::TsPool<Item>(bufsize + options.max_threads()))
123  , droppedSamples(0)
124  {
125  data_sample( initial_value );
126  }
127 
129  // free all items still in the buffer.
130  clear();
131 
132  delete mpool;
133  delete bufs;
134  }
135 
136  virtual bool data_sample( param_t sample, bool reset = true )
137  {
138  if (!initialized || reset) {
139  mpool->data_sample(sample);
140  initialized = true;
141  return true;
142  } else {
143  return initialized;
144  }
145 
146  }
147 
148  virtual value_t data_sample() const
149  {
150  value_t result = value_t();
151  Item* mitem = mpool->allocate();
152  if (mitem != 0) {
153  result = *mitem;
154  mpool->deallocate( mitem );
155  }
156  return result;
157  }
158 
159 
160  size_type capacity() const
161  {
162  return bufs->capacity();
163  }
164 
165  size_type size() const
166  {
167  return bufs->size();
168  }
169 
170  bool empty() const
171  {
172  return bufs->isEmpty();
173  }
174 
175  bool full() const
176  {
177  return bufs->isFull();
178  }
179 
180  void clear()
181  {
182  Item* item;
183  while ( bufs->dequeue(item) )
184  mpool->deallocate( item );
185  }
186 
187  virtual size_type dropped() const
188  {
189  return droppedSamples.read();
190  }
191 
192  bool Push( param_t item)
193  {
194  if (!mcircular && ( capacity() == (size_type)bufs->size() )) {
195  droppedSamples.inc();
196  return false;
197  // we will recover below in case of circular
198  }
199  Item* mitem = mpool->allocate();
200  if ( mitem == 0 ) { // queue full ( rare but possible in race with PopWithoutRelease )
201  if (!mcircular) {
202  droppedSamples.inc();
203  return false;
204  }
205  else {
206  if (bufs->dequeue( mitem ) == false ) {
207  droppedSamples.inc();
208  return false; // assert(false) ???
209  }
210  // we keep mitem to write item to next
211  }
212  }
213 
214  // copy over.
215  *mitem = item;
216  if (bufs->enqueue( mitem ) == false ) {
217  //got memory, but buffer is full
218  //this can happen, as the memory pool is
219  //bigger than the buffer
220  if (!mcircular) {
221  mpool->deallocate( mitem );
222  droppedSamples.inc();
223  return false;
224  } else {
225  // pop & deallocate until we have free space.
226  Item* itmp = 0;
227  do {
228  if ( bufs->dequeue( itmp ) ) {
229  mpool->deallocate( itmp );
230  droppedSamples.inc();
231  } else {
232  // Both operations, enqueue() and dequeue() failed on the buffer:
233  // We could free the allocated pool item return false here,
234  // but in fact this can only happen during massive concurrent
235  // access to the circular buffer or in the trivial case that
236  // the buffer size is zero. So just keep on trying...
237  }
238  } while ( bufs->enqueue( mitem ) == false );
239  }
240  }
241  return true;
242  }
243 
244  size_type Push(const std::vector<value_t>& items)
245  {
246  // @todo Make this function more efficient as in BufferLocked.
247  int towrite = items.size();
248  size_type written = 0;
249  typename std::vector<value_t>::const_iterator it;
250  for( it = items.begin(); it != items.end(); ++it) {
251  if ( this->Push( *it ) == false ) {
252  break; // will only happen in non-circular case !
253  }
254  written++;
255  }
256  droppedSamples.add(towrite - written);
257  return written;
258  }
259 
260 
261  FlowStatus Pop( reference_t item )
262  {
263  Item* ipop;
264  if (bufs->dequeue( ipop ) == false )
265  return NoData;
266  item = *ipop;
267  if (mpool->deallocate( ipop ) == false )
268  assert(false);
269  return NewData;
270  }
271 
272  size_type Pop(std::vector<value_t>& items )
273  {
274  Item* ipop;
275  items.clear();
276  while( bufs->dequeue(ipop) ) {
277  items.push_back( *ipop );
278  if (mpool->deallocate(ipop) == false)
279  assert(false);
280  }
281  return items.size();
282  }
283 
284  value_t* PopWithoutRelease()
285  {
286  Item* ipop;
287  if (bufs->dequeue( ipop ) == false )
288  return 0;
289  return ipop;
290  }
291 
292  void Release(value_t *item)
293  {
294  if (mpool->deallocate( item ) == false )
295  assert(false);
296  }
297  };
298 }}
299 
300 #endif
void data_sample(const T &sample)
Definition: TsPool.hpp:153
size_type Push(const std::vector< value_t > &items)
boost::call_traits< T >::reference reference_t
virtual bool data_sample(param_t sample, bool reset=true)
FlowStatus
Definition: FlowStatus.hpp:56
virtual size_type capacity() const =0
boost::call_traits< T >::param_type param_t
value_t * allocate()
Definition: TsPool.hpp:159
size_type Pop(std::vector< value_t > &items)
internal::TsPool< Item > *const mpool
FlowStatus Pop(reference_t item)
virtual bool isEmpty() const =0
virtual bool dequeue(T &result)=0
BufferBase::size_type size_type
internal::AtomicQueue< Item * > *const bufs
BufferInterface< T >::size_type size_type
virtual bool enqueue(const T &value)=0
RTT::os::AtomicInt droppedSamples
BufferLockFree(unsigned int bufsize, param_t initial_value, const Options &options=Options())
const unsigned int MAX_THREADS
The maximum number of threads.
bool deallocate(T *Value)
Definition: TsPool.hpp:179
BufferLockFree(unsigned int bufsize, const Options &options=Options())
virtual value_t data_sample() const
BufferBase::Options Options
int read() const
Definition: Atomic.hpp:77
virtual size_type size() const =0
size_type capacity() const
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
BufferInterface< T >::param_t param_t
void add(int i)
Definition: Atomic.hpp:84
void Release(value_t *item)
virtual bool isFull() const =0
BufferInterface< T >::reference_t reference_t
virtual size_type dropped() const


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:20