buffers_test.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Mon Jan 10 15:59:51 CET 2005 buffers_test.cpp
3 
4  buffers_test.cpp - description
5  -------------------
6  begin : Mon January 10 2005
7  copyright : (C) 2005 Peter Soetens
8  email : peter.soetens@mech.kuleuven.ac.be
9 
10  ***************************************************************************
11  * *
12  * This program is free software; you can redistribute it and/or modify *
13  * it under the terms of the GNU General Public License as published by *
14  * the Free Software Foundation; either version 2 of the License, or *
15  * (at your option) any later version. *
16  * *
17  ***************************************************************************/
18 
19 #include "unit.hpp"
20 
21 #include <iostream>
22 #include <boost/scoped_ptr.hpp>
23 
24 #include <internal/AtomicQueue.hpp>
26 
27 #include <Activity.hpp>
28 
29 #include <RTT.hpp>
30 #include <base/Buffer.hpp>
32 #include <base/DataObject.hpp>
33 #include <internal/TsPool.hpp>
34 //#include <internal/SortedList.hpp>
35 
36 #include <os/Thread.hpp>
37 #include <rtt-config.h>
38 
39 #include <boost/foreach.hpp>
40 #include <boost/lexical_cast.hpp>
41 
42 using namespace std;
43 using namespace RTT;
44 using namespace RTT::detail;
45 
/**
 * Small value type made of three doubles. It is the payload that the queue,
 * buffer, data-object and pool tests in this file push around.
 */
class Dummy {
public:
    double d1;
    double d2;
    double d3;

    /// Builds a sample; the defaults yield the three distinct values (0, 1, 2).
    Dummy(double a = 0.0, double b = 1.0, double c = 2.0)
        : d1(a), d2(b), d3(c)
    {
    }

    /// Field-by-field equality.
    bool operator==(const Dummy& other) const
    {
        if (other.d1 != d1) return false;
        if (other.d2 != d2) return false;
        return other.d3 == d3;
    }

    /// True as soon as one field differs; the exact negation of operator==.
    bool operator!=(const Dummy& other) const
    {
        return !(*this == other);
    }

    /// Orders samples by the sum of their three fields.
    bool operator<(const Dummy& other) const
    {
        return (other.d1 + other.d2 + other.d3) > (d1 + d2 + d3);
    }

    /* Disabled: volatile-qualified copy assignment.
    volatile Dummy& operator=(const Dummy& d) volatile
    {
        d1 = d.d1;
        d2 = d.d2;
        d3 = d.d3;
        return *this;
    }
    */
};
77 
78 
81 
82 // Capacity handed to the queues, buffers and pools created by the fixtures below.
83 // Keep it small: the tests also want to reach the overrun (container full) paths.
84 #define QS 10
85 
// Fixture body: the first block allocates a queue of capacity QS, the second one
// clears and deletes it.
// NOTE(review): presumably this is the BuffersAtomicMWMRQueueTest fixture named by the
// MWMR test suite. The class header, the MWMRQueueType/aqueue declarations and the
// constructor/destructor signature lines are absent from this listing - confirm
// against the real source.
87 {
88 public:
92
94  {
95  aqueue = new MWMRQueueType(QS);
96  }
98  aqueue->clear();
99  delete aqueue;
100  }
101 };
102 
// Fixture body: the first block allocates a queue of capacity QS, the second one
// clears and deletes it.
// NOTE(review): presumably this is the BuffersAtomicMWSRQueueTest fixture named by the
// MWSR test suite. The class header, the MWSRQueueType/aqueue declarations and the
// constructor/destructor signature lines are absent from this listing - confirm
// against the real source.
104 {
105 public:
107
109  {
110  aqueue = new MWSRQueueType(QS);
111  }
113  aqueue->clear();
114  delete aqueue;
115  }
116 };
117 
118 template <class Worker>
119 class ThreadPool : public std::vector< std::pair< boost::shared_ptr<Worker>, boost::shared_ptr<ThreadInterface> > >
120 {
121 public:
122  typedef std::vector< std::pair< boost::shared_ptr<Worker>, boost::shared_ptr<ThreadInterface> > > Threads;
123  typedef typename Threads::value_type value_type;
124  typedef typename Threads::iterator iterator;
125  typedef typename Threads::const_iterator const_iterator;
126 
127  template <typename Arg1>
128  ThreadPool(int threads, int scheduler, int priority, Seconds period, const std::string &name, const Arg1 &arg1)
129  : Threads(threads)
130  {
131  int count = 0;
132  for(iterator worker = this->begin(); worker != this->end(); ++worker) {
133  worker->first.reset(new Worker(arg1));
134  worker->second.reset(new Activity(scheduler, priority, period, worker->first.get(), name + boost::lexical_cast<std::string>(count++)));
135  }
136  }
137 
139  {
140  stop();
141  for(iterator worker = this->begin(); worker != this->end(); ++worker) {
142  worker->second.reset();
143  worker->first.reset();
144  }
145  }
146 
147  bool start()
148  {
149  bool result = true;
150  for(const_iterator worker = this->begin(); worker != this->end(); ++worker) {
151  if (!worker->second->start()) result = false;
152  }
153  return result;
154  }
155 
156  bool stop()
157  {
158  bool result = true;
159  for(const_iterator worker = this->begin(); worker != this->end(); ++worker) {
160  worker->second->stop();
161  }
162  return result;
163  }
164 };
165 
// Fixture body for the buffer and data-object tests: holds one instance of every
// buffer / data-object variant plus the `buffer`, `circular` and `dataobj`
// selectors that the individual test cases point at the variant under test.
// NOTE(review): this listing omits several lines of the class - its header, the
// declarations of dataobj, locked, clocked, dlocked and dunsync, the
// constructor/destructor signature lines and the `sample` member used by each
// nested worker. Confirm against the real source before editing.
167 {
168 public:
169  BufferInterface<Dummy>* buffer;
170  BufferInterface<Dummy>* circular;
172
173  BufferLockFree<Dummy>* lockfree;
175  BufferUnSync<Dummy>* unsync;
176
177  BufferLockFree<Dummy>* clockfree;
179  BufferUnSync<Dummy>* cunsync;
180
182  DataObjectLockFree<Dummy>* dlockfree;
184
185  void testBuf();
186  void testCirc();
187  void testDObj();
188
189  typedef std::map<FlowStatus, int> ReadsByStatusMap;
190
191  void testBufMultiThreaded(int number_of_writers, int number_of_readers);
192  void testDObjMultiThreaded(int number_of_writers, int number_of_readers);
193
// Signature line missing (presumably the fixture constructor): allocates every
// variant with capacity QS and a default Dummy, then selects the lock-free ones.
// `circular` is not assigned here.
195  {
196  // classical variants
197  lockfree = new BufferLockFree<Dummy>(QS, Dummy());
198  locked = new BufferLocked<Dummy>(QS, Dummy());
199  unsync = new BufferUnSync<Dummy>(QS, Dummy());
200
201  // circular variants.
202  clockfree = new BufferLockFree<Dummy>(QS, Dummy(), BufferBase::Options().circular(true));
203  clocked = new BufferLocked<Dummy>(QS, Dummy(), BufferBase::Options().circular(true));
204  cunsync = new BufferUnSync<Dummy>(QS, Dummy(), BufferBase::Options().circular(true));
205
206  dlockfree = new DataObjectLockFree<Dummy>(Dummy());
207  dlocked = new DataObjectLocked<Dummy>(Dummy());
208  dunsync = new DataObjectUnSync<Dummy>(Dummy());
209
210  // defaults
211  buffer = lockfree;
212  dataobj = dlockfree;
213  }
214
// Signature line missing (presumably the fixture destructor): deletes the nine
// objects allocated above. The buffer/circular/dataobj selectors are not deleted,
// so a test that points them at its own allocation must free it itself.
216  delete lockfree;
217  delete locked;
218  delete unsync;
219  delete clockfree;
220  delete clocked;
221  delete cunsync;
222  delete dlockfree;
223  delete dlocked;
224  delete dunsync;
225  }
226
// Runnable that calls dataobj->Set(sample) in a loop until breakLoop() raises
// `stop`, counting accepted writes in `writes` and rejected ones in `dropped`.
// NOTE(review): `sample` is declared on a line missing from this listing.
227  class DataObjectWriter : public RunnableInterface {
228  private:
229  DataObjectInterface<Dummy> *dataobj;
230  bool stop;
232
233  public:
234  int writes;
235  int dropped;
236
237  public:
238  DataObjectWriter(DataObjectInterface<Dummy> *dataobj) : dataobj(dataobj), stop(false), writes(0), dropped(0) {}
239  bool initialize() {
240  stop = false;
241  return true;
242  }
243  void step() {
244  while (stop == false) {
245  if (dataobj->Set(sample)) {
246  ++writes;
247  } else {
248  ++dropped;
249  }
250  }
251  }
252
253  void finalize() {}
254
255  bool breakLoop() {
256  stop = true;
257  return true;
258  }
259  };
260
// Runnable that calls dataobj->Get(sample, false) in a loop until breakLoop()
// raises `stop`, counting every call in `reads` and per returned FlowStatus in
// `reads_by_status`.
// NOTE(review): `sample` is declared on a line missing from this listing.
261  class DataObjectReader : public RunnableInterface {
262  private:
263  DataObjectInterface<Dummy> *dataobj;
264  bool stop;
266
267  public:
268  int reads;
269  ReadsByStatusMap reads_by_status;
270
271  public:
272  DataObjectReader(DataObjectInterface<Dummy> *dataobj) : dataobj(dataobj), stop(false), reads(0) {}
273  bool initialize() {
274  stop = false;
275  return true;
276  }
277  void step() {
278  while (stop == false) {
279  FlowStatus fs = dataobj->Get(sample, false);
280  ++reads;
281  ++reads_by_status[fs];
282  }
283  }
284
285  void finalize() {}
286
287  bool breakLoop() {
288  stop = true;
289  return true;
290  }
291  };
292
// Runnable that calls buffer->Push(sample) in a loop until breakLoop() raises
// `stop`, counting accepted pushes in `writes` and rejected ones in `dropped`.
// NOTE(review): `sample` is declared on a line missing from this listing.
293  class BufferWriter : public RunnableInterface {
294  private:
295  BufferInterface<Dummy> *buffer;
296  bool stop;
298
299  public:
300  int writes;
301  int dropped;
302
303  public:
304  BufferWriter(BufferInterface<Dummy> *buffer) : buffer(buffer), stop(false), writes(0), dropped(0) {}
305  bool initialize() {
306  stop = false;
307  return true;
308  }
309  void step() {
310  while (stop == false) {
311  if (buffer->Push(sample)) {
312  ++writes;
313  } else {
314  ++dropped;
315  }
316  }
317  }
318
319  void finalize() {}
320
321  bool breakLoop() {
322  stop = true;
323  return true;
324  }
325  };
326
// Runnable that calls buffer->Pop(sample) in a loop until breakLoop() raises
// `stop`, counting every call in `reads` and per returned FlowStatus in
// `reads_by_status`.
// NOTE(review): `sample` is declared on a line missing from this listing.
327  class BufferReader : public RunnableInterface {
328  private:
329  BufferInterface<Dummy> *buffer;
330  bool stop;
332
333  public:
334  int reads;
335  ReadsByStatusMap reads_by_status;
336
337  public:
338  BufferReader(BufferInterface<Dummy> *buffer) : buffer(buffer), stop(false), reads(0) {}
339  bool initialize() {
340  stop = false;
341  return true;
342  }
343  void step() {
344  while (stop == false) {
345  FlowStatus fs = buffer->Pop(sample);
346  ++reads;
347  ++reads_by_status[fs];
348  }
349  }
350
351  void finalize() {}
352
353  bool breakLoop() {
354  stop = true;
355  return true;
356  }
357  };
358 };
359 
// Single-threaded first-in-first-out checks on the non-circular `buffer`.
// NOTE(review): the signature line is absent from this listing; presumably this is
// BuffersDataFlowTest::testBuf(), which the fixture declares - confirm against the
// real source.
// NOTE(review): d and c are a raw new/delete pair; they are not freed if the
// BOOST_REQUIRE_EQUAL further down aborts the test case.
361 {
367  Dummy* d = new Dummy;
368  Dummy* c = new Dummy(2.0, 1.0, 0.0);
369  Dummy r;
370
// Nothing has been pushed yet, so there is nothing to pop.
371  BOOST_CHECK( buffer->Pop(r) == false );
372
373  BOOST_CHECK( buffer->Push( *d ) );
374  BOOST_CHECK( buffer->Pop(r) );
375  BOOST_CHECK( r == *d );
376
377  BOOST_CHECK( buffer->Push( *c ) );
378  BOOST_CHECK( buffer->Pop(r) );
379  BOOST_CHECK( r == *c );
380
// The first ten pushes are accepted; the twelve that follow are rejected.
381  BOOST_CHECK( buffer->Push( *d ) );
382  BOOST_CHECK( buffer->Push( *c ) );
383  BOOST_CHECK( buffer->Push( *d ) );
384  BOOST_CHECK( buffer->Push( *c ) );
385  BOOST_CHECK( buffer->Push( *d ) );
386  BOOST_CHECK( buffer->Push( *c ) );
387  BOOST_CHECK( buffer->Push( *d ) );
388  BOOST_CHECK( buffer->Push( *c ) );
389  BOOST_CHECK( buffer->Push( *d ) );
390  BOOST_CHECK( buffer->Push( *c ) );
391  BOOST_CHECK( buffer->Push( *c ) == false );
392  BOOST_CHECK( buffer->Push( *c ) == false );
393  BOOST_CHECK( buffer->Push( *c ) == false );
394  BOOST_CHECK( buffer->Push( *c ) == false );
395  BOOST_CHECK( buffer->Push( *c ) == false );
396  BOOST_CHECK( buffer->Push( *c ) == false );
397  BOOST_CHECK( buffer->Push( *c ) == false );
398  BOOST_CHECK( buffer->Push( *c ) == false );
399  BOOST_CHECK( buffer->Push( *c ) == false );
400  BOOST_CHECK( buffer->Push( *c ) == false );
401  BOOST_CHECK( buffer->Push( *c ) == false );
402  BOOST_CHECK( buffer->Push( *c ) == false );
// Pop five elements: they come out in the order they were pushed.
403  BOOST_CHECK( buffer->Pop(r) );
404  BOOST_CHECK( r == *d );
405  BOOST_CHECK( buffer->Pop(r) );
406  BOOST_CHECK( r == *c );
407  BOOST_CHECK( buffer->Pop(r) );
408  BOOST_CHECK( r == *d );
409  BOOST_CHECK( buffer->Pop(r) );
410  BOOST_CHECK( r == *c );
411  BOOST_CHECK( buffer->Pop(r) );
412  BOOST_CHECK( r == *d );
413
414  // start writing again half-way
415  BOOST_CHECK( buffer->Push( *d ) );
416  BOOST_CHECK( buffer->Push( *c ) );
417  BOOST_CHECK( buffer->Push( *d ) );
418  BOOST_CHECK( buffer->Push( *c ) );
419  BOOST_CHECK( buffer->Push( *d ) );
420
// Drain all ten stored elements: the five left over first, then the five just pushed.
421  BOOST_CHECK( buffer->Pop(r) );
422  BOOST_CHECK( r == *c );
423  BOOST_CHECK( buffer->Pop(r) );
424  BOOST_CHECK( r == *d );
425  BOOST_CHECK( buffer->Pop(r) );
426  BOOST_CHECK( r == *c );
427  BOOST_CHECK( buffer->Pop(r) );
428  BOOST_CHECK( r == *d );
429  BOOST_CHECK( buffer->Pop(r) );
430  BOOST_CHECK( r == *c );
431  BOOST_CHECK( buffer->Pop(r) );
432  BOOST_CHECK( r == *d );
433  BOOST_CHECK( buffer->Pop(r) );
434  BOOST_CHECK( r == *c );
435  BOOST_CHECK( buffer->Pop(r) );
436  BOOST_CHECK( r == *d );
437  BOOST_CHECK( buffer->Pop(r) );
438  BOOST_CHECK( r == *c );
439  BOOST_CHECK( buffer->Pop(r) );
440  BOOST_CHECK( r == *d );
441
// Empty again: every further Pop fails.
442  BOOST_CHECK( buffer->Pop(r) == false );
443  BOOST_CHECK( buffer->Pop(r) == false );
444  BOOST_CHECK( buffer->Pop(r) == false );
445  BOOST_CHECK( buffer->Pop(r) == false );
446  BOOST_CHECK( buffer->Pop(r) == false );
447  BOOST_CHECK( buffer->Pop(r) == false );
448  BOOST_CHECK( buffer->Pop(r) == false );
449  BOOST_CHECK( buffer->Pop(r) == false );
450  BOOST_CHECK( buffer->Pop(r) == false );
451  BOOST_CHECK( buffer->Pop(r) == false );
452  BOOST_CHECK( buffer->Pop(r) == false );
453
454  BOOST_CHECK( buffer->Push( *c ) );
455  BOOST_CHECK( buffer->Push( *d ) );
456  BOOST_CHECK( buffer->Push( *c ) );
457  BOOST_CHECK( buffer->Push( *d ) );
458  BOOST_CHECK( buffer->Push( *c ) );
459
// The vector overload of Pop returns the number of elements read - all five here.
460  std::vector<Dummy> v;
461  BOOST_CHECK( 5 == buffer->Pop(v) );
462  BOOST_CHECK( v[0] == *c );
463  BOOST_CHECK( v[1] == *d );
464  BOOST_CHECK( v[2] == *c );
465  BOOST_CHECK( v[3] == *d );
466  BOOST_CHECK( v[4] == *c );
467
468  BufferBase::size_type sz = 10;
469  BOOST_CHECK( buffer->Push( *c ) );
470  BOOST_CHECK( buffer->Push( *d ) );
471  BOOST_CHECK( buffer->Push( v ) == (int)v.size() );
472  BOOST_CHECK( buffer->Push( *c ) );
473  BOOST_CHECK( buffer->Push( *d ) );
// Nine elements are stored at this point (2 + 5 + 2), so the vector Push below is
// expected to take only one more element and every later one to take none.
474  BOOST_CHECK( buffer->Push( v ) == 1 );
475  BOOST_CHECK( buffer->Push( v ) == 0 );
476  BOOST_CHECK( buffer->Push( v ) == 0 );
477  BOOST_CHECK( buffer->Push( v ) == 0 );
478  BOOST_CHECK( buffer->Push( v ) == 0 );
479  BOOST_CHECK( buffer->Push( v ) == 0 );
480  BOOST_CHECK( buffer->Push( v ) == 0 );
481  BOOST_CHECK( buffer->Push( v ) == 0 );
482  BOOST_CHECK( buffer->Push( v ) == 0 );
483  BOOST_CHECK( buffer->Push( v ) == 0 );
484  BOOST_CHECK( buffer->Push( v ) == 0 );
485  BOOST_CHECK( buffer->Push( v ) == 0 );
486  BOOST_CHECK( buffer->Push( v ) == 0 );
// REQUIRE rather than CHECK: the indexed reads below need all ten elements in v.
487  BOOST_REQUIRE_EQUAL( sz, buffer->Pop(v) );
488  BOOST_CHECK( v[0] == *c );
489  BOOST_CHECK( v[1] == *d );
490  BOOST_CHECK( v[2] == *c );
491  BOOST_CHECK( v[3] == *d );
492  BOOST_CHECK( v[4] == *c );
493  BOOST_CHECK( v[5] == *d );
494  BOOST_CHECK( v[6] == *c );
495  BOOST_CHECK( v[7] == *c );
496  BOOST_CHECK( v[8] == *d );
497  //BOOST_CHECK( v[9] == *c );
498  BOOST_CHECK( 0 == buffer->Pop(v) );
499  delete d;
500  delete c;
501 }
502 
// Single-threaded checks on the circular buffer selected in `circular`: pushes keep
// succeeding when it is full, and pops return the most recently stored elements.
// NOTE(review): the signature line is absent from this listing; presumably this is
// BuffersDataFlowTest::testCirc(), which the fixture declares - confirm against the
// real source.
// NOTE(review): d and c are a raw new/delete pair; they are not freed if the
// BOOST_REQUIRE_EQUAL further down aborts the test case.
504 {
510  Dummy* d = new Dummy;
511  Dummy* c = new Dummy(2.0, 1.0, 0.0);
512  Dummy r;
513
514  BOOST_CHECK( circular->Pop(r) == false );
515
516  BOOST_CHECK( circular->Push( *d ) );
517  BOOST_CHECK( circular->Pop(r) );
518  BOOST_CHECK( r == *d );
519
520  BOOST_CHECK( circular->Push( *c ) );
521  BOOST_CHECK( circular->Pop(r) );
522  BOOST_CHECK( r == *c );
523
// Nineteen pushes in a row: all of them are expected to succeed.
524  BOOST_CHECK( circular->Push( *d ) );
525  BOOST_CHECK( circular->Push( *c ) );
526  BOOST_CHECK( circular->Push( *d ) );
527  BOOST_CHECK( circular->Push( *c ) );
528  BOOST_CHECK( circular->Push( *d ) );
529  BOOST_CHECK( circular->Push( *c ) );
530  BOOST_CHECK( circular->Push( *d ) );
531  BOOST_CHECK( circular->Push( *c ) );
532  BOOST_CHECK( circular->Push( *d ) );
533  BOOST_CHECK( circular->Push( *c ) ); // oldest item at end of Push series.
534  BOOST_CHECK( circular->Push( *d ) );
535  BOOST_CHECK( circular->Push( *c ) );
536  BOOST_CHECK( circular->Push( *d ) );
537  BOOST_CHECK( circular->Push( *c ) );
538  BOOST_CHECK( circular->Push( *d ) );
539  BOOST_CHECK( circular->Push( *c ) );
540  BOOST_CHECK( circular->Push( *d ) );
541  BOOST_CHECK( circular->Push( *c ) );
542  BOOST_CHECK( circular->Push( *d ) );
// The first element popped is the *c marked above, not the first *d that was pushed.
543  BOOST_CHECK( circular->Pop(r) );
544  BOOST_CHECK( r == *c );
545  BOOST_CHECK( circular->Pop(r) );
546  BOOST_CHECK( r == *d );
547  BOOST_CHECK( circular->Pop(r) );
548  BOOST_CHECK( r == *c );
549  BOOST_CHECK( circular->Pop(r) );
550  BOOST_CHECK( r == *d );
551  BOOST_CHECK( circular->Pop(r) );
552  BOOST_CHECK( r == *c );
553
554  // start writing again half-way
555  BOOST_CHECK( circular->Push( *d ) );
556  BOOST_CHECK( circular->Push( *c ) );
557  BOOST_CHECK( circular->Push( *d ) );
558  BOOST_CHECK( circular->Push( *c ) );
559  BOOST_CHECK( circular->Push( *d ) );
560
// Drain ten elements: the five left over (d,c,d,c,d) followed by the five just pushed.
561  BOOST_CHECK( circular->Pop(r) );
562  BOOST_CHECK( r == *d );
563  BOOST_CHECK( circular->Pop(r) );
564  BOOST_CHECK( r == *c );
565  BOOST_CHECK( circular->Pop(r) );
566  BOOST_CHECK( r == *d );
567  BOOST_CHECK( circular->Pop(r) );
568  BOOST_CHECK( r == *c );
569  BOOST_CHECK( circular->Pop(r) );
570  BOOST_CHECK( r == *d );
571  BOOST_CHECK( circular->Pop(r) );
572  BOOST_CHECK( r == *d );
573  BOOST_CHECK( circular->Pop(r) );
574  BOOST_CHECK( r == *c );
575  BOOST_CHECK( circular->Pop(r) );
576  BOOST_CHECK( r == *d );
577  BOOST_CHECK( circular->Pop(r) );
578  BOOST_CHECK( r == *c );
579  BOOST_CHECK( circular->Pop(r) );
580  BOOST_CHECK( r == *d );
581
// Empty again: every further Pop fails.
582  BOOST_CHECK( circular->Pop(r) == false );
583  BOOST_CHECK( circular->Pop(r) == false );
584  BOOST_CHECK( circular->Pop(r) == false );
585  BOOST_CHECK( circular->Pop(r) == false );
586  BOOST_CHECK( circular->Pop(r) == false );
587  BOOST_CHECK( circular->Pop(r) == false );
588  BOOST_CHECK( circular->Pop(r) == false );
589  BOOST_CHECK( circular->Pop(r) == false );
590  BOOST_CHECK( circular->Pop(r) == false );
591  BOOST_CHECK( circular->Pop(r) == false );
592  BOOST_CHECK( circular->Pop(r) == false );
593
594  BOOST_CHECK( circular->Push( *c ) );
595  BOOST_CHECK( circular->Push( *d ) );
596  BOOST_CHECK( circular->Push( *c ) );
597  BOOST_CHECK( circular->Push( *d ) );
598  BOOST_CHECK( circular->Push( *c ) );
599
600  std::vector<Dummy> v;
601  BOOST_CHECK( 5 == circular->Pop(v) );
602  BOOST_CHECK( v[0] == *c );
603  BOOST_CHECK( v[1] == *d );
604  BOOST_CHECK( v[2] == *c );
605  BOOST_CHECK( v[3] == *d );
606  BOOST_CHECK( v[4] == *c );
607
608  BufferBase::size_type sz = 10;
609  BOOST_CHECK( circular->Push( *c ) );
610  BOOST_CHECK( circular->Push( *d ) );
611  BOOST_CHECK( circular->Push( v ) == (int)v.size() );
612  BOOST_CHECK( circular->Push( *c ) );
613  BOOST_CHECK( circular->Push( *d ) );
// Every vector Push below is expected to report all five elements as written,
// although more than ten elements have been pushed in total by then.
614  BOOST_CHECK( circular->Push( v ) == (int)v.size() );
615  BOOST_CHECK( circular->Push( v ) == (int)v.size() );
616  BOOST_CHECK( circular->Push( v ) == (int)v.size() );
617  BOOST_CHECK( circular->Push( v ) == (int)v.size() );
618  BOOST_CHECK( circular->Push( v ) == (int)v.size() );
// REQUIRE rather than CHECK: the indexed reads below need all ten elements in v.
619  BOOST_REQUIRE_EQUAL( sz, circular->Pop(v) );
620  BOOST_CHECK( v[0] == *c );
621  BOOST_CHECK( v[1] == *d );
622  BOOST_CHECK( v[2] == *c );
623  BOOST_CHECK( v[3] == *d );
624  BOOST_CHECK( v[4] == *c );
625  BOOST_CHECK( v[5] == *c );
626  BOOST_CHECK( v[6] == *d );
627  BOOST_CHECK( v[7] == *c );
628  BOOST_CHECK( v[8] == *d );
629  BOOST_CHECK( v[9] == *c );
630  BOOST_CHECK( 0 == circular->Pop(v) );
631  delete d;
632  delete c;
633 }
634 
636 {
637  Dummy* c = new Dummy(2.0, 1.0, 0.0);
638  Dummy d(5.0, 4.0, 3.0);
639 
640  BOOST_REQUIRE_EQUAL( NoData, dataobj->Get(d) );
641  BOOST_REQUIRE_EQUAL( d, Dummy(5.0, 4.0, 3.0) );
642 
643  dataobj->Set( *c );
644  BOOST_REQUIRE_EQUAL( *c, dataobj->Get() );
645  int i = 0;
646  while ( i != 3.5*dlockfree->MAX_THREADS ) {
647  dataobj->Set( *c );
648  dataobj->Set( d );
649  ++i;
650  }
651  BOOST_REQUIRE_EQUAL( d , dataobj->Get() );
652  BOOST_REQUIRE_EQUAL( d , dataobj->Get() );
653 
654  delete c;
655 }
656 
657 void BuffersDataFlowTest::testBufMultiThreaded(int number_of_writers, int number_of_readers)
658 {
659  ThreadPool<BufferWriter> writers(number_of_writers, ORO_SCHED_OTHER, 0, 0, "BufferWriter", buffer);
660  ThreadPool<BufferReader> readers(number_of_readers, ORO_SCHED_OTHER, 0, 0, "BufferReader", buffer);
661 
662  buffer->clear();
663 
664  BOOST_REQUIRE( readers.start() );
665  BOOST_REQUIRE( writers.start() );
666  sleep(5);
667  BOOST_REQUIRE( writers.stop() );
668  BOOST_REQUIRE( readers.stop() );
669 
670  int total_writes = 0, total_dropped = 0, total_reads = 0;
671  std::map<FlowStatus, int> total_reads_by_status;
672  BOOST_FOREACH(ThreadPool<BufferWriter>::value_type &writer, writers) {
673  BOOST_REQUIRE( !writer.second->isRunning() );
674  total_writes += writer.first->writes;
675  total_dropped += writer.first->dropped;
676  BOOST_CHECK_GT(writer.first->writes, 0);
677  }
678  BOOST_FOREACH(ThreadPool<BufferReader>::value_type &reader, readers) {
679  BOOST_REQUIRE( !reader.second->isRunning() );
680  total_reads += reader.first->reads;
681  BOOST_CHECK_GT(reader.first->reads, 0);
682  BOOST_FOREACH(ReadsByStatusMap::value_type &reads_by_status, reader.first->reads_by_status) {
683  total_reads_by_status[reads_by_status.first] += reads_by_status.second;
684  }
685  }
686 
687  if (buffer != circular) {
688  BOOST_CHECK_EQUAL(total_writes, (total_reads_by_status[NewData] + buffer->size()));
689  } else {
690  BOOST_CHECK_GE(total_writes, (total_reads_by_status[NewData] + buffer->size()));
691  }
692 
693  if (writers.size() == 1) {
694  if (buffer != circular) {
695  BOOST_WARN_EQUAL(0, total_dropped);
696  } else {
697  BOOST_CHECK_EQUAL(0, total_dropped);
698  }
699  } else {
700  // Ignore dropped samples in case of multiple writers:
701  // It's normal that some samples will be dropped.
702  }
703 }
704 
705 void BuffersDataFlowTest::testDObjMultiThreaded(int number_of_writers, int number_of_readers)
706 {
707  ThreadPool<DataObjectWriter> writers(number_of_writers, ORO_SCHED_OTHER, 0, 0, "DataObjectWriter", dataobj);
708  ThreadPool<DataObjectReader> readers(number_of_readers, ORO_SCHED_OTHER, 0, 0, "DataObjectReader", dataobj);
709 
710  BOOST_REQUIRE( readers.start() );
711  BOOST_REQUIRE( writers.start() );
712  sleep(5);
713  BOOST_REQUIRE( writers.stop() );
714  BOOST_REQUIRE( readers.stop() );
715 
716  int total_writes = 0, total_dropped = 0, total_reads = 0;
717  std::map<FlowStatus, int> total_reads_by_status;
718  BOOST_FOREACH(ThreadPool<DataObjectWriter>::value_type &writer, writers) {
719  BOOST_REQUIRE( !writer.second->isRunning() );
720  total_writes += writer.first->writes;
721  total_dropped += writer.first->dropped;
722  BOOST_CHECK_GT(writer.first->writes, 0);
723  }
724  BOOST_FOREACH(ThreadPool<DataObjectReader>::value_type &reader, readers) {
725  BOOST_REQUIRE( !reader.second->isRunning() );
726  total_reads += reader.first->reads;
727  BOOST_CHECK_GT(reader.first->reads, 0);
728  BOOST_FOREACH(ReadsByStatusMap::value_type &reads_by_status, reader.first->reads_by_status) {
729  total_reads_by_status[reads_by_status.first] += reads_by_status.second;
730  }
731  }
732 
733  // BOOST_CHECK_EQUAL(total_writes, total_reads_by_status[NewData]);
734  BOOST_CHECK_GE(total_writes, total_reads_by_status[NewData]);
735  if (writers.size() == 1) {
736  BOOST_CHECK_EQUAL(total_dropped, 0);
737  } else {
738  // Ignore dropped samples in case of multiple writers:
739  // It's normal that some samples will be dropped.
740  }
741 }
742 
// Fixture body for the TsPool tests: owns a pool of Dummy and a pool of
// std::vector<Dummy>, both created with QS elements.
// NOTE(review): presumably this is the BuffersMPoolTest fixture named by the pool
// test suite. The class header, the constructor/destructor signature lines and the
// header of the nested Worker class are absent from this listing - confirm against
// the real source.
744 {
745 public:
746
// Not assigned anywhere in the visible part of this fixture.
747  ThreadInterface* athread;
748  ThreadInterface* bthread;
749
750  TsPool<Dummy>* mpool;
751  TsPool<std::vector<Dummy> >* vpool;
752
// Signature line missing (presumably the constructor). The vector pool is given a
// QS-element vector as second argument.
754  {
755  mpool = new TsPool<Dummy>(QS);
756  vpool = new TsPool<std::vector<Dummy> >(QS, std::vector<Dummy>(QS) );
757  }
758
// Signature line missing (presumably the destructor).
760  delete mpool;
761  delete vpool;
762  }
763
// Runnable (class header line missing; the constructor shows it is named Worker)
// that repeatedly allocates one item from the pool, yields its thread and gives
// the item back, counting the iterations in `cycles`.
764  template <typename T>
766  {
767  private:
768  TsPool<T> *mpool;
769  bool stop;
770  public:
771  int cycles;
772
773  Worker(TsPool<T> *pool) : mpool(pool), stop(false), cycles(0) {}
774  bool initialize() {
775  stop = false;
776  return true;
777  }
778  void step() {
779  while( stop == false ) {
780  T *item;
// NOTE(review): the allocation is written inside BOOST_VERIFY; confirm that the
// macro still evaluates its argument in every build configuration in use.
781  BOOST_VERIFY( item = mpool->allocate() );
782  getThread()->yield();
783  if (item) BOOST_VERIFY(mpool->deallocate(item));
784  ++cycles;
785  }
786  }
787  bool breakLoop() {
788  stop = true;
789  return true;
790  }
791  void finalize() {}
792  };
793 };
794 
795 std::ostream& operator<<( std::ostream& os, const Dummy& d ) {
796  os << "(" << d.d1 <<","<<d.d2<<","<<d.d3<<")";
797  return os;
798 }
799 void addOne(Dummy& d)
800 {
801  ++d.d1;
802  ++d.d2;
803  ++d.d3;
804 }
805 
806 void subOne(Dummy& d)
807 {
808  --d.d1;
809  --d.d2;
810  --d.d3;
811 }
812 
813 
// Runnable that alternates between two phases until breakLoop() raises `stop`:
// append Dummy(i,i,i) with increasing i for as long as append() succeeds, then
// erase Dummy(i-1,i-1,i-1) with decreasing i for as long as erase() succeeds.
// NOTE(review): `T` is not declared in the visible lines - its declaration sits on
// a line missing from this listing. Confirm against the real source.
814 struct LLFWorker : public RunnableInterface
815 {
816  volatile bool stop;
818  T* mlst;
819  int i;
820  int appends;
821  int erases;
// appends and erases are not set here; initialize() zeroes them.
822  LLFWorker(T* l ) : stop(false), mlst(l), i(1) {}
823  bool initialize() {
824  stop = false; i = 1;
825  appends = 0; erases = 0;
826  return true;
827  }
828  void step() {
829  while (stop == false ) {
830  //log(Info) << "Appending, i="<<i<<endlog();
831  while ( stop == false && mlst->append( Dummy(i,i,i) ) ) { ++i; ++appends; }
832  //log(Info) << "Erasing, i="<<i<<endlog();
833  while ( mlst->erase( Dummy(i-1,i-1,i-1) ) ) { --i; ++erases; }
834  }
835  //log(Info) << "Stopping, i="<<i<<endlog();
836  }
837
838  void finalize() {}
839
840  bool breakLoop() {
841  stop = true;
842  return true;
843  }
844 };
845 
// Runnable that calls mlst->reserve(i) for i = 1 .. 2499, or until breakLoop()
// raises `stop`, whichever comes first.
// NOTE(review): `T` is not declared in the visible lines - its declaration sits on
// a line missing from this listing. Confirm against the real source.
846 struct LLFGrower : public RunnableInterface
847 {
848  volatile bool stop;
850  T* mlst;
851  int i;
852  LLFGrower(T* l ) : stop(false), mlst(l), i(1) {}
853  bool initialize() {
854  stop = false; i = 1;
855  return true;
856  }
857  void step() {
858  // stress growing of list during append/erase.
859  while (stop == false && i < 2500 ) {
860  // reserve is quite slow.
861  mlst->reserve(i);
862  ++i;
863  }
864  }
865
866  void finalize() {}
867
868  bool breakLoop() {
869  stop = true;
870  return true;
871  }
872 };
873 
// Runnable that, until breakLoop() raises `stop`, enqueues a pointer and then
// dequeues one, counting successes in `appends` and `erases`. Every dequeued
// pointer is dereferenced and compared with *orig.
// NOTE(review): the declaration of `orig` and the destructor signature line are
// absent from this listing. Confirm against the real source.
// NOTE(review): `stop` is a plain bool here, while AQGrower and AQEater declare it
// volatile.
877 template<class T>
878 struct AQWorker : public RunnableInterface
879 {
880  static os::Mutex m;
881  bool stop;
882  T* mlst;
883  int appends;
884  int erases;
886  AQWorker(T* l ) : stop(false), mlst(l),appends(0), erases(0) {
887  orig = new Dummy( 1,2,3);
888  }
// Signature line missing (presumably the destructor).
890  delete orig;
891  }
892  bool initialize() {
893  stop = false;
894  return true;
895  }
896  void step() {
897  Dummy* d = orig;
898  while (stop == false ) {
899  //log(Info) << "Appending, i="<<i<<endlog();
900  if ( mlst->enqueue( d ) ) { ++appends; }
901  //log(Info) << "Erasing, i="<<i<<endlog();
902  if ( mlst->dequeue( d ) ) {
// The assert sits inside the mismatch branch, so it fires exactly when a dequeued
// element differs from *orig; the shared mutex m is held while it does.
903  if( *d != *orig) {
904  os::MutexLock lock(m);
905  assert(*d == *orig); // exercise reading returned memory.
906  }
907  ++erases;
908  }
909  }
910  //log(Info) << "Stopping, i="<<i<<endlog();
911  }
912
913  void finalize() {}
914
915  bool breakLoop() {
916  stop = true;
917  return true;
918  }
919 };
920
// NOTE(review): the declaration that belongs to this template header is on a line
// missing from this listing; presumably it is the out-of-class definition of the
// static member AQWorker<T>::m - confirm against the real source.
921 template<class T>
923 
// Runnable that keeps enqueuing the same pointer until breakLoop() raises `stop`,
// counting the successful enqueues in `appends`. It never dequeues.
// NOTE(review): the declaration of `orig` and the destructor signature line are
// absent from this listing. Confirm against the real source.
928 template<class T>
929 struct AQGrower : public RunnableInterface
930 {
931  volatile bool stop;
932  T* mlst;
933  int appends;
935  AQGrower(T* l ) : stop(false), mlst(l), appends(0) {
936  orig = new Dummy( 1,2,3);
937  }
// Signature line missing (presumably the destructor).
939  delete orig;
940  }
941  bool initialize() {
942  stop = false;
943  return true;
944  }
945  void step() {
946  // stress full queue
947  Dummy* d = orig;
948  while (stop == false ) {
949  if ( mlst->enqueue(d) ) {
950  ++appends;
951  }
952  }
953  }
954
955  void finalize() {}
956
957  bool breakLoop() {
958  stop = true;
959  return true;
960  }
961 };
962 
967 template<class T>
968 struct AQEater : public RunnableInterface
969 {
970  volatile bool stop;
971  T* mlst;
972  int erases;
973  AQEater(T* l ) : stop(false), mlst(l), erases(0) {}
974  bool initialize() {
975  stop = false;
976  return true;
977  }
978  void step() {
979  // stress full queue
980  Dummy* d;
981  while (stop == false ) {
982  if ( mlst->dequeue(d) ) {
983  //if( *d != *orig)
984  // BOOST_CHECK_EQUAL(*d, *orig); // exercise reading returned memory.
985  ++erases;
986  }
987  }
988  }
989 
990  void finalize() {}
991 
992  bool breakLoop() {
993  stop = true;
994  return true;
995  }
996 };
997 
998 
999 BOOST_FIXTURE_TEST_SUITE( BuffersAtomicMWMRQueueTestSuite, BuffersAtomicMWMRQueueTest )
1000 
1001 BOOST_AUTO_TEST_CASE( testAtomicMWMRQueue )
1002 {
1006  Dummy* d = new Dummy();
1007  Dummy* c = d;
1008 
1009  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS), aqueue->capacity() );
1010  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(0), aqueue->size() );
1011  BOOST_CHECK( aqueue->isFull() == false );
1012  BOOST_CHECK( aqueue->isEmpty() == true );
1013  BOOST_CHECK( aqueue->dequeue(c) == false );
1014  BOOST_CHECK( c == d );
1015 
1016  for ( int i = 0; i < QS; ++i) {
1017  BOOST_CHECK( aqueue->enqueue( d ) == true);
1018  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(i+1), aqueue->size() );
1019  }
1020  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS), aqueue->capacity() );
1021  BOOST_CHECK( aqueue->isFull() == true );
1022  BOOST_CHECK( aqueue->isEmpty() == false );
1023  BOOST_CHECK( aqueue->enqueue( d ) == false );
1024  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS), aqueue->size() );
1025 
1026  aqueue->dequeue( d );
1027  BOOST_CHECK( aqueue->isFull() == false );
1028  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS-1), aqueue->size() );
1029 
1030  for ( int i = 0; i < QS - 1 ; ++i) {
1031  BOOST_CHECK( aqueue->dequeue( d ) == true);
1032  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS - 2 - i), aqueue->size() );
1033  }
1034  BOOST_CHECK( aqueue->isFull() == false );
1035  BOOST_CHECK( aqueue->isEmpty() == true );
1036 
1037  delete d;
1038 }
1040 
1041 BOOST_FIXTURE_TEST_SUITE( BuffersAtomicMWSRQueueTestSuite, BuffersAtomicMWSRQueueTest )
1042 
1043 BOOST_AUTO_TEST_CASE( testAtomicMWSRQueue )
1044 {
1048  Dummy* d = new Dummy();
1049  Dummy* c = d;
1050 
1051  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS), aqueue->capacity() );
1052  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(0), aqueue->size() );
1053  BOOST_CHECK( aqueue->isFull() == false );
1054  BOOST_CHECK( aqueue->isEmpty() == true );
1055  BOOST_CHECK( aqueue->dequeue(c) == false );
1056  BOOST_CHECK( c == d );
1057 
1058  for ( int i = 0; i < QS; ++i) {
1059  BOOST_CHECK( aqueue->enqueue( d ) == true);
1060  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(i+1), aqueue->size() );
1061  BOOST_CHECK( d );
1062  }
1063  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS), aqueue->capacity() );
1064  BOOST_CHECK( aqueue->isFull() == true );
1065  BOOST_CHECK( aqueue->isEmpty() == false );
1066  BOOST_CHECK( aqueue->enqueue( d ) == false );
1067  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS), aqueue->size() );
1068  d = 0;
1069  aqueue->dequeue( d );
1070  BOOST_CHECK( d ); // not null
1071  BOOST_CHECK( aqueue->isFull() == false );
1072  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS-1), aqueue->size() );
1073 
1074  for ( int i = 0; i < QS - 1 ; ++i) {
1075  BOOST_CHECK( aqueue->dequeue( d ) == true);
1076  BOOST_REQUIRE_EQUAL( AtomicQueue<Dummy*>::size_type(QS - 2 - i), aqueue->size() );
1077  BOOST_CHECK( d );
1078  }
1079  BOOST_CHECK( aqueue->isFull() == false );
1080  BOOST_CHECK( aqueue->isEmpty() == true );
1081  BOOST_CHECK( aqueue->dequeue(d) == false );
1082  BOOST_CHECK( d ); // not null
1083 
1084  delete d;
1085 }
1086 
1088 
1089 BOOST_FIXTURE_TEST_SUITE( BuffersDataFlowTestSuite, BuffersDataFlowTest )
1090 
1091 BOOST_AUTO_TEST_CASE( testBufLockFree )
1092 {
1093  buffer = lockfree;
1094  circular = clockfree;
1095  testBuf();
1096  testCirc();
1097 }
1098 
1099 BOOST_AUTO_TEST_CASE( testBufLocked )
1100 {
1101  buffer = locked;
1102  circular = clocked;
1103  testBuf();
1104  testCirc();
1105 }
1106 
1107 BOOST_AUTO_TEST_CASE( testBufUnsync )
1108 {
1109  buffer = unsync;
1110  circular = cunsync;
1111  testBuf();
1112  testCirc();
1113 }
1114 
1115 BOOST_AUTO_TEST_CASE( testDObjLockFree )
1116 {
1117  dataobj = dlockfree;
1118  testDObj();
1119 }
1120 
1121 BOOST_AUTO_TEST_CASE( testDObjLocked )
1122 {
1123  dataobj = dlocked;
1124  testDObj();
1125 }
1126 
1127 BOOST_AUTO_TEST_CASE( testDObjUnSync )
1128 {
1129  dataobj = dunsync;
1130  testDObj();
1131 }
1132 
// Runs the multi-threaded buffer helper with 4 writers and 1 reader, first on a
// non-circular buffer and then on a circular one, each configured for multiple
// writers and max_threads(5).
// NOTE(review): in both allocation statements the line carrying the `= new ...(`
// part is missing from this listing, so the concrete buffer type is not visible
// here - confirm against the real source.
// NOTE(review): the buffer is freed with a plain delete after the helper returns;
// it is not freed if a BOOST_REQUIRE inside the helper aborts the test case.
1133 BOOST_AUTO_TEST_CASE( testBufLockFree4Writers1Reader )
1134 {
1135  buffer
1137  .circular(false)
1138  .multiple_writers(true)
1139  .max_threads(5));
1140  testBufMultiThreaded(4, 1);
1141  delete buffer;
1142
// `circular` is pointed at the same object, which is what makes the helper's
// `buffer != circular` test false for this run.
1143  circular = buffer
1145  .circular(true)
1146  .multiple_writers(true)
1147  .max_threads(5));
1148  testBufMultiThreaded(4, 1);
1149  delete buffer;
1150 }
1151 
// Runs the multi-threaded buffer helper with 4 writers and 4 readers, first on a
// non-circular buffer and then on a circular one, each configured for multiple
// writers, multiple readers and max_threads(8).
// NOTE(review): in both allocation statements the line carrying the `= new ...(`
// part is missing from this listing, so the concrete buffer type is not visible
// here - confirm against the real source.
// NOTE(review): the buffer is freed with a plain delete after the helper returns;
// it is not freed if a BOOST_REQUIRE inside the helper aborts the test case.
1152 BOOST_AUTO_TEST_CASE( testBufLockFree4Writers4Readers )
1153 {
1154  buffer
1156  .circular(false)
1157  .multiple_writers(true)
1158  .multiple_readers(true)
1159  .max_threads(8));
1160  testBufMultiThreaded(4, 4);
1161  delete buffer;
1162
// `circular` is pointed at the same object, which is what makes the helper's
// `buffer != circular` test false for this run.
1163  circular = buffer
1165  .circular(true)
1166  .multiple_writers(true)
1167  .multiple_readers(true)
1168  .max_threads(8));
1169  testBufMultiThreaded(4, 4);
1170  delete buffer;
1171 }
1172 
// Runs the multi-threaded buffer helper with 4 writers and 4 readers, first on a
// non-circular buffer and then on a circular one, each configured for multiple
// writers, multiple readers and max_threads(8).
// NOTE(review): in both allocation statements the line carrying the `= new ...(`
// part is missing from this listing, so the concrete buffer type is not visible
// here (the test name suggests the locked buffer) - confirm against the real source.
// NOTE(review): the buffer is freed with a plain delete after the helper returns;
// it is not freed if a BOOST_REQUIRE inside the helper aborts the test case.
1173 BOOST_AUTO_TEST_CASE( testBufLocked4Writers4Readers )
1174 {
1175  buffer
1177  .circular(false)
1178  .multiple_writers(true)
1179  .multiple_readers(true)
1180  .max_threads(8));
1181  testBufMultiThreaded(4, 4);
1182  delete buffer;
1183
// `circular` is pointed at the same object, which is what makes the helper's
// `buffer != circular` test false for this run.
1184  circular = buffer
1186  .circular(true)
1187  .multiple_writers(true)
1188  .multiple_readers(true)
1189  .max_threads(8));
1190  testBufMultiThreaded(4, 4);
1191  delete buffer;
1192 }
1193 
1194 BOOST_AUTO_TEST_CASE( testDObjLockFree4Writers1Reader )
1195 {
1196  dataobj = new DataObjectLockFree<Dummy>(Dummy(), /* max_threads = */ 5);
1197  testDObjMultiThreaded(4, 1);
1198  delete dataobj;
1199 }
1200 
1201 BOOST_AUTO_TEST_CASE( testDObjLockFreeSingleWriter4Readers )
1202 {
1203  dataobj = new DataObjectLockFree<Dummy>(Dummy(), /* max_threads = */ 5);
1204  testDObjMultiThreaded(1, 4);
1205  delete dataobj;
1206 }
1207 
// Allocates a DataObjectLockFree<Dummy> with a default-constructed Dummy and
// max_threads = 8, runs testDObjMultiThreaded with 4 writers and 4 readers,
// then deletes the object.
1208 BOOST_AUTO_TEST_CASE( testDObjLockFree4Writers4Readers )
1209 {
1210  dataobj = new DataObjectLockFree<Dummy>(Dummy(), /* max_threads = */ 8);
1211  testDObjMultiThreaded(4, 4);
// NOTE(review): `dataobj` still holds the deleted address after this line —
// confirm the fixture teardown does not touch it.
1212  delete dataobj;
1213 }
1214 
// Runs testDObjMultiThreaded with 1 writer and 4 readers against the
// fixture's `dlocked` object. Nothing is allocated or deleted here.
1215 BOOST_AUTO_TEST_CASE( testDObjLockedSingleWriter4Readers )
1216 {
1217  dataobj = dlocked;
1218  testDObjMultiThreaded(1, 4);
1219 }
1220 
1222 BOOST_FIXTURE_TEST_SUITE( BuffersMPoolTestSuite, BuffersMPoolTest )
1223 
// Single-threaded checks of the two fixture pools, `mpool` (Dummy elements)
// and `vpool` (std::vector<Dummy> elements): initial capacity/size, repeated
// allocate/deallocate of one element, exhausting the pool, and refilling it.
// NOTE(review): `sz` is declared on a line absent from this listing (listing
// line 1227). The final checks compare pool sizes against QS, so presumably
// sz == QS — confirm against the original file.
1224 BOOST_AUTO_TEST_CASE( testMemoryPool )
1225 {
1226  // Test initial conditions.
1228  // Capacity test
1229  BOOST_REQUIRE_EQUAL( sz, mpool->capacity() );
1230  BOOST_REQUIRE_EQUAL( sz, vpool->capacity() );
1231  BOOST_CHECK_EQUAL( sz, mpool->size());
1232  BOOST_CHECK_EQUAL( sz, vpool->size());
1233 
1234  // test default initialiser:
// Allocate and immediately release one vector, 3*sz times. Each vector
// handed out must have size and capacity equal to QS, and the pool size
// must drop by one on allocate and recover on deallocate.
1235  for (TsPool<Dummy>::size_type i = 0; i <3*sz; ++i ) {
1236  // MemoryPool:
1237  std::vector<Dummy>* v = vpool->allocate();
1238  BOOST_CHECK_EQUAL( sz - 1, vpool->size());
1239  std::vector<Dummy>::size_type szv = QS;
1240  BOOST_REQUIRE_EQUAL( szv, v->size() );
1241  BOOST_REQUIRE_EQUAL( szv, v->capacity() );
1242  BOOST_CHECK(vpool->deallocate( v ));
1243  BOOST_CHECK_EQUAL( sz, vpool->size());
1244  }
1245  BOOST_CHECK_EQUAL( vpool->size(), QS);
1246 
1247  // test Allocation.
1248  std::vector<Dummy*> mpv;
1249  // MemoryPool:
// Drain mpool completely: size() must fall by one per allocation while
// capacity() stays at sz, and every returned pointer must be non-null.
1250  for (TsPool<Dummy>::size_type i = 0; i <sz; ++i ) {
1251  mpv.push_back( mpool->allocate() );
1252  BOOST_CHECK_EQUAL( sz - i - 1, mpool->size());
1253  BOOST_CHECK( mpv.back() );
1254  BOOST_REQUIRE_EQUAL( sz, mpool->capacity() );
1255  }
// With the pool exhausted, size() is 0 and allocate() must return null.
1256  BOOST_CHECK_EQUAL( mpool->size(), 0);
1257  BOOST_CHECK_EQUAL( mpool->allocate(), (Dummy*)0 );
// Hand the elements back in the order they were allocated (front first);
// size() must rise by one per deallocation.
1258  for (TsPool<Dummy>::size_type i = 0; i <sz; ++i ) {
1259  BOOST_CHECK_EQUAL( i , mpool->size());
1260  BOOST_CHECK(mpool->deallocate( mpv.front() ));
1261  BOOST_CHECK_EQUAL( i + 1, mpool->size());
1262  mpv.erase( mpv.begin() );
1263  BOOST_REQUIRE_EQUAL( sz, mpool->capacity() );
1264  }
1265  BOOST_CHECK_EQUAL( mpv.size(), 0 );
1266  BOOST_CHECK_EQUAL( mpool->size(), QS);
1267 }
1268 
// Stress-tests both fixture pools from QS concurrent worker threads. For each
// pool the test checks size() == QS before starting the workers and again
// after stopping them, and that every worker completed at least one cycle.
1269 BOOST_AUTO_TEST_CASE( testMemoryPoolMultiThreaded )
1270 {
1271  Logger::In in("testMemoryPoolMultiThreaded");
1272  int number_of_workers = QS;
1273 
// Phase 1: workers operate on mpool (Dummy elements).
1274  {
1275  BOOST_CHECK_EQUAL( mpool->size(), QS);
1276  ThreadPool<BuffersMPoolTest::Worker<Dummy> > workers(number_of_workers, ORO_SCHED_OTHER, 0, 0, "BuffersMPoolWorker", mpool);
1277  BOOST_REQUIRE( workers.start() );
// Let the workers run for a while before stopping them.
1278  sleep(5);
1279  BOOST_REQUIRE( workers.stop() );
1280  BOOST_CHECK_EQUAL( mpool->size(), QS);
1281 
// total_cycles is accumulated below but never read in this test.
1282  int total_cycles = 0;
1283  BOOST_FOREACH(ThreadPool<BuffersMPoolTest::Worker<Dummy> >::value_type &worker, workers) {
1284  BOOST_CHECK_GT(worker.first->cycles, 0);
1285  log(Info) << worker.second->getName() << ": " << worker.first->cycles << " cycles" << endlog();
1286  total_cycles += worker.first->cycles;
1287  }
1288  }
// Phase 2: same procedure on vpool (std::vector<Dummy> elements).
1289  {
1290  BOOST_CHECK_EQUAL( vpool->size(), QS);
1291  ThreadPool<BuffersMPoolTest::Worker<std::vector<Dummy> > > workers(number_of_workers, ORO_SCHED_OTHER, 0, 0, "BuffersVPoolWorker", vpool);
1292  BOOST_REQUIRE( workers.start() );
1293  sleep(5);
1294  BOOST_REQUIRE( workers.stop() );
1295  BOOST_CHECK_EQUAL( vpool->size(), QS);
1296 
// total_cycles is accumulated below but never read in this test.
1297  int total_cycles = 0;
1298  BOOST_FOREACH(ThreadPool<BuffersMPoolTest::Worker<std::vector<Dummy> > >::value_type &worker, workers) {
1299  BOOST_CHECK_GT(worker.first->cycles, 0);
1300  log(Info) << worker.second->getName() << ": " << worker.first->cycles << " cycles" << endlog();
1301  total_cycles += worker.first->cycles;
1302  }
1303  }
1304 }
1305 
// Disabled test (compiled out by `#if 0`). It exercises `mslist` through
// reserve/empty/hasKey/insert/erase/applyOnData: duplicate inserts and
// repeated erases must return false, and applyOnData(&addOne) followed by
// applyOnData(&subOne) must restore the original keys.
1306 #if 0
1307 BOOST_AUTO_TEST_CASE( testSortedList )
1308 {
1309  // 7 elements.
1310  mslist->reserve(7);
1311  BOOST_CHECK( mslist->empty() );
1312 
1313  // empty list has no keys.
1314  BOOST_CHECK( mslist->hasKey(Dummy()) == false );
1315 
1316  // empty list fails to erase key.
1317  BOOST_CHECK( mslist->erase(Dummy()) == false );
1318 
1319  // insert element once
1320  BOOST_CHECK( mslist->insert(Dummy(1,2,1)) == true );
1321  BOOST_CHECK( mslist->hasKey(Dummy(1,2,1)) == true );
1322 
// A second insert of the same key must be rejected.
1323  BOOST_CHECK( mslist->insert(Dummy(1,2,1)) == false );
1324  BOOST_CHECK( mslist->hasKey(Dummy(1,2,1)) == true );
1325 
1326  // erase element once
1327  BOOST_CHECK( mslist->erase(Dummy(1,2,1)) == true );
1328  BOOST_CHECK( mslist->hasKey(Dummy(1,2,1)) == false );
1329  BOOST_CHECK( mslist->erase(Dummy(1,2,1)) == false );
1330  BOOST_CHECK( mslist->hasKey(Dummy(1,2,1)) == false );
1331 
// Fill the list with seven distinct keys (matches the reserve(7) above).
1332  BOOST_CHECK( mslist->insert(Dummy(1,2,1)) == true );
1333  BOOST_CHECK( mslist->insert(Dummy(1,2,2)) == true );
1334  BOOST_CHECK( mslist->insert(Dummy(1,2,3)) == true );
1335  BOOST_CHECK( mslist->insert(Dummy(1,2,4)) == true );
1336  BOOST_CHECK( mslist->insert(Dummy(1,2,5)) == true );
1337  BOOST_CHECK( mslist->insert(Dummy(1,2,6)) == true );
1338  BOOST_CHECK( mslist->insert(Dummy(1,2,7)) == true );
1339 
1340  BOOST_CHECK( mslist->hasKey(Dummy(1,2,4)) == true );
1341  BOOST_CHECK( mslist->hasKey(Dummy(1,2,7)) == true );
1342 
// Remove the last-inserted key and one from the middle.
1343  BOOST_CHECK( mslist->erase(Dummy(1,2,7)) == true );
1344  BOOST_CHECK( mslist->hasKey(Dummy(1,2,7)) == false );
1345 
1346  BOOST_CHECK( mslist->erase(Dummy(1,2,4)) == true );
1347  BOOST_CHECK( mslist->hasKey(Dummy(1,2,4)) == false );
1348 
// After addOne is applied, the five remaining keys are expected with each
// field one higher.
1349  mslist->applyOnData( &addOne );
1350  BOOST_CHECK( mslist->hasKey(Dummy(2,3,2)) == true );
1351  BOOST_CHECK( mslist->hasKey(Dummy(2,3,3)) == true );
1352  BOOST_CHECK( mslist->hasKey(Dummy(2,3,4)) == true );
1353  BOOST_CHECK( mslist->hasKey(Dummy(2,3,6)) == true );
1354  BOOST_CHECK( mslist->hasKey(Dummy(2,3,7)) == true );
1355 
// After subOne is applied, the original keys are expected again.
1356  mslist->applyOnData( &subOne );
1357  BOOST_CHECK( mslist->hasKey(Dummy(1,2,1)) == true );
1358  BOOST_CHECK( mslist->hasKey(Dummy(1,2,2)) == true );
1359  BOOST_CHECK( mslist->hasKey(Dummy(1,2,3)) == true );
1360  BOOST_CHECK( mslist->hasKey(Dummy(1,2,5)) == true );
1361  BOOST_CHECK( mslist->hasKey(Dummy(1,2,6)) == true );
1362 
// Erase the remaining keys in a non-sorted order; the list must end up empty.
1363  BOOST_CHECK( mslist->erase(Dummy(1,2,1)) == true );
1364  BOOST_CHECK( mslist->erase(Dummy(1,2,6)) == true );
1365  BOOST_CHECK( mslist->erase(Dummy(1,2,5)) == true );
1366  BOOST_CHECK( mslist->erase(Dummy(1,2,2)) == true );
1367  BOOST_CHECK( mslist->erase(Dummy(1,2,3)) == true );
1368 
1369  BOOST_CHECK( mslist->empty() );
1370 }
1371 #endif
1373 
1374 #ifdef OROPKG_OS_GNULINUX
1375 
// Multi-threaded stress test of ListLockFree<Dummy>: three LLFWorker threads
// run throughout, and a single LLFGrower thread runs during the middle part
// of the test. After everything is stopped, each worker's `appends` counter
// must equal its `erases` counter.
1376 BOOST_AUTO_TEST_CASE( testListLockFree )
1377 {
1378  // maximum of 4 threads: 3 workers and one grower
1379  ListLockFree<Dummy> *listlockfree = new ListLockFree<Dummy>(10, 4);
1380 
// A fresh list must report the requested capacity (10) and no elements.
1381  BOOST_REQUIRE_EQUAL( 10, listlockfree->capacity() );
1382  BOOST_REQUIRE_EQUAL( 0, listlockfree->size() );
1383 
1384  ThreadPool< LLFWorker > pool(3, ORO_SCHED_OTHER, 0, 0.0, "LLFWorker", listlockfree);
1385  LLFGrower* grower = new LLFGrower( listlockfree );
1386 
// The grower's Activity lives only inside this scope, so it is destroyed
// before the counters are inspected below.
1387  {
1388  boost::scoped_ptr<Activity> gthread( new Activity(ORO_SCHED_OTHER, 0, 0, grower, "LLFGrower" ));
1389 
1390  BOOST_REQUIRE(pool.start());
1391 
// Workers alone, then workers plus grower, then workers alone again.
1392  sleep(5);
1393  BOOST_REQUIRE(gthread->start());
1394  sleep(10);
1395  BOOST_REQUIRE(gthread->stop());
1396  sleep(5);
1397 
1398  BOOST_REQUIRE(pool.stop());
1399  }
1400 
// Disabled diagnostics: per-worker counters and the final list state.
1401 #if 0
1402  for(ThreadPool< LLFWorker >::const_iterator it = pool.begin(); it != pool.end(); ++it) {
1403  log(Info) << it->second->getName() << " appends: " << it->first->appends<<endlog();
1404  log(Info) << it->second->getName() << " erases: " << it->first->erases<<endlog();
1405  }
1406  log(Info) << "List capacity: "<< listlockfree->capacity()<<endlog();
1407  log(Info) << "List size: "<< listlockfree->size()<<endlog();
1408 // while( listlockfree->empty() == false ) {
1409 // Dummy d = listlockfree->back();
1410 // //log(Info) << "Left: "<< d <<endlog();
1411 // BOOST_CHECK( listlockfree->erase( d ) );
1412 // }
1413 #endif
1414 
// Each worker must have erased exactly as many elements as it appended.
1415  for(ThreadPool< LLFWorker >::const_iterator it = pool.begin(); it != pool.end(); ++it) {
1416  BOOST_CHECK_EQUAL( it->first->appends, it->first->erases );
1417  }
1418 
// Clear the pool before deleting the list it was constructed with.
// NOTE(review): if a BOOST_REQUIRE above fails, these deletes are skipped
// and `grower` / `listlockfree` leak.
1419  pool.clear();
1420  delete grower;
1421  delete listlockfree;
1422 }
1423 
// Multi-threaded stress test of MWMRQueueType (AtomicMWMRQueue<Dummy*>) with
// a queue of QS slots. It uses five AQWorker threads plus one grower and one
// eater activity. Afterwards the queue is drained and the test asserts that
// total appends == total erases + elements left in the queue.
// NOTE(review): the declarations of `grower` and `eater` are absent from this
// listing (listing lines 1430-1431) — consult the original file.
1424 BOOST_AUTO_TEST_CASE( testAtomicMWMRQueue )
1425 {
1426  Logger::In in("testAtomicMWMRQueue");
1427 
1428  MWMRQueueType* qt = new MWMRQueueType(QS);
1429  ThreadPool< AQWorker<MWMRQueueType> > pool(5, ORO_SCHED_OTHER, 20, 0.0, "AQWorker", qt);
1432  boost::scoped_ptr<Activity> gthread( new Activity(20, grower, "AQGrower"));
1433  boost::scoped_ptr<Activity> ethread( new Activity(20, eater, "AQEater"));
1434 
1435  // avoid system lock-ups
1436  gthread->thread()->setScheduler(ORO_SCHED_OTHER);
1437  ethread->thread()->setScheduler(ORO_SCHED_OTHER);
1438 
// Four phases: workers only; workers + grower; workers + eater; then
// grower + eater with the worker pool stopped.
1439  {
1440  log(Info) <<"Stressing multi-read/multi-write..." <<endlog();
1441  BOOST_REQUIRE(pool.start());
1442  sleep(5);
1443  log(Info) <<"Stressing multi-read/multi-write...on full buffer" <<endlog();
1444  BOOST_REQUIRE(gthread->start()); // stress full bufs
1445  sleep(5);
1446  BOOST_REQUIRE(gthread->stop());
1447  log(Info) <<"Stressing multi-read/multi-write...on empty buffer" <<endlog();
1448  BOOST_REQUIRE(ethread->start()); // stress empty bufs
1449  sleep(5);
1450  BOOST_REQUIRE(pool.stop());
// NOTE(review): unlike every other start()/stop() here, this result is not
// checked with BOOST_REQUIRE.
1451  gthread->start(); // stress single-reader single-writer
1452  log(Info) <<"Stressing read&write..." <<endlog();
1453  sleep(5);
1454  BOOST_REQUIRE(gthread->stop());
1455  BOOST_REQUIRE(ethread->stop());
1456  }
1457 
// Sum the per-thread counters of the workers, the grower and the eater.
1458  int appends = 0;
1459  int erases = 0;
1460  for(ThreadPool< AQWorker<MWMRQueueType> >::const_iterator it = pool.begin(); it != pool.end(); ++it) {
1461  appends += it->first->appends;
1462  erases += it->first->erases;
1463  }
1464  appends += grower->appends;
1465  erases += eater->erases;
1466 
1467  log(Info) << nlog()
1468  << "Total appends: " << appends << endlog();
1469  log(Info) << "Total erases : " << erases << endlog();
// Log-only early warning; the asserting check comes after the drain below.
1470  if (appends != erases + int(qt->size())) {
1471  log(Info) << "Mismatch detected !" <<endlog();
1472  }
1473  int i = 0; // left-over count
1474  Dummy* d = 0;
1475  BOOST_CHECK( qt->size() <= QS );
// Drain what is left in the queue, counting elements in `i`. The guard on
// QS stops the loop if size() never reaches zero.
1476  while( qt->size() != 0 ) {
1477  BOOST_CHECK( qt->dequeue(d) == true);
1478  BOOST_CHECK( d );
1479  i++;
1480  if ( i > QS ) {
1481  BOOST_CHECK( i <= QS); // avoid infinite loop.
1482  break;
1483  }
1484  }
1485  log(Info) << "Left in Queue: "<< i <<endlog();
// An empty queue must keep refusing dequeue() and report itself empty.
1486  BOOST_CHECK( qt->dequeue(d) == false );
1487  BOOST_CHECK( qt->dequeue(d) == false );
1488  BOOST_CHECK( qt->isEmpty() );
1489  BOOST_CHECK_EQUAL( qt->size(), 0 );
1490 
1491  // assert: sum queues == sum dequeues
1492  BOOST_CHECK_EQUAL( appends,
1493  erases + i );
1494 
// Clear the pool before deleting the queue it was constructed with.
// NOTE(review): if a BOOST_REQUIRE above fails, these deletes are skipped
// and `grower`, `eater` and `qt` leak.
1495  pool.clear();
1496  delete grower;
1497  delete eater;
1498  delete qt;
1499 }
1500 
// Multi-threaded stress test of MWSRQueueType (AtomicMWSRQueue<Dummy*>) with
// a queue of QS slots. It uses a pool of five AQGrower threads plus one more
// grower and one eater activity. Afterwards the queue is drained and the
// test asserts that total appends == total erases + elements left in the
// queue.
// NOTE(review): the declarations of `grower` and `eater` are absent from this
// listing (listing lines 1507-1508) — consult the original file.
1501 BOOST_AUTO_TEST_CASE( testAtomicMWSRQueue )
1502 {
1503  Logger::In in("testAtomicMWSRQueue");
1504 
1505  MWSRQueueType* qt = new MWSRQueueType(QS);
1506  ThreadPool< AQGrower<MWSRQueueType> > pool(5, ORO_SCHED_OTHER, 20, 0.0, "AQGrower", qt);
1509  boost::scoped_ptr<Activity> gthread( new Activity(20, grower, "AQGrower"));
1510  boost::scoped_ptr<Activity> ethread( new Activity(20, eater, "AQEater"));
1511 
1512  // avoid system lock-ups
1513  gthread->thread()->setScheduler(ORO_SCHED_OTHER);
1514  ethread->thread()->setScheduler(ORO_SCHED_OTHER);
1515 
// Two phases: grower pool + grower + eater together, then only the single
// grower and the eater after the pool is stopped.
1516  {
1517  log(Info) <<"Stressing multi-write/single-read..." <<endlog();
1518  BOOST_REQUIRE(pool.start());
1519  BOOST_REQUIRE(gthread->start());
1520  BOOST_REQUIRE(ethread->start());
1521  sleep(5);
1522  BOOST_REQUIRE(pool.stop());
1523  log(Info) <<"Stressing single-write/single-read..." <<endlog();
1524  sleep(5);
1525  BOOST_REQUIRE(gthread->stop());
1526  BOOST_REQUIRE(ethread->stop());
1527  }
1528 
// Sum the append counters of the pool and the extra grower; only the eater
// contributes erases in this test.
1529  int appends = 0;
1530  int erases = 0;
1531  for(ThreadPool< AQGrower<MWSRQueueType> >::const_iterator it = pool.begin(); it != pool.end(); ++it) {
1532  appends += it->first->appends;
1533  }
1534  appends += grower->appends;
1535  erases += eater->erases;
1536 
1537  log(Info) << nlog()
1538  << "Total appends: " << appends << endlog();
1539  log(Info) << "Total erases : " << erases << endlog();
// Log-only early warning; the asserting check comes after the drain below.
1540  if (appends != int(qt->size()) + erases) {
1541  log(Info) << "Mismatch detected !" <<endlog();
1542  }
1543  int i = 0; // left-over count
1544  Dummy* d = 0;
1545  BOOST_CHECK( qt->size() <= QS );
// Drain what is left in the queue, counting elements in `i`. The guard on
// QS stops the loop if size() never reaches zero.
1546  while( qt->size() != 0 ) {
1547  BOOST_CHECK( qt->dequeue(d) == true);
1548  BOOST_CHECK( d );
1549  i++;
1550  if ( i > QS ) {
1551  BOOST_CHECK( i <= QS); // avoid infinite loop.
1552  break;
1553  }
1554  }
1555  log(Info) << "Left in Queue: "<< i <<endlog();
// An empty queue must keep refusing dequeue() and report itself empty.
1556  BOOST_CHECK( qt->dequeue(d) == false );
1557  BOOST_CHECK( qt->dequeue(d) == false );
1558  BOOST_CHECK( qt->isEmpty() );
1559  BOOST_CHECK_EQUAL( qt->size(), 0 );
1560 
1561  // assert: sum queues == sum dequeues
1562  BOOST_CHECK_EQUAL( appends,
1563  erases + i );
1564 
// Clear the pool before deleting the queue it was constructed with.
// NOTE(review): if a BOOST_REQUIRE above fails, these deletes are skipped
// and `grower`, `eater` and `qt` leak.
1565  pool.clear();
1566  delete grower;
1567  delete eater;
1568  delete qt;
1569 }
1570 
1571 #endif
bool initialize()
void testDObjMultiThreaded(int number_of_writers, int number_of_readers)
static Logger::LogFunction nlog()
Definition: Logger.hpp:375
Dummy * orig
#define BOOST_FIXTURE_TEST_SUITE(suite_name, F)
BufferWriter(BufferInterface< Dummy > *buffer)
BufferInterface< Dummy > * buffer
unsigned int capacity()
Definition: TsPool.hpp:222
volatile bool stop
double Seconds
Definition: os/Time.hpp:53
AQWorker(T *l)
bool initialize()
void finalize()
BOOST_AUTO_TEST_CASE(testAtomicMWMRQueue)
BufferInterface< Dummy > * circular
BufferReader(BufferInterface< Dummy > *buffer)
DataObjectLockFree< Dummy > * dlockfree
BufferLocked< Dummy > * clocked
unsigned int size_type
Definition: TsPool.hpp:100
AtomicMWSRQueue< Dummy * > MWSRQueueType
bool breakLoop()
AQGrower(T *l)
ThreadInterface * bthread
void testBufMultiThreaded(int number_of_writers, int number_of_readers)
#define BOOST_AUTO_TEST_SUITE_END()
void finalize()
FlowStatus
Definition: FlowStatus.hpp:56
std::vector< std::pair< boost::shared_ptr< Worker >, boost::shared_ptr< ThreadInterface > > > Threads
Definition: mystd.hpp:163
A class for running a certain piece of code in a thread.
void addOne(Dummy &d)
ThreadInterface * athread
void subOne(Dummy &d)
volatile bool stop
BufferInterface< Dummy > * buffer
A class which provides unprotected (not thread-safe) access to one typed element of data...
Worker(TsPool< T > *pool)
DataObjectInterface< Dummy > * dataobj
BufferLockFree< Dummy > * clockfree
TsPool< Dummy > * mpool
ThreadPool(int threads, int scheduler, int priority, Seconds period, const std::string &name, const Arg1 &arg1)
DataObjectReader(DataObjectInterface< Dummy > *dataobj)
Dummy * orig
std::ostream & operator<<(std::ostream &os, const std::vector< double > &vect)
bool breakLoop()
void step()
BufferUnSync< Dummy > * cunsync
DataObjectInterface< Dummy > * dataobj
volatile bool stop
Dummy(double a=0.0, double b=1.0, double c=2.0)
#define QS
Threads::value_type value_type
bool breakLoop()
DataObjectLocked< Dummy > * dlocked
This DataObject is a Lock-Free implementation, such that reads and writes can happen concurrently wit...
bool breakLoop()
void step()
ThreadInterface * athread
ThreadInterface * bthread
bool breakLoop()
DataObjectUnSync< Dummy > * dunsync
double d2
BufferInterface< Dummy > * buffer
Threads::iterator iterator
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
void finalize()
A DataObjectInterface implements multi-threaded read/write solutions.
unsigned int sleep(unsigned int s)
Definition: fosi.cpp:51
bool operator<(const Dummy &d) const
bool initialize()
A class which provides locked/protected access to one typed element of data.
TsPool< std::vector< Dummy > > * vpool
DataObjectWriter(DataObjectInterface< Dummy > *dataobj)
BufferUnSync< Dummy > * unsync
bool initialize()
LLFGrower(T *l)
AQEater(T *l)
An object oriented wrapper around a non recursive mutex.
Definition: Mutex.hpp:92
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
unsigned int size()
Definition: TsPool.hpp:205
BufferLockFree< Dummy > * lockfree
void step()
ListLockFree< Dummy > T
ListLockFree< Dummy > T
static os::Mutex m
bool operator!=(const Dummy &d) const
LLFWorker(T *l)
double d3
static Logger & log()
Definition: Logger.hpp:350
bool operator==(const Dummy &d) const
DataObjectInterface< Dummy > * dataobj
AtomicMWMRQueue< Dummy * > MWMRQueueType
bool initialize()
#define ORO_SCHED_OTHER
Definition: ecos/fosi.h:62
std::map< FlowStatus, int > ReadsByStatusMap
double d1
AtomicInt threads(0)
Definition: threads.hpp:54
Threads::const_iterator const_iterator
static Logger::LogFunction endlog()
Definition: Logger.hpp:362
BufferLocked< Dummy > * locked
void finalize()
volatile bool stop
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
void finalize()


rtt
Author(s): RTT Developers
autogenerated on Tue Jun 25 2019 19:33:20