dataflow_performance_test.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * *
3  * This program is free software; you can redistribute it and/or modify *
4  * it under the terms of the GNU General Public License as published by *
5  * the Free Software Foundation; either version 2 of the License, or *
6  * (at your option) any later version. *
7  * *
8  ***************************************************************************/
9 
10 #include <rtt-config.h>
11 #include <iostream>
12 #include <iomanip>
13 #include <cstdlib>
14 #include <ctime>
15 #include <unistd.h>
16 #include <bitset>
17 
18 #include <boost/lexical_cast.hpp>
19 
20 #include <vector>
21 #include <boost/array.hpp>
22 
23 #include <boost/test/unit_test.hpp>
24 
25 #define SAMPLE_SIZE 10000
26 #define SAMPLE_TYPE boost::array<double,SAMPLE_SIZE>
27 // #define SAMPLE_TYPE std::vector<double>
28 #define NUMBER_OF_CYCLES 10000
29 
31 template <typename T, PortTypes> struct Adaptor;
32 
33 
34 #if (RTT_VERSION_MAJOR == 1)
35 
36  #define RTT_VERSION_GTE(major,minor,patch) \
37  ((RTT_VERSION_MAJOR > major) || (RTT_VERSION_MAJOR == major && \
38  (RTT_VERSION_MINOR > minor) || (RTT_VERSION_MINOR == minor && \
39  (RTT_VERSION_PATCH >= patch))))
40 
41  #include "TaskContext.hpp"
42  #include "DataPort.hpp"
43  #include "BufferPort.hpp"
44  #include "ConnPolicy.hpp"
45  #include "Activity.hpp"
46  #include "SlaveActivity.hpp"
47 
48  using RTT::TaskContext;
50  typedef enum { NoData, OldData, NewData } FlowStatus;
52  typedef enum { UnspecifiedReadPolicy, ReadUnordered, ReadShared } ReadPolicy;
53  typedef enum { UnspecifiedWritePolicy, WritePrivate, WriteShared } WritePolicy;
54 
55  // dummy RTT v2.8.99 compatible ConnPolicy
56  struct ConnPolicy {
57  static const int UNBUFFERED = -1;
58  static const int DATA = 0;
59  static const int BUFFER = 1;
60  static const int CIRCULAR_BUFFER = 2;
61 
62  static const int UNSYNC = 0;
63  static const int LOCKED = 1;
64  static const int LOCK_FREE = 2;
65 
66  static const bool PUSH = false;
67  static const bool PULL = true;
68 
69  ConnPolicy() : type(), size(), lock_policy(), init(), pull(), read_policy(), write_policy(), max_threads(), mandatory(), transport(), data_size(), name_id() {}
70 
71  int type;
72  int size;
73  int lock_policy;
74  bool init;
75  bool pull;
76  ReadPolicy read_policy;
77  WritePolicy write_policy;
78  int max_threads;
79  bool mandatory;
80  int transport;
81  mutable int data_size;
82  mutable std::string name_id;
83  };
84 
85  // alternative Mutex/MutexLock/Condition variable implementation for worker synchronization only
86  #include <boost/thread/mutex.hpp>
87  #include <boost/thread/condition_variable.hpp>
88  class Mutex
89  {
90  public:
91  void lock() { m.lock(); }
92  void unlock() { m.unlock(); }
93  boost::mutex m;
94  };
95 
96  class MutexLock
97  {
98  public:
99  MutexLock(Mutex &mutex) : m(mutex.m) { m.lock(); }
100  ~MutexLock() { m.unlock(); }
101  boost::mutex &m;
102  };
103 
104  class Condition
105  {
106  public:
107  void broadcast() { c.notify_all(); }
108  void wait(Mutex &m) { c.wait(m.m); }
109 
110  private:
111  boost::condition_variable_any c;
112  };
113 
114  using RTT::SlaveActivity;
115  using RTT::OS::ThreadInterface;
116 
117  #include <pthread.h>
118  namespace adaptor
119  {
120  static bool trigger(TaskContext *tc) {
121  return tc->getActivity()->trigger();
122  }
123 
124  static void yield(TaskContext *tc) {
125  tc->getActivity()->thread()->yield();
126  }
127 
128  static bool update(TaskContext *tc) {
129  return tc->getActivity()->execute();
130  }
131 
132  static bool setCpuAffinity(ThreadInterface *thread, std::bitset<16> cpu_affinity) {
133  if ( cpu_affinity.none() )
134  cpu_affinity = ~cpu_affinity;
135  RTOS_TASK *task = thread->getTask();
136  if( task && task->thread != 0 ) {
137  cpu_set_t cs;
138  CPU_ZERO(&cs);
139  for(std::size_t i = 0; i < cpu_affinity.size(); i++)
140  {
141  if(cpu_affinity[i]) { CPU_SET(i, &cs); }
142  }
143  return 0 == pthread_setaffinity_np(task->thread, sizeof(cs), &cs);
144  }
145  return false;
146  }
147 
148  static std::bitset<16> getCpuAffinity(ThreadInterface *thread) {
149  RTOS_TASK *task = thread->getTask();
150  if( task && task->thread != 0) {
151  std::bitset<16> cpu_affinity;
152  cpu_set_t cs;
153  pthread_getaffinity_np(task->thread, sizeof(cs), &cs);
154  for(std::size_t i = 0; i < cpu_affinity.size(); i++)
155  {
156  if(CPU_ISSET(i, &cs)) { cpu_affinity[i] = true; }
157  }
158  return cpu_affinity;
159  }
160  return ~std::bitset<16>();
161  }
162 
163  static RTT::ConnectionInterface::shared_ptr getOrCreateConnection(RTT::PortInterface &output_port, RTT::PortInterface &input_port, const ConnPolicy &cp) {
164  RTT::ConnectionTypes::ConnectionType con_type;
165  switch(cp.lock_policy) {
166  case ConnPolicy::LOCKED: con_type = RTT::ConnectionTypes::locked; break;
167  case ConnPolicy::LOCK_FREE: con_type = RTT::ConnectionTypes::lockfree; break;
168  default: return RTT::ConnectionInterface::shared_ptr();
169  }
170  RTT::ConnectionInterface::shared_ptr connection = output_port.connection();
171  if (!connection) connection = input_port.connection();
172  if (!connection) {
173  connection = output_port.createConnection(con_type);
174  connection->connect();
175  }
176  return connection;
177  }
178  }
179 
180  template <typename T, PortTypes>
181  struct Adaptor {};
182 
183  template <typename T>
184  struct Adaptor<T,DataPortType>
185  {
186  typedef RTT::DataPortBase<T> Port;
187  typedef RTT::WriteDataPort<T> OutputPort;
188  typedef RTT::ReadDataPort<T> InputPort;
189 
190  static void addPort(TaskContext *tc, Port &port) {
191  tc->ports()->addPort(&port);
192  }
193 
194  static void addEventPort(TaskContext *tc, InputPort &port) {
195  tc->ports()->addEventPort(&port);
196  }
197 
198  static void setDataSample(OutputPort &port, const T &sample) {
199  }
200 
201  static WriteStatus write(OutputPort &port, const T &sample) {
202  if (!port.connected()) return NotConnected;
203  port.Set(sample);
204  return WriteSuccess;
205  }
206 
207  static FlowStatus read(InputPort &port, T &sample, bool copy_old_data) {
208  if (!port.connected()) return NoData;
209  port.Get(sample);
210  return NewData;
211  }
212 
213  static bool connect(OutputPort &output_port, InputPort &input_port, const ConnPolicy &cp) {
214  if (cp.type != ConnPolicy::DATA) return false;
215  RTT::ConnectionInterface::shared_ptr connection = adaptor::getOrCreateConnection(output_port, input_port, cp);
216  if (!connection) return false;
217  if (!output_port.connected() && !output_port.connectTo(connection)) return false;
218  if (!input_port.connected() && !input_port.connectTo(connection)) return false;
219  return true;
220  }
221  };
222 
223  template <typename T>
224  struct Adaptor<T,BufferPortType>
225  {
226  typedef RTT::BufferPortBase<T> Port;
227  typedef RTT::ReadBufferPort<T> InputPort;
228 
229  // derive from RTT::WriteBufferPort<T> to adapt constructor signature
230  class OutputPort : public RTT::WriteBufferPort<T>
231  {
232  public:
233  OutputPort(const std::string &name, const T &initial_value = T()) : RTT::WriteBufferPort<T>(name, 0, initial_value) {}
234  };
235 
236  static void addPort(TaskContext *tc, Port &port) {
237  tc->ports()->addPort(&port);
238  }
239 
240  static void addEventPort(TaskContext *tc, InputPort &port) {
241  tc->ports()->addEventPort(&port);
242  }
243 
244  static void setDataSample(OutputPort &port, const T &sample) {
245  }
246 
247  static WriteStatus write(OutputPort &port, const T &sample) {
248  if (!port.connected()) return NotConnected;
249  return port.Push(sample) ? WriteSuccess : WriteFailure;
250  }
251 
252  static FlowStatus read(InputPort &port, T &sample, bool copy_old_data) {
253  if (!port.connected()) return NoData;
254  return port.Pop(sample) ? NewData : NoData;
255  }
256 
257  static bool connect(OutputPort &output_port, InputPort &input_port, const ConnPolicy &cp) {
258  if (cp.type != ConnPolicy::BUFFER) return false;
259  output_port.setBufferSize(cp.size);
260  RTT::ConnectionInterface::shared_ptr connection = adaptor::getOrCreateConnection(output_port, input_port, cp);
261  if (!connection) return false;
262  if (!output_port.connected() && !output_port.connectTo(connection)) return false;
263  if (!input_port.connected() && !input_port.connectTo(connection)) return false;
264  return true;
265  }
266  };
267 
268  class Activity : public RTT::Activity
269  {
270  public:
271  Activity() {}
272  Activity(int scheduler, int priority, RTT::Seconds period, unsigned cpu_affinity, RTT::RunnableInterface *r, const std::string &name)
273  : RTT::Activity(scheduler, priority, period, r, name)
274  {
275  adaptor::setCpuAffinity(this->thread(), cpu_affinity);
276  usleep(1000); // getScheduler() and getPriority() may return wrong values if called directly after construction!
277  }
278  };
279 
280 #else
281  #include <rtt/TaskContext.hpp>
282  #include <rtt/InputPort.hpp>
283  #include <rtt/OutputPort.hpp>
285  #include <rtt/Activity.hpp>
286 
287  using RTT::TaskContext;
288  using RTT::FlowStatus;
289  using RTT::NoData;
290  using RTT::OldData;
291  using RTT::NewData;
292  using RTT::Activity;
294 
295 #if !RTT_VERSION_GTE(2,8,99)
298 
299  // dummy RTT v2.8.99 compatible ConnPolicy
300  struct ConnPolicy : public RTT::ConnPolicy {
301  static const bool PUSH = false;
302  static const bool PULL = true;
303 
306  bool mandatory;
307 
308  ConnPolicy() : RTT::ConnPolicy(), buffer_policy(), max_threads(), mandatory() {}
309  explicit ConnPolicy(int type) : RTT::ConnPolicy(type), buffer_policy(), max_threads(), mandatory() {}
310  explicit ConnPolicy(int type, int lock_policy) : RTT::ConnPolicy(type, lock_policy), buffer_policy(), max_threads(), mandatory() {}
311 
313  static_cast<RTT::ConnPolicy&>(*this) = other;
314  buffer_policy = 0;
315  max_threads = 0;
316  mandatory = false;
317  return *this;
318  }
319  };
320 
321 #else
322  using namespace RTT;
323 #endif
324 
325  using RTT::os::Mutex;
326  using RTT::os::MutexLock;
327  using RTT::os::Condition;
329 
331 
332  namespace adaptor
333  {
334  static bool trigger(TaskContext *tc) {
335  return tc->trigger();
336  }
337 
338  static void yield(TaskContext *tc) {
339  tc->getActivity()->thread()->yield();
340  }
341 
342  static bool update(TaskContext *tc) {
343  return tc->update();
344  }
345 
346  RTT_UNUSED static bool setCpuAffinity(ThreadInterface *thread, const std::bitset<16> &cpu_affinity) {
347  RTT::os::Thread *t = dynamic_cast<RTT::os::Thread *>(thread);
348  if (!t) return false;
349  return t->setCpuAffinity(cpu_affinity.to_ulong());
350  }
351 
352  static std::bitset<16> getCpuAffinity(ThreadInterface *thread) {
353  RTT::os::Thread *t = dynamic_cast<RTT::os::Thread *>(thread);
354  if (!t) return ~std::bitset<16>();
355  return std::bitset<16>(t->getCpuAffinity());
356  }
357  }
358 
359  template <typename T, PortTypes>
360  struct Adaptor
361  {
365 
366  static void addPort(TaskContext *tc, Port &port) {
367  tc->addPort(port);
368  }
369 
370  static void addEventPort(TaskContext *tc, InputPort &port) {
371  tc->addEventPort(port);
372  }
373 
374  static void setDataSample(OutputPort &port, const T &sample) {
375  port.setDataSample(sample);
376  }
377 
378 #if RTT_VERSION_GTE(2,8,99)
379  static WriteStatus write(OutputPort &port, const T &sample) {
380  return port.write(sample);
381  }
382 #else
383  static WriteStatus write(OutputPort &port, const T &sample) {
384  port.write(sample);
385  return WriteSuccess;
386  }
387 #endif
388 
389  static FlowStatus read(InputPort &port, T &sample, bool copy_old_data = true) {
390  return port.read(sample, copy_old_data);
391  }
392 
393  static bool connect(OutputPort &output_port, InputPort &input_port, const ConnPolicy &cp) {
394  return output_port.connectTo(&input_port, cp);
395  }
396  };
397 #endif
398 
399 #if !RTT_VERSION_GTE(2,8,99)
400  std::ostream &operator<<(std::ostream &os, const ConnPolicy &cp)
401  {
402  std::string type;
403  switch(cp.type) {
404  case ConnPolicy::UNBUFFERED: type = "UNBUFFERED"; break;
405  case ConnPolicy::DATA: type = "DATA"; break;
406  case ConnPolicy::BUFFER: type = "BUFFER"; break;
407  case ConnPolicy::CIRCULAR_BUFFER: type = "CIRCULAR_BUFFER"; break;
408  default: type = "(unknown type)"; break;
409  }
410  if (cp.size > 0) {
411  type += "[" + boost::lexical_cast<std::string>(cp.size) + "]";
412  }
413 
414  std::string lock_policy;
415  switch(cp.lock_policy) {
416  case ConnPolicy::UNSYNC: lock_policy = "UNSYNC"; break;
417  case ConnPolicy::LOCKED: lock_policy = "LOCKED"; break;
418  case ConnPolicy::LOCK_FREE: lock_policy = "LOCK_FREE"; break;
419  default: lock_policy = "(unknown lock policy)"; break;
420  }
421 
422  std::string pull;
423  switch(int(cp.pull)) {
424  case int(ConnPolicy::PUSH): pull = "PUSH"; break;
425  case int(ConnPolicy::PULL): pull = "PULL"; break;
426  }
427 
428  os << pull << " ";
429  os << lock_policy << " ";
430  os << type;
431  if (!cp.name_id.empty()) os << " (name_id=" << cp.name_id << ")";
432 
433  return os;
434  }
435 #endif
436 
437 #ifndef _stringify
438  #define _stringify(x) _stringify2(x)
439  #ifndef _stringify2
440  #define _stringify2(x...) #x
441  #endif
442 #endif
443 
446  oro_atomic_set(&copies, 0);
447  oro_atomic_set(&assignments, 0);
448  oro_atomic_set(&refcount, 0);
449  }
453 };
454 
457 
458 template <typename Derived>
459 class CopyAndAssignmentCounted : public Derived
460 {
461 public:
462  typedef Derived value_type;
464 
466  : Derived(), sleep_usecs_during_assignment(0), _counter_details(new CopyAndAssignmentCountedDetails)
467  {
468  ORO_ATOMIC_SETUP(&_write_guard, 1);
469  ORO_ATOMIC_SETUP(&_read_guard, 1);
470  }
471  CopyAndAssignmentCounted(const value_type &value)
472  : Derived(value), sleep_usecs_during_assignment(0), _counter_details(new CopyAndAssignmentCountedDetails)
473  {
474  ORO_ATOMIC_SETUP(&_write_guard, 1);
475  ORO_ATOMIC_SETUP(&_read_guard, 1);
476  }
477  CopyAndAssignmentCounted(const this_type &other)
478  : Derived(other), sleep_usecs_during_assignment(0), _counter_details(other._counter_details)
479  {
480  ORO_ATOMIC_SETUP(&_write_guard, 1);
481  ORO_ATOMIC_SETUP(&_read_guard, 1);
482  oro_atomic_inc(&(_counter_details->copies));
483  }
485  ORO_ATOMIC_CLEANUP(&_write_guard);
486  ORO_ATOMIC_CLEANUP(&_read_guard);
487  }
488 
489  this_type &operator=(const value_type &) { throw std::runtime_error("Cannot assign CopyAndAssignmentCounted type from its value_type directly!"); }
490  this_type &operator=(const this_type &other) {
491  if (this == &other) return *this;
492 
493  // lock guards to detect concurrent reading and writing
494  if (!oro_atomic_dec_and_test(&_write_guard))
495  throw std::runtime_error("Conflicting assignment detected: instance is concurrently being assigned by another thread!");
496  oro_atomic_inc(&other._read_guard);
497  if (!oro_atomic_dec_and_test(&_read_guard))
498  throw std::runtime_error("Conflicting assignment detected: instance being assigned is concurrently read by another thread!");
499 
500  // copy
501  static_cast<value_type &>(*this) = other;
502  _counter_details = other._counter_details;
503  oro_atomic_inc(&(_counter_details->assignments));
504  if (sleep_usecs_during_assignment > 0) usleep(sleep_usecs_during_assignment);
506 
507  // cleanup guards
508  oro_atomic_inc(&_read_guard);
509  oro_atomic_dec(&other._read_guard);
510  oro_atomic_inc(&_write_guard);
511  return *this;
512  }
513 
514  int getCopyCount() { return oro_atomic_read(&(_counter_details->copies)); }
515  int getAssignmentCount() { return oro_atomic_read(&(_counter_details->assignments)); }
516  void resetCounters() {
517  oro_atomic_set(&(_counter_details->copies), 0);
518  oro_atomic_set(&(_counter_details->assignments), 0);
519  }
520 
521  this_type detachedCopy() const { return this_type(static_cast<const value_type &>(*this)); }
522 
523 public:
525 
526 private:
527  boost::intrusive_ptr<CopyAndAssignmentCountedDetails> _counter_details;
530 };
531 
532 template <typename T>
533 static void ResizeSample(T &sample, std::size_t size)
534 {
535  sample.resize(size);
536 }
537 
538 template <typename T, std::size_t N>
539 static void ResizeSample(CopyAndAssignmentCounted<boost::array<T,N> > &sample, std::size_t size)
540 {
541  assert(size == N);
542 }
543 
544 template <typename T>
545 static void GenerateRandomSample(T &sample, std::size_t size)
546 {
547  ResizeSample(sample, size);
548  std::srand(std::time(0));
549  for(typename T::iterator it = sample.begin(); it != sample.end(); ++it)
550  {
551  *it = std::rand();
552  }
553 }
554 
555 template <typename T>
556 static void GenerateOrderedSample(T &sample, std::size_t size)
557 {
558  ResizeSample(sample, size);
559  std::size_t counter = 0;
560  for(typename T::iterator it = sample.begin(); it != sample.end(); ++it)
561  {
562  *it = ++counter;
563  }
564 }
565 
566 timespec operator-(const timespec &a, const timespec &b)
567 {
568  timespec result;
569  result.tv_sec = a.tv_sec - b.tv_sec;
570  if (a.tv_nsec >= b.tv_nsec) {
571  result.tv_nsec = a.tv_nsec - b.tv_nsec;
572  } else {
573  result.tv_sec -= 1;
574  result.tv_nsec = 1000000000 - b.tv_nsec + a.tv_nsec;
575  }
576  return result;
577 }
578 
579 timespec operator+(const timespec &a, const timespec &b)
580 {
581  timespec result;
582  result.tv_sec = a.tv_sec + b.tv_sec;
583  result.tv_nsec = a.tv_nsec + b.tv_nsec;
584  if (result.tv_nsec >= 1000000000)
585  {
586  result.tv_sec += 1;
587  result.tv_nsec -= 1000000000;
588  }
589  return result;
590 }
591 
592 timespec operator/(const timespec &a, unsigned long divider)
593 {
594  timespec result;
595 
596  if (divider == 0) {
597  result.tv_sec = 0;
598  result.tv_nsec = 0;
599  return result;
600  }
601 
602  result.tv_sec = a.tv_sec / divider;
603  long long remainder_sec = a.tv_sec % divider;
604  result.tv_nsec = a.tv_nsec / divider + ((remainder_sec * 1000000000ll) / divider);
605  if (result.tv_nsec >= 1000000000)
606  {
607  result.tv_sec += 1;
608  result.tv_nsec -= 1000000000;
609  }
610 
611  return result;
612 }
613 
614 bool operator<(const timespec &a, const timespec &b)
615 {
616  return (a.tv_sec < b.tv_sec) || (a.tv_sec == b.tv_sec && a.tv_nsec < b.tv_nsec);
617 }
618 
619 std::ostream &operator<<(std::ostream &stream, const timespec &tp)
620 {
621  std::ostringstream seconds;
622 // std::ios state(NULL);
623 // state.copyfmt(stream);
624  seconds << tp.tv_sec << "." << std::setw(9) << std::setfill('0') << tp.tv_nsec;
625 // stream.copyfmt(state);
626  return stream << seconds.str();
627 }
628 
629 class Timer
630 {
631 public:
632  struct Data {
633  Data() { reset(); }
634  void reset() { monotonic.tv_sec = 0; monotonic.tv_nsec = 0; cputime.tv_sec = 0; cputime.tv_nsec = 0; }
635  operator const void *() const { return (monotonic.tv_sec == 0 && monotonic.tv_nsec == 0 && cputime.tv_sec == 0 && cputime.tv_nsec == 0) ? 0 : this; }
636  Data &operator+=(const Data &other) { monotonic = monotonic + other.monotonic; cputime = cputime + other.cputime; return *this; }
637  timespec monotonic;
638  timespec cputime;
639  };
640 
641  Timer(const std::string &name = "")
642  : name_(name)
643  , count_(0)
644  {}
645 
646  const std::string &getName() const
647  {
648  return name_;
649  }
650 
651  const Data &total() const { return total_; }
652  const Data &min() const { return min_; }
653  const Data &max() const { return max_; }
654  const Data &last() const { return last_delta_; }
655  std::size_t count() const { return count_; }
656 
657  Timer &operator+=(const Data &other) {
658  total_ += other;
659  count_++;
660  return *this;
661  }
662 
663  Timer &operator+=(const Timer &other) {
664  total_ += other.total_;
665  count_ += other.count_;
666  if (!(min_.monotonic.tv_sec || min_.monotonic.tv_nsec) || (other.min_.monotonic < min_.monotonic)) min_.monotonic = other.min_.monotonic;
667  if (!(min_.cputime.tv_sec || min_.cputime.tv_nsec) || (other.min_.cputime < min_.cputime)) min_.cputime = other.min_.cputime;
668  if (!(max_.monotonic.tv_sec || max_.monotonic.tv_nsec) || (max_.monotonic < other.max_.monotonic)) max_.monotonic = other.max_.monotonic;
669  if (!(max_.cputime.tv_sec || max_.cputime.tv_nsec) || (max_.cputime < other.max_.cputime)) max_.cputime = other.max_.cputime;
670  return *this;
671  }
672 
673  Timer &updateFrom(const Timer &other) {
674  total_ += other.last_delta_;
675  last_delta_ = other.last_delta_;
676  if (!(min_.monotonic.tv_sec || min_.monotonic.tv_nsec) || (other.last_delta_.monotonic < min_.monotonic)) min_.monotonic = other.last_delta_.monotonic;
677  if (!(min_.cputime.tv_sec || min_.cputime.tv_nsec) || (other.last_delta_.cputime < min_.cputime)) min_.cputime = other.last_delta_.cputime;
678  if (!(max_.monotonic.tv_sec || max_.monotonic.tv_nsec) || (max_.monotonic < other.last_delta_.monotonic)) max_.monotonic = other.last_delta_.monotonic;
679  if (!(max_.cputime.tv_sec || max_.cputime.tv_nsec) || (max_.cputime < other.last_delta_.cputime)) max_.cputime = other.last_delta_.cputime;
680  count_++;
681  return *this;
682  }
683 
684  void reset()
685  {
686  total_.reset();
687  min_.reset();
688  max_.reset();
689  tic_.reset();
690  last_delta_.reset();
691  count_ = 0;
692  }
693 
694  void tic()
695  {
696  tic_.reset();
697  last_delta_.reset();
698 #if defined(CLOCK_MONOTONIC)
699  clock_gettime(CLOCK_MONOTONIC, &tic_.monotonic);
700 #elif defined(CLOCK_MONOTONIC_RAW)
701  clock_gettime(CLOCK_MONOTONIC_RAW, &tic_.monotonic);
702 #else
703  clock_gettime(CLOCK_REALTIME, &tic_.monotonic);
704 #endif
705 #if defined(CLOCK_THREAD_CPUTIME_ID)
706  clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tic_.cputime);
707 #endif
708  }
709 
711  {
712  if (!tic_) throw std::runtime_error("You called toc() without tic()!");
713 
714  Data toc;
715 #if defined(CLOCK_MONOTONIC)
716  if (0 != clock_gettime(CLOCK_MONOTONIC, &toc.monotonic)) throw std::runtime_error(strerror(errno));
717 #elif defined(CLOCK_MONOTONIC_RAW)
718  if (0 != clock_gettime(CLOCK_MONOTONIC_RAW, &toc.monotonic)) throw std::runtime_error(strerror(errno));
719 #else
720  if (0 != clock_gettime(CLOCK_REALTIME, &toc.monotonic)) throw std::runtime_error(strerror(errno));
721 #endif
722 #if defined(CLOCK_THREAD_CPUTIME_ID)
723  if (0 != clock_gettime(CLOCK_THREAD_CPUTIME_ID, &toc.cputime)) throw std::runtime_error(strerror(errno));
724 #endif
725 
726  count_++;
727  last_delta_.monotonic = toc.monotonic - tic_.monotonic;
728  last_delta_.cputime = toc.cputime - tic_.cputime;
729  total_.monotonic = total_.monotonic + last_delta_.monotonic;
730  total_.cputime = total_.cputime + last_delta_.cputime;
731  if (!(min_.monotonic.tv_sec || min_.monotonic.tv_nsec) || (last_delta_.monotonic < min_.monotonic)) min_.monotonic = last_delta_.monotonic;
732  if (!(min_.cputime.tv_sec || min_.cputime.tv_nsec) || (last_delta_.cputime < min_.cputime)) min_.cputime = last_delta_.cputime;
733  if (!(max_.monotonic.tv_sec || max_.monotonic.tv_nsec) || (max_.monotonic < last_delta_.monotonic)) max_.monotonic = last_delta_.monotonic;
734  if (!(max_.cputime.tv_sec || max_.cputime.tv_nsec) || (max_.cputime < last_delta_.cputime)) max_.cputime = last_delta_.cputime;
735 
736  tic_.reset();
737  return *this;
738  }
739 
740  class Section {
741  public:
742  Section(Timer &timer) : timer_(timer) { timer_.tic(); }
743  ~Section() { timer_.toc(); }
744  private:
746  };
747 
748 private:
749  Data total_, min_, max_;
752  std::string name_;
753  std::size_t count_;
754 };
755 using ::Timer;
756 
758 {
759 public:
760  ReaderWriterTaskContextBase(const std::string &name, std::size_t index = 0)
761  : RTT::TaskContext(name + "[" + boost::lexical_cast<std::string>(index) + "]")
762  , number_of_cycles_(0)
763  , desired_number_of_cycles_(0)
764  {}
765 
767  MutexLock lock(mutex_);
768  number_of_cycles_ = 0;
769  desired_number_of_cycles_ = n;
770  return *this;
771  }
772 
774  BOOST_REQUIRE_MESSAGE(!this->inException(), exception_reason_);
775  }
776 
778  {
779  bool finished = false;
780  {
781  MutexLock lock(mutex_);
782  number_of_cycles_++;
783  if ((desired_number_of_cycles_ > 0) && (number_of_cycles_ >= desired_number_of_cycles_)) finished = true;
784  finished_.broadcast();
785  }
786  if (trigger && !finished) {
787  adaptor::trigger(this);
788  adaptor::yield(this);
789  }
790  if (finished) this->stop();
791  }
792 
793  bool finished() {
794  MutexLock lock(mutex_);
795  return (desired_number_of_cycles_ > 0) && (number_of_cycles_ >= desired_number_of_cycles_);
796  }
797 
799  {
800  if (inException()) return;
801  MutexLock lock(mutex_);
802  while(number_of_cycles_ < desired_number_of_cycles_) {
803  finished_.wait(mutex_);
804  }
805  }
806 
807  std::size_t getNumberOfCycles()
808  {
809  MutexLock lock(mutex_);
810  return number_of_cycles_;
811  }
812 
814  {
815  MutexLock lock(mutex_);
816  return desired_number_of_cycles_;
817  }
818 
819 protected:
822 
823  std::size_t number_of_cycles_;
825 
826  std::string exception_reason_;
827 };
828 
829 template <typename T, PortTypes PortType>
831 {
832 public:
836  SampleType sample;
838  std::map<WriteStatus, Timer> timer_by_status;
839 
840  Writer(const std::string &name, std::size_t index, std::size_t sample_size = 1, bool keep_last_written_value = false)
841  : ReaderWriterTaskContextBase(name, index)
842  , output_port("out")
843  , timer(getName() + "::write()")
844  {
845 #if RTT_VERSION_MAJOR >= 2
846  output_port.keepLastWrittenValue(keep_last_written_value);
847 #endif
848  AdaptorType::addPort(this, output_port);
849  GenerateOrderedSample(sample, sample_size);
850  AdaptorType::setDataSample(output_port, sample.detachedCopy());
851 
852  timer_by_status[WriteSuccess] = Timer(timer.getName() + "[WriteSuccess]");
853  timer_by_status[WriteFailure] = Timer(timer.getName() + "[WriteFailure]");
854  timer_by_status[NotConnected] = Timer(timer.getName() + "[NotConnected]");
855  }
856 
858  {
859  stop();
860  disconnect();
861  }
862 
863  void updateHook()
864  {
865  try {
866  WriteStatus fs = WriteSuccess;
867  {
868  Timer::Section section(timer);
869  fs = AdaptorType::write(output_port, sample);
870  }
871  timer_by_status[fs].updateFrom(timer);
872  this->afterUpdateHook(/* trigger = */ true);
873  } catch(std::exception &e) {
874  exception_reason_ = e.what();
875  throw;
876  }
877  }
878 
880  {
881  return true;
882  }
883 };
884 
885 template <typename T, PortTypes PortType>
887 {
888 public:
892  SampleType sample;
894  std::map<FlowStatus, Timer> timer_by_status;
895 
896  Reader(const std::string &name, std::size_t index, bool read_loop, bool copy_old_data)
897  : ReaderWriterTaskContextBase(name, index)
898  , input_port("in")
899  , timer(getName() + "::read()")
900  , read_loop_(read_loop)
901  , copy_old_data_(copy_old_data)
902  {
903  AdaptorType::addEventPort(this, input_port);
904 
905  timer_by_status[NoData] = Timer(timer.getName() + "[NoData]");
906  timer_by_status[OldData] = Timer(timer.getName() + "[OldData]");
907  timer_by_status[NewData] = Timer(timer.getName() + "[NewData]");
908  }
909 
911  {
912  stop();
913  disconnect();
914  }
915 
916  void updateHook()
917  {
918  try {
919  FlowStatus fs = NewData;
920  while(fs == NewData) {
921  {
922  Timer::Section section(timer);
923  fs = Adaptor<SampleType,PortType>::read(input_port, sample, copy_old_data_);
924  }
925  timer_by_status[fs].updateFrom(timer);
926  if (!read_loop_) break;
927  }
928 
929  this->afterUpdateHook(/* trigger = */ false);
930  } catch(std::exception &e) {
931  exception_reason_ = e.what();
932  throw;
933  }
934  }
935 
937  {
938  return true;
939  }
940 
941 private:
944 };
945 
946 template <typename T, PortTypes> class TestRunner;
947 typedef SAMPLE_TYPE SampleType;
948 
949 struct TestOptions {
950  std::size_t SampleSize;
951  std::size_t NumberOfWriters;
952  std::size_t NumberOfReaders;
953  std::size_t NumberOfCycles;
954 
956 
957  enum { NoWrite, WriteSynchronous, WriteAsynchronous } WriteMode;
958  enum { NoRead, ReadSynchronous, ReadAsynchronous } ReadMode;
959 
961  bool ReadLoop;
967  std::bitset<16> CpuAffinity;
968 
970  : SampleSize(SAMPLE_SIZE),
971  NumberOfWriters(1),
972  NumberOfReaders(1),
973  NumberOfCycles(NUMBER_OF_CYCLES),
974  WriteMode(WriteAsynchronous),
975  ReadMode(ReadAsynchronous),
976  KeepLastWrittenValue(false),
977  ReadLoop(false),
978  CopyOldData(false),
979  SleepUsecsDuringWriteAssignment(1),
980  SleepUsecsDuringReadAssignment(10),
981  SchedulerType(ORO_SCHED_RT),
982  ThreadPriority(HighestPriority),
983  CpuAffinity(getDefaultCpuAffinity() & std::bitset<16>(0x000f)) // run with at most 4 cores
984  {
985  policy.init = false;
986  }
987 
988  static std::bitset<16> getDefaultCpuAffinity()
989  {
990  static std::bitset<16> default_cpu_affinity(0);
991  if (default_cpu_affinity.none()) {
992  Activity temp;
993  default_cpu_affinity = adaptor::getCpuAffinity(temp.thread());
994  }
995  return default_cpu_affinity;
996  }
997 };
998 
999 const char *schedulerTypeToString(int scheduler) {
1000  static std::map<int, const char *> map;
1001  if (map.empty()) {
1002  map[ORO_SCHED_OTHER] = "ORO_SCHED_OTHER";
1003  map[ORO_SCHED_RT] = "ORO_SCHED_RT";
1004  }
1005  return map.at(scheduler);
1006 }
1007 
1008 std::ostream &operator<<(std::ostream &os, const TestOptions &options)
1009 {
1010 
1011  os << "***************************************************************************************************************************" << std::endl;
1012  os << " * RTT version: " << _stringify(RTT_VERSION) << std::endl;
1013  os << " *" << std::endl;
1014  os << " * ConnPolicy: " << options.policy << std::endl;
1015  os << " * Sample Type: " << _stringify(SAMPLE_TYPE) << std::endl;
1016  os << " * Sample Size: " << options.SampleSize << std::endl;
1017  os << " *" << std::endl;
1018  os << " * Writers: " << options.NumberOfWriters << std::endl;
1019  os << " * Readers: " << options.NumberOfReaders << std::endl;
1020  os << " * Cycles: " << options.NumberOfCycles << std::endl;
1021  os << " *" << std::endl;
1022  os << " * Sleep during write: " << options.SleepUsecsDuringWriteAssignment << " usecs" << std::endl;
1023  os << " * Sleep during read: " << options.SleepUsecsDuringReadAssignment << " usecs" << std::endl;
1024  os << " *" << std::endl;
1025  os << " * Scheduler: " << schedulerTypeToString(options.SchedulerType) << std::endl;
1026  os << " * Priority: " << options.ThreadPriority << std::endl;
1027  os << " * CPU affinity: " << options.CpuAffinity << " (" << options.CpuAffinity.count() << " cores)" << std::endl;
1028 
1029  os << " *" << std::endl;
1030  switch(options.WriteMode) {
1031  case TestOptions::NoWrite:
1032  os << " * Write mode: NONE (no writes)" << std::endl; break;
1034  os << " * Write mode: Synchronous" << std::endl; break;
1036  os << " * Write mode: Asynchronous" << std::endl; break;
1037  }
1038  os << " * Keep last written value: " << (options.KeepLastWrittenValue ? "yes" : "no") << std::endl;
1039  os << " *" << std::endl;
1040  switch(options.ReadMode) {
1041  case TestOptions::NoRead:
1042  os << " * Read mode: NONE (no reads)" << std::endl; break;
1044  os << " * Read mode: Synchronous" << std::endl; break;
1046  os << " * Read mode: Asynchronous" << std::endl; break;
1047  }
1048  os << " * Read in loop: " << (options.ReadLoop ? "yes" : "no") << std::endl;
1049  os << " * Copy old data: " << (options.CopyOldData ? "yes" : "no") << std::endl;
1050  os << "***************************************************************************************************************************" << std::endl;
1051  os << std::endl;
1052 
1053  return os;
1054 }
1055 
1056 template <typename T, PortTypes PortType>
1057 class TestRunner {
1058 public:
1060  typedef boost::shared_ptr<TestRunnerType> shared_ptr;
1061 
1063  typedef boost::shared_ptr< Writer<T,PortType> > WriterPtr;
1064  typedef boost::shared_ptr< Reader<T,PortType> > ReaderPtr;
1065  typedef boost::shared_ptr<ReaderWriterTaskContextBase> TaskPtr;
1066  typedef std::vector<WriterPtr> Writers;
1067  typedef std::vector<ReaderPtr> Readers;
1068  typedef std::vector<TaskPtr> Tasks;
1069 
1070  Writers writers;
1071  Readers readers;
1072  Tasks tasks;
1073 
1074  TestRunner(const TestOptions &options)
1075  : options_(options)
1076  {
1077 
1078  for(std::size_t index = 0; index < options_.NumberOfWriters; ++index) {
1079  WriterPtr writer(new Writer<T,PortType>("Writer", index, options_.SampleSize, options_.KeepLastWrittenValue));
1080  writers.push_back(writer);
1081  tasks.push_back(writer);
1082  switch(options_.WriteMode) {
1084  writer->setActivity(new SlaveActivity());
1085  writer->setDesiredNumberOfCycles(options_.NumberOfCycles);
1086  break;
1088  writer->setActivity(new Activity(options_.SchedulerType, options_.ThreadPriority, 0.0, options_.CpuAffinity.to_ulong(), 0, writer->getName()));
1089  // adaptor::setCpuAffinity(writer.get(), options_.CpuAffinity);
1090  BOOST_WARN_EQUAL(writer->getActivity()->thread()->getScheduler(), options_.SchedulerType);
1091  BOOST_WARN_EQUAL(writer->getActivity()->thread()->getPriority(), options_.ThreadPriority);
1092  BOOST_WARN_EQUAL(adaptor::getCpuAffinity(writer->getActivity()->thread()), options_.CpuAffinity);
1093  writer->setDesiredNumberOfCycles(options_.NumberOfCycles);
1094  break;
1095  default:
1096  break;
1097  }
1098  writer->sample.sleep_usecs_during_assignment = options.SleepUsecsDuringWriteAssignment;
1099  }
1100 
1101  for(std::size_t index = 0; index < options_.NumberOfReaders; ++index) {
1102  ReaderPtr reader(new Reader<T,PortType>("Reader", index, options_.ReadLoop, options_.CopyOldData));
1103  readers.push_back(reader);
1104  tasks.push_back(reader);
1105  switch(options_.ReadMode) {
1107  reader->setActivity(new SlaveActivity());
1108  reader->setDesiredNumberOfCycles(options_.NumberOfCycles);
1109  break;
1111  reader->setActivity(new Activity(options_.SchedulerType, options_.ThreadPriority, 0.0, options_.CpuAffinity.to_ulong(), 0, reader->getName()));
1112  // adaptor::setCpuAffinity(reader.get(), options_.CpuAffinity);
1113  BOOST_WARN_EQUAL(reader->getActivity()->thread()->getScheduler(), options_.SchedulerType);
1114  BOOST_WARN_EQUAL(reader->getActivity()->thread()->getPriority(), options_.ThreadPriority);
1115  BOOST_WARN_EQUAL(adaptor::getCpuAffinity(reader->getActivity()->thread()), options_.CpuAffinity);
1116  break;
1117  default:
1118  break;
1119  }
1120  reader->sample.sleep_usecs_during_assignment = options.SleepUsecsDuringReadAssignment;
1121  }
1122  }
1123 
1124  bool connectAll() {
1125  bool result = true;
1126  for(typename Writers::const_iterator writer = writers.begin(); writer != writers.end(); ++writer) {
1127  for(typename Readers::const_iterator reader = readers.begin(); reader != readers.end(); ++reader) {
1128  result = Adaptor<SampleType,PortType>::connect((*writer)->output_port, (*reader)->input_port, options_.policy) && result;
1129  }
1130  }
1131  return result;
1132  }
1133 
1134  void disconnectAll() {
1135  for(typename Writers::const_iterator task = writers.begin(); task != writers.end(); ++task) {
1136  (*task)->output_port.disconnect();
1137  }
1138  for(typename Readers::const_iterator task = readers.begin(); task != readers.end(); ++task) {
1139  (*task)->input_port.disconnect();
1140  }
1141  }
1142 
1143  bool startWriters() {
1144  if (options_.WriteMode == TestOptions::NoWrite) return true;
1145  bool result = true;
1146  for(typename Writers::const_iterator task = writers.begin(); task != writers.end(); ++task) {
1147  if (!(*task)->isRunning()) {
1148  result = (*task)->start() && result;
1149  }
1150  }
1151  return result;
1152  }
1153 
1154  bool startReaders() {
1155  if (options_.ReadMode == TestOptions::NoRead) return true;
1156  bool result = true;
1157  for(typename Readers::const_iterator task = readers.begin(); task != readers.end(); ++task) {
1158  if (!(*task)->isRunning()) {
1159  result = (*task)->start() && result;
1160  }
1161  }
1162  return result;
1163  }
1164 
1165  bool startAll() {
1166  return startWriters() && startReaders();
1167  }
1168 
1169  void stopAll() {
1170  for(typename Tasks::const_iterator task = tasks.begin(); task != tasks.end(); ++task) {
1171  (*task)->stop();
1172  }
1173  }
1174 
1175  void waitAll() {
1176  for(typename Tasks::const_iterator task = tasks.begin(); task != tasks.end(); ++task) {
1177  (*task)->waitUntilFinished();
1178  }
1179  }
1180 
1181  bool run() {
1182  if (!startAll()) return false;
1183 
1184  std::size_t cycle = 0;
1185  bool all_finished = false;
1186 
1187  while(!all_finished && (options_.NumberOfCycles == 0 || cycle < options_.NumberOfCycles)) {
1188  cycle++;
1189  all_finished = true;
1190 
1191  for(typename Tasks::const_iterator task = tasks.begin(); task != tasks.end(); ++task) {
1192  if (!(*task)->isRunning()) continue;
1193  adaptor::update(task->get());
1194  if (!(*task)->finished()) all_finished = false;
1195  }
1196  }
1197 
1198  waitAll();
1199  stopAll();
1200  return true;
1201  }
1202 
1203  void printStats() {
1204  Timer total_write_timer;
1205  std::map<WriteStatus, Timer> total_write_timer_by_status;
1206 
1207  std::cout << " Writer Monotonic Time [s] CPU Time [s] " << std::endl;
1208  std::cout << " Task #Cycles #Writes Total Average Max Total Average Max #Copies #Assign #WSuccess #WFailure " << std::endl;
1209  std::cout << "----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------" << std::endl;
1210  for(typename Writers::const_iterator task = writers.begin(); task != writers.end(); ++task) {
1211  std::cout << std::left
1212  << " " << std::setw(30) << (*task)->getName()
1213  << " " << std::setw(16) << (boost::lexical_cast<std::string>((*task)->getNumberOfCycles()) +
1214  ((*task)->getDesiredNumberOfCycles() ? ("/" + boost::lexical_cast<std::string>((*task)->getDesiredNumberOfCycles())) : ""))
1215  << " " << std::setw(9) << (*task)->timer.count()
1216  << " " << std::setw(12) << (*task)->timer.total().monotonic
1217  << " " << std::setw(12) << ((*task)->timer.total().monotonic / (*task)->timer.count())
1218  << " " << std::setw(12) << (*task)->timer.max().monotonic
1219  << " " << std::setw(12) << (*task)->timer.total().cputime
1220  << " " << std::setw(12) << ((*task)->timer.total().cputime / (*task)->timer.count())
1221  << " " << std::setw(12) << (*task)->timer.max().cputime
1222  << " " << std::setw(9) << (*task)->sample.getCopyCount()
1223  << " " << std::setw(9) << (*task)->sample.getAssignmentCount()
1224  << " " << std::setw(9) << (*task)->timer_by_status[WriteSuccess].count()
1225  << " " << std::setw(9) << (*task)->timer_by_status[WriteFailure].count()
1226  << std::endl;
1227  if ((*task)->timer_by_status[WriteSuccess].count()) {
1228  std::cout << std::left
1229  << " " << std::setw(30) << " (WriteSuccess)"
1230  << " " << std::setw(16) << ""
1231  << " " << std::setw(9) << (*task)->timer_by_status[WriteSuccess].count()
1232  << " " << std::setw(12) << (*task)->timer_by_status[WriteSuccess].total().monotonic
1233  << " " << std::setw(12) << ((*task)->timer_by_status[WriteSuccess].total().monotonic / (*task)->timer_by_status[WriteSuccess].count())
1234  << " " << std::setw(12) << (*task)->timer_by_status[WriteSuccess].max().monotonic
1235  << " " << std::setw(12) << (*task)->timer_by_status[WriteSuccess].total().cputime
1236  << " " << std::setw(12) << ((*task)->timer_by_status[WriteSuccess].total().cputime / (*task)->timer_by_status[WriteSuccess].count())
1237  << " " << std::setw(12) << (*task)->timer_by_status[WriteSuccess].max().cputime
1238  << std::endl;
1239  }
1240  if ((*task)->timer_by_status[WriteFailure].count()) {
1241  std::cout << std::left
1242  << " " << std::setw(30) << " (WriteFailure)"
1243  << " " << std::setw(16) << ""
1244  << " " << std::setw(9) << (*task)->timer_by_status[WriteFailure].count()
1245  << " " << std::setw(12) << (*task)->timer_by_status[WriteFailure].total().monotonic
1246  << " " << std::setw(12) << ((*task)->timer_by_status[WriteFailure].total().monotonic / (*task)->timer_by_status[WriteFailure].count())
1247  << " " << std::setw(12) << (*task)->timer_by_status[WriteFailure].max().monotonic
1248  << " " << std::setw(12) << (*task)->timer_by_status[WriteFailure].total().cputime
1249  << " " << std::setw(12) << ((*task)->timer_by_status[WriteFailure].total().cputime / (*task)->timer_by_status[WriteFailure].count())
1250  << " " << std::setw(12) << (*task)->timer_by_status[WriteFailure].max().cputime
1251  << std::endl;
1252  }
1253 
1254  total_write_timer += (*task)->timer;
1255  total_write_timer_by_status[WriteSuccess] += (*task)->timer_by_status[WriteSuccess];
1256  total_write_timer_by_status[WriteFailure] += (*task)->timer_by_status[WriteFailure];
1257  total_write_timer_by_status[NotConnected] += (*task)->timer_by_status[NotConnected];
1258  }
1259  std::cout << "----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------" << std::endl;
1260  std::cout << std::left
1261  << " " << std::setw(30) << "Total"
1262  << " " << std::setw(16) << ""
1263  << " " << std::setw(9) << total_write_timer.count()
1264  << " " << std::setw(12) << total_write_timer.total().monotonic
1265  << " " << std::setw(12) << (total_write_timer.total().monotonic / total_write_timer.count())
1266  << " " << std::setw(12) << total_write_timer.max().monotonic
1267  << " " << std::setw(12) << total_write_timer.total().cputime
1268  << " " << std::setw(12) << (total_write_timer.total().cputime / total_write_timer.count())
1269  << " " << std::setw(12) << total_write_timer.max().cputime
1270  << " " << std::setw(9) << ""
1271  << " " << std::setw(9) << ""
1272  << " " << std::setw(9) << total_write_timer_by_status[WriteSuccess].count()
1273  << " " << std::setw(9) << total_write_timer_by_status[WriteFailure].count()
1274  << std::endl;
1275  std::cout << std::left
1276  << " " << std::setw(30) << "Average"
1277  << " " << std::setw(16) << ""
1278  << " " << std::setw(9) << total_write_timer.count() / writers.size()
1279  << " " << std::setw(12) << total_write_timer.total().monotonic / writers.size()
1280  << " " << std::setw(12) << (total_write_timer.total().monotonic / total_write_timer.count()) / writers.size()
1281  << " " << std::setw(12) << total_write_timer.max().monotonic / writers.size()
1282  << " " << std::setw(12) << total_write_timer.total().cputime / writers.size()
1283  << " " << std::setw(12) << (total_write_timer.total().cputime / total_write_timer.count()) / writers.size()
1284  << " " << std::setw(12) << total_write_timer.max().cputime / writers.size()
1285  << " " << std::setw(9) << ""
1286  << " " << std::setw(9) << ""
1287  << " " << std::setw(9) << total_write_timer_by_status[WriteSuccess].count() / writers.size()
1288  << " " << std::setw(9) << total_write_timer_by_status[WriteFailure].count() / writers.size()
1289  << std::endl;
1290  std::cout << std::endl;
1291 
1292  Timer total_read_timer;
1293  std::map<FlowStatus, Timer> total_read_timer_by_status;
1294 
1295  std::cout << " Reader Monotonic Time [s] CPU Time [s] " << std::endl;
1296  std::cout << " Task #Cycles #Reads Total Average Max Total Average Max #NewData #OldData #NoData " << std::endl;
1297  std::cout << "---------------------------------------------------------------------------------------------------------------------------------------------------------------------" << std::endl;
1298  for(typename Readers::const_iterator task = readers.begin(); task != readers.end(); ++task) {
1299  std::cout << std::left
1300  << " " << std::setw(30) << (*task)->getName()
1301  << " " << std::setw(16) << (boost::lexical_cast<std::string>((*task)->getNumberOfCycles()) +
1302  ((*task)->getDesiredNumberOfCycles() ? ("/" + boost::lexical_cast<std::string>((*task)->getDesiredNumberOfCycles())) : ""))
1303  << " " << std::setw(9) << (*task)->timer.count()
1304  << " " << std::setw(12) << (*task)->timer.total().monotonic
1305  << " " << std::setw(12) << ((*task)->timer.total().monotonic / (*task)->timer.count())
1306  << " " << std::setw(12) << (*task)->timer.max().monotonic
1307  << " " << std::setw(12) << (*task)->timer.total().cputime
1308  << " " << std::setw(12) << ((*task)->timer.total().cputime / (*task)->timer.count())
1309  << " " << std::setw(12) << (*task)->timer.max().cputime
1310  << " " << std::setw(9) << (*task)->timer_by_status[NewData].count()
1311  << " " << std::setw(9) << (*task)->timer_by_status[OldData].count()
1312  << " " << std::setw(9) << (*task)->timer_by_status[NoData].count()
1313  << std::endl;
1314  if ((*task)->timer_by_status[NewData].count()) {
1315  std::cout << std::left
1316  << " " << std::setw(30) << " (NewData)"
1317  << " " << std::setw(16) << ""
1318  << " " << std::setw(9) << (*task)->timer_by_status[NewData].count()
1319  << " " << std::setw(12) << (*task)->timer_by_status[NewData].total().monotonic
1320  << " " << std::setw(12) << ((*task)->timer_by_status[NewData].total().monotonic / (*task)->timer_by_status[NewData].count())
1321  << " " << std::setw(12) << (*task)->timer_by_status[NewData].max().monotonic
1322  << " " << std::setw(12) << (*task)->timer_by_status[NewData].total().cputime
1323  << " " << std::setw(12) << ((*task)->timer_by_status[NewData].total().cputime / (*task)->timer_by_status[NewData].count())
1324  << " " << std::setw(12) << (*task)->timer_by_status[NewData].max().cputime
1325  << std::endl;
1326  }
1327  if ((*task)->timer_by_status[OldData].count()) {
1328  std::cout << std::left
1329  << " " << std::setw(30) << " (OldData)"
1330  << " " << std::setw(16) << ""
1331  << " " << std::setw(9) << (*task)->timer_by_status[OldData].count()
1332  << " " << std::setw(12) << (*task)->timer_by_status[OldData].total().monotonic
1333  << " " << std::setw(12) << ((*task)->timer_by_status[OldData].total().monotonic / (*task)->timer_by_status[OldData].count())
1334  << " " << std::setw(12) << (*task)->timer_by_status[OldData].max().monotonic
1335  << " " << std::setw(12) << (*task)->timer_by_status[OldData].total().cputime
1336  << " " << std::setw(12) << ((*task)->timer_by_status[OldData].total().cputime / (*task)->timer_by_status[OldData].count())
1337  << " " << std::setw(12) << (*task)->timer_by_status[OldData].max().cputime
1338  << std::endl;
1339  }
1340  if ((*task)->timer_by_status[NoData].count()) {
1341  std::cout << std::left
1342  << " " << std::setw(30) << " (NoData)"
1343  << " " << std::setw(16) << ""
1344  << " " << std::setw(9) << (*task)->timer_by_status[NoData].count()
1345  << " " << std::setw(12) << (*task)->timer_by_status[NoData].total().monotonic
1346  << " " << std::setw(12) << ((*task)->timer_by_status[NoData].total().monotonic / (*task)->timer_by_status[NoData].count())
1347  << " " << std::setw(12) << (*task)->timer_by_status[NoData].max().monotonic
1348  << " " << std::setw(12) << (*task)->timer_by_status[NoData].total().cputime
1349  << " " << std::setw(12) << ((*task)->timer_by_status[NoData].total().cputime / (*task)->timer_by_status[NoData].count())
1350  << " " << std::setw(12) << (*task)->timer_by_status[NoData].max().cputime
1351  << std::endl;
1352  }
1353 
1354  total_read_timer += (*task)->timer;
1355  total_read_timer_by_status[NewData] += (*task)->timer_by_status[NewData];
1356  total_read_timer_by_status[OldData] += (*task)->timer_by_status[OldData];
1357  total_read_timer_by_status[NoData] += (*task)->timer_by_status[NoData];
1358  }
1359  std::cout << "---------------------------------------------------------------------------------------------------------------------------------------------------------------------" << std::endl;
1360  std::cout << std::left
1361  << " " << std::setw(30) << "Total"
1362  << " " << std::setw(16) << ""
1363  << " " << std::setw(9) << total_read_timer.count()
1364  << " " << std::setw(12) << total_read_timer.total().monotonic
1365  << " " << std::setw(12) << (total_read_timer.total().monotonic / total_read_timer.count())
1366  << " " << std::setw(12) << total_read_timer.max().monotonic
1367  << " " << std::setw(12) << total_read_timer.total().cputime
1368  << " " << std::setw(12) << (total_read_timer.total().cputime / total_read_timer.count())
1369  << " " << std::setw(12) << total_read_timer.max().cputime
1370  << " " << std::setw(9) << total_read_timer_by_status[NewData].count()
1371  << " " << std::setw(9) << total_read_timer_by_status[OldData].count()
1372  << " " << std::setw(9) << total_read_timer_by_status[NoData].count()
1373  << std::endl;
1374  std::cout << std::endl;
1375  }
1376 
1377  const TestOptions &getOptions() const { return options_; }
1378 
1379 private:
1381 };
1382 
1383 template <PortTypes PortType, typename Enabled = void>
1388 
1390  void run() {
1391  std::cout << runner->getOptions();
1392  BOOST_CHECK( runner->connectAll() );
1393  BOOST_CHECK( runner->run() );
1394 
1395  runner->printStats();
1396  std::cout << std::endl;
1397  }
1398 };
1399 
1400 template <>
1402 {
1403  options.policy.type = ConnPolicy::DATA;
1404 // options.policy.lock_policy = ConnPolicy::LOCK_FREE;
1405 }
1406 
1407 template <>
1409 {
1410  options.policy.type = ConnPolicy::BUFFER;
1411  options.policy.size = 100;
1412 // options.policy.lock_policy = ConnPolicy::LOCK_FREE;
1413 }
1414 
1415 struct EmptyReads;
1416 template <>
1418 {
1419  options.policy.type = ConnPolicy::DATA;
1420 // options.policy.lock_policy = ConnPolicy::LOCKED;
1421 
1422  options.WriteMode = TestOptions::NoWrite;
1423  options.ReadMode = TestOptions::ReadSynchronous;
1424 }
1425 
1426 // Registers the fixture into the 'registry'
1428 BOOST_FIXTURE_TEST_SUITE( DataFlowPerformanceTest_Data_Suite, DataFlowPerformanceTest_Data )
1429 
1430 #if (RTT_VERSION_MAJOR >= 2)
1431 // 7 writers, 1 reader, PerConnection
1432 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_PerConnection_7Writers1Reader )
1433 {
1434  options.NumberOfWriters = 7;
1435  options.NumberOfReaders = 1;
1436  options.policy.buffer_policy = PerConnection;
1437  runner.reset(new RunnerType(options));
1438  run();
1439 }
1440 #endif
1441 
1442 #if (RTT_VERSION_MAJOR == 1) || RTT_VERSION_GTE(2,8,99)
1443 // 7 writers, 1 reader, PerInputPort
1444 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_PerInputPort_7Writers1Reader )
1445 {
1446  options.NumberOfWriters = 7;
1447  options.NumberOfReaders = 1;
1448  options.policy.buffer_policy = PerInputPort;
1449  runner.reset(new RunnerType(options));
1450  run();
1451 }
1452 #endif
1453 
1454 #if (RTT_VERSION_MAJOR >= 2)
1455 // 1 writer, 7 readers, PerConnection
1456 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_PerConnection_1Writer7Readers )
1457 {
1458  options.NumberOfWriters = 1;
1459  options.NumberOfReaders = 7;
1460  options.policy.buffer_policy = PerConnection;
1461  runner.reset(new RunnerType(options));
1462  run();
1463 }
1464 #endif
1465 
1466 #if (RTT_VERSION_MAJOR == 1) || RTT_VERSION_GTE(2,8,99)
1467 // 1 writer, 7 readers, PerOutputPort
1468 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_PerOutputPort_1Writer7Readers )
1469 {
1470  options.NumberOfWriters = 1;
1471  options.NumberOfReaders = 7;
1472  options.policy.buffer_policy = PerOutputPort;
1473  runner.reset(new RunnerType(options));
1474  run();
1475 }
1476 #endif
1477 
1478 #if (RTT_VERSION_MAJOR >= 2)
1479 // 4 writers, 4 readers, PerConnection
1480 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_PerConnection_4Writers4Readers )
1481 {
1482  options.NumberOfWriters = 4;
1483  options.NumberOfReaders = 4;
1484  options.policy.buffer_policy = PerConnection;
1485  runner.reset(new RunnerType(options));
1486  run();
1487 }
1488 
1489 #if RTT_VERSION_GTE(2,8,99)
1490 // 4 writers, 4 readers, PerInputPort
1491 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_PerInputPort_4Writers4Readers )
1492 {
1493  options.NumberOfWriters = 4;
1494  options.NumberOfReaders = 4;
1495  options.policy.buffer_policy = PerInputPort;
1496  runner.reset(new RunnerType(options));
1497  run();
1498 }
1499 
1500 // 4 writers, 4 readers, PerOutputPort
1501 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_PerOutputPort_4Writers4Readers )
1502 {
1503  options.NumberOfWriters = 4;
1504  options.NumberOfReaders = 4;
1505  options.policy.buffer_policy = PerOutputPort;
1506  runner.reset(new RunnerType(options));
1507  run();
1508 }
1509 #endif
1510 #endif
1511 
1512 #if (RTT_VERSION_MAJOR == 1) || RTT_VERSION_GTE(2,8,99)
1513 // 4 writers, 4 readers, Shared
1514 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Data_Shared_4Writers4Readers )
1515 {
1516  options.NumberOfWriters = 4;
1517  options.NumberOfReaders = 4;
1518  options.policy.buffer_policy = Shared;
1519  runner.reset(new RunnerType(options));
1520  run();
1521 }
1522 #endif
1523 
1525 
1526 // Registers the fixture into the 'registry'
1528 BOOST_FIXTURE_TEST_SUITE( DataFlowPerformanceTest_Buffer_Suite, DataFlowPerformanceTest_Buffer )
1529 
1530 #if (RTT_VERSION_MAJOR >= 2)
1531 // 7 writers, 1 reader, PerConnection
1532 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_PerConnection_7Writers1Reader )
1533 {
1534  options.NumberOfWriters = 7;
1535  options.NumberOfReaders = 1;
1536  options.policy.buffer_policy = PerConnection;
1537  runner.reset(new RunnerType(options));
1538 
1539  std::cout << runner->getOptions();
1540  BOOST_CHECK( runner->connectAll() );
1541  BOOST_CHECK( runner->run() );
1542 
1543  runner->printStats();
1544  std::cout << std::endl;
1545  }
1546 #endif
1547 
1548 #if (RTT_VERSION_MAJOR == 1) || RTT_VERSION_GTE(2,8,99)
1549 // 7 writers, 1 reader, PerInputPort
1550 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_PerInputPort_7Writers1Reader )
1551 {
1552  options.NumberOfWriters = 7;
1553  options.NumberOfReaders = 1;
1554  options.policy.buffer_policy = PerInputPort;
1555  runner.reset(new RunnerType(options));
1556  run();
1557 }
1558 #endif
1559 
1560 #if (RTT_VERSION_MAJOR >= 2)
1561 // 1 writer, 7 readers, PerConnection
1562 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_PerInputPort_1Writer7Readers )
1563 {
1564  options.NumberOfWriters = 1;
1565  options.NumberOfReaders = 7;
1566  options.policy.buffer_policy = PerConnection;
1567  runner.reset(new RunnerType(options));
1568  run();
1569 }
1570 #endif
1571 
1572 #if (RTT_VERSION_MAJOR == 1) || RTT_VERSION_GTE(2,8,99)
1573 // 1 writer, 7 readers, PerOutputPort
1574 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_PerOutputPort_1Writer7Readers )
1575 {
1576  options.NumberOfWriters = 1;
1577  options.NumberOfReaders = 7;
1578  options.policy.buffer_policy = PerOutputPort;
1579  runner.reset(new RunnerType(options));
1580  run();
1581 }
1582 #endif
1583 
1584 #if (RTT_VERSION_MAJOR >= 2)
1585 // 4 writers, 4 readers, PerConnection
1586 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_PerConnection_4Writers4Readers )
1587 {
1588  options.NumberOfWriters = 4;
1589  options.NumberOfReaders = 4;
1590  options.policy.buffer_policy = PerConnection;
1591  runner.reset(new RunnerType(options));
1592  run();
1593 }
1594 
1595 #if RTT_VERSION_GTE(2,8,99)
1596 // 4 writers, 4 readers, PerInputPort
1597 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_PerInputPort_4Writers4Readers )
1598 {
1599  options.NumberOfWriters = 4;
1600  options.NumberOfReaders = 4;
1601  options.policy.buffer_policy = PerInputPort;
1602  runner.reset(new RunnerType(options));
1603  run();
1604 }
1605 
1606 // 4 writers, 4 readers, PerOutputPort
1607 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_PerOutputPort_4Writers4Readers )
1608 {
1609  options.NumberOfWriters = 4;
1610  options.NumberOfReaders = 4;
1611  options.policy.buffer_policy = PerOutputPort;
1612  runner.reset(new RunnerType(options));
1613  run();
1614 }
1615 #endif
1616 #endif
1617 
1618 #if (RTT_VERSION_MAJOR == 1) || RTT_VERSION_GTE(2,8,99)
1619 // 4 writers, 4 readers, Shared
1620 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_Buffer_Shared_4Writers4Readers )
1621 {
1622  options.NumberOfWriters = 4;
1623  options.NumberOfReaders = 4;
1624  options.policy.buffer_policy = Shared;
1625  runner.reset(new RunnerType(options));
1626  run();
1627 }
1628 #endif
1629 
1631 
1632 // Registers the fixture into the 'registry'
1634 BOOST_FIXTURE_TEST_SUITE( DataFlowPerformanceTest_EmptyReads_Suite, DataFlowPerformanceTest_EmptyReads )
1635 
1636 #if (RTT_VERSION_MAJOR >= 2)
1637 // 7 writers, 1 reader, PerConnection
1638 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_EmptyReads_PerConnection_7Writers1Reader )
1639 {
1640  options.NumberOfWriters = 7;
1641  options.NumberOfReaders = 1;
1642  options.policy.buffer_policy = PerConnection;
1643  runner.reset(new RunnerType(options));
1644  run();
1645 }
1646 #endif
1647 
1648 #if (RTT_VERSION_MAJOR == 1) || RTT_VERSION_GTE(2,8,99)
1649 // 7 writers, 1 reader, Shared
1650 BOOST_AUTO_TEST_CASE( DataFlowPerformanceTest_EmptyReads_Shared_7Writers1Reader )
1651 {
1652  options.NumberOfWriters = 7;
1653  options.NumberOfReaders = 1;
1654  options.policy.buffer_policy = Shared;
1655  runner.reset(new RunnerType(options));
1656  run();
1657 }
1658 #endif
1659 
void setDataSample(const T &sample)
Definition: OutputPort.hpp:209
timespec operator+(const timespec &a, const timespec &b)
boost::shared_ptr< Reader< T, PortType > > ReaderPtr
base::PortInterface & addPort(const std::string &name, base::PortInterface &port)
RTT::OutputPort< T > OutputPort
CopyAndAssignmentCounted< T > SampleType
static void setDataSample(OutputPort &port, const T &sample)
static WriteStatus write(OutputPort &port, const T &sample)
#define BOOST_FIXTURE_TEST_SUITE(suite_name, F)
virtual void yield()=0
static FlowStatus read(InputPort &port, T &sample, bool copy_old_data=true)
double Seconds
Definition: os/Time.hpp:53
AdaptorType::InputPort input_port
std::vector< ReaderPtr > Readers
TestRunner< SampleType, PortType > RunnerType
#define BOOST_AUTO_TEST_CASE(test_name)
timespec operator-(const timespec &a, const timespec &b)
this_type & operator=(const value_type &)
rt_mutex_t m
Definition: Mutex.hpp:97
void intrusive_ptr_release(CopyAndAssignmentCountedDetails *x)
virtual bool trigger()
Definition: TaskCore.cpp:91
boost::intrusive_ptr< CopyAndAssignmentCountedDetails > _counter_details
static const int CIRCULAR_BUFFER
Definition: ConnPolicy.hpp:113
base::InputPortInterface & addEventPort(const std::string &name, base::InputPortInterface &port, SlotFunction callback=SlotFunction())
#define SAMPLE_SIZE
static void addPort(TaskContext *tc, Port &port)
bool operator<(const timespec &a, const timespec &b)
virtual os::ThreadInterface * thread()
Definition: Activity.cpp:114
CopyAndAssignmentCounted(const this_type &other)
AdaptorType::OutputPort output_port
#define BOOST_AUTO_TEST_SUITE_END()
const Data & total() const
FlowStatus
Definition: FlowStatus.hpp:56
void keepLastWrittenValue(bool keep)
Definition: OutputPort.hpp:161
Definition: mystd.hpp:163
CopyAndAssignmentCounted(const value_type &value)
A class for running a certain piece of code in a thread.
ConnPolicy & operator==(const RTT::ConnPolicy &other)
const char * schedulerTypeToString(int scheduler)
this_type & operator=(const this_type &other)
static void GenerateRandomSample(T &sample, std::size_t size)
int usleep(unsigned int us)
Definition: fosi.cpp:58
virtual RTOS_TASK * getTask()=0
Timer & operator+=(const Timer &other)
boost::shared_ptr< ReaderWriterTaskContextBase > TaskPtr
virtual bool setCpuAffinity(unsigned cpu_affinity)
Definition: Thread.cpp:608
static const int LOCKED
Definition: ConnPolicy.hpp:116
static const bool PUSH
An object oriented wrapper around a condition variable.
Definition: Condition.hpp:60
cyg_thread thread
Definition: ecos/fosi.h:76
FlowStatus read(base::DataSourceBase::shared_ptr source)
Definition: InputPort.hpp:97
virtual bool createConnection(internal::SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const &policy=ConnPolicy())=0
static const int DATA
Definition: ConnPolicy.hpp:111
base::InputPortInterface & addEventPort(const std::string &name, base::InputPortInterface &port, SlotFunction callback=SlotFunction())
const Data & min() const
RTT::base::PortInterface Port
std::vector< TaskPtr > Tasks
CopyAndAssignmentCounted< T > SampleType
TestRunner< T, PortType > TestRunnerType
int oro_atomic_read(oro_atomic_t *a)
const std::string & getName() const
Data & operator+=(const Data &other)
Timer & updateFrom(const Timer &other)
const int HighestPriority
Definition: ecosthreads.cpp:45
DataFlowPerformanceTest_< DataPortType > DataFlowPerformanceTest_Data
std::map< FlowStatus, Timer > timer_by_status
CopyAndAssignmentCounted< T > SampleType
printstream cout
Definition: rtstreams.cpp:45
DataFlowInterface * ports()
std::map< WriteStatus, Timer > timer_by_status
#define SAMPLE_TYPE
std::size_t count() const
void oro_atomic_inc(oro_atomic_t *a)
static RTT_UNUSED bool setCpuAffinity(ThreadInterface *thread, const std::bitset< 16 > &cpu_affinity)
virtual bool update()
Definition: TaskCore.cpp:86
std::vector< WriterPtr > Writers
base::PortInterface & addPort(const std::string &name, base::PortInterface &port)
void ORO_ATOMIC_SETUP(oro_atomic_t *a, int n)
TestRunner(const TestOptions &options)
virtual bool connectTo(PortInterface *other, ConnPolicy const &policy)
const Data & last() const
void ORO_ATOMIC_CLEANUP(oro_atomic_t *a)
WriteStatus write(const T &sample)
Definition: OutputPort.hpp:243
Adaptor< SampleType, PortType > AdaptorType
void intrusive_ptr_add_ref(CopyAndAssignmentCountedDetails *x)
virtual bool connectTo(PortInterface *other, ConnPolicy const &policy)
virtual unsigned getCpuAffinity() const
Definition: Thread.cpp:613
std::bitset< 16 > CpuAffinity
#define CLOCK_REALTIME
Definition: macosx/fosi.h:101
const TestOptions & getOptions() const
basic_ostreams & endl(basic_ostreams &s)
Definition: rtstreams.cpp:110
static const int LOCK_FREE
Definition: ConnPolicy.hpp:117
std::ostream & operator<<(std::ostream &os, const ConnPolicy &cp)
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
enum TestOptions::@8 WriteMode
enum TestOptions::@9 ReadMode
virtual void lock()
Definition: Mutex.hpp:120
int oro_atomic_dec_and_test(oro_atomic_t *a)
ConnPolicy(int type, int lock_policy)
static std::bitset< 16 > getDefaultCpuAffinity()
#define NUMBER_OF_CYCLES
static void yield(TaskContext *tc)
Timer & operator+=(const Data &other)
static int clock_gettime(int clk_id, struct timespec *tp)
Definition: macosx/fosi.h:102
virtual os::ThreadInterface * thread()=0
void oro_atomic_set(oro_atomic_t *a, int n)
static void addEventPort(TaskContext *tc, InputPort &port)
const Data & max() const
Writer(const std::string &name, std::size_t index, std::size_t sample_size=1, bool keep_last_written_value=false)
static const int UNSYNC
Definition: ConnPolicy.hpp:115
timespec operator/(const timespec &a, unsigned long divider)
static void GenerateOrderedSample(T &sample, std::size_t size)
static std::bitset< 16 > getCpuAffinity(ThreadInterface *thread)
#define ORO_SCHED_RT
Definition: ecos/fosi.h:61
Timer(const std::string &name="")
static const int BUFFER
Definition: ConnPolicy.hpp:112
An object oriented wrapper around a non recursive mutex.
Definition: Mutex.hpp:92
#define _stringify(x)
boost::shared_ptr< Writer< T, PortType > > WriterPtr
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:53
ReaderWriterTaskContextBase(const std::string &name, std::size_t index=0)
SAMPLE_TYPE SampleType
Reader(const std::string &name, std::size_t index, bool read_loop, bool copy_old_data)
Adaptor< SampleType, PortType > AdaptorType
static const bool PULL
std::string name_id
Definition: ConnPolicy.hpp:256
An base::ActivityInterface implementation which executes &#39;step&#39; upon the invocation of &#39;execute()&#39;...
ReaderWriterTaskContextBase & setDesiredNumberOfCycles(std::size_t n)
CopyAndAssignmentCounted< Derived > this_type
#define ORO_SCHED_OTHER
Definition: ecos/fosi.h:62
void oro_atomic_dec(oro_atomic_t *a)
static bool connect(OutputPort &output_port, InputPort &input_port, const ConnPolicy &cp)
base::ActivityInterface * getActivity()
static const int UNBUFFERED
Definition: ConnPolicy.hpp:110
static bool update(TaskContext *tc)
boost::shared_ptr< TestRunnerType > shared_ptr
RTT::InputPort< T > InputPort
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
bool wait(Mutex &m)
Definition: Condition.hpp:90
static void ResizeSample(T &sample, std::size_t size)
static bool trigger(TaskContext *tc)


rtt
Author(s): RTT Developers
autogenerated on Fri Oct 25 2019 03:59:32