pipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017, 2018 Simon Rasmussen (refactor)
3  *
4  * Copyright 2015, 2016 Thomas Timm Andersen (original version)
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #pragma once
20 
21 #include <atomic>
22 #include <chrono>
23 #include <thread>
24 #include <vector>
25 #include "ur_modern_driver/log.h"
27 
28 using namespace moodycamel;
29 using namespace std;
30 
31 template <typename T>
32 class IConsumer
33 {
34 public:
35  virtual void setupConsumer()
36  {
37  }
38  virtual void teardownConsumer()
39  {
40  }
41  virtual void stopConsumer()
42  {
43  }
44  virtual void onTimeout()
45  {
46  }
47 
48  virtual bool consume(shared_ptr<T> product) = 0;
49 };
50 
51 template <typename T>
52 class MultiConsumer : public IConsumer<T>
53 {
54 private:
55  std::vector<IConsumer<T>*> consumers_;
56 
57 public:
58  MultiConsumer(std::vector<IConsumer<T>*> consumers) : consumers_(consumers)
59  {
60  }
61 
62  virtual void setupConsumer()
63  {
64  for (auto& con : consumers_)
65  {
66  con->setupConsumer();
67  }
68  }
69  virtual void teardownConsumer()
70  {
71  for (auto& con : consumers_)
72  {
73  con->teardownConsumer();
74  }
75  }
76  virtual void stopConsumer()
77  {
78  for (auto& con : consumers_)
79  {
80  con->stopConsumer();
81  }
82  }
83  virtual void onTimeout()
84  {
85  for (auto& con : consumers_)
86  {
87  con->onTimeout();
88  }
89  }
90 
91  bool consume(shared_ptr<T> product)
92  {
93  bool res = true;
94  for (auto& con : consumers_)
95  {
96  if (!con->consume(product))
97  res = false;
98  }
99  return res;
100  }
101 };
102 
103 template <typename T>
105 {
106 public:
107  virtual void setupProducer()
108  {
109  }
110  virtual void teardownProducer()
111  {
112  }
113  virtual void stopProducer()
114  {
115  }
116 
117  virtual bool tryGet(std::vector<unique_ptr<T>>& products) = 0;
118 };
119 
121 {
122 public:
123  virtual void started(std::string name)
124  {
125  }
126  virtual void stopped(std::string name)
127  {
128  }
129 };
130 
131 template <typename T>
132 class Pipeline
133 {
134 private:
135  typedef std::chrono::high_resolution_clock Clock;
136  typedef Clock::time_point Time;
139  std::string name_;
142  atomic<bool> running_;
143  thread pThread_, cThread_;
144 
146  {
147  producer_.setupProducer();
148  std::vector<unique_ptr<T>> products;
149  while (running_)
150  {
151  if (!producer_.tryGet(products))
152  {
153  break;
154  }
155 
156  for (auto& p : products)
157  {
158  if (!queue_.try_enqueue(std::move(p)))
159  {
160  LOG_ERROR("Pipeline producer overflowed! <%s>", name_.c_str());
161  }
162  }
163 
164  products.clear();
165  }
166  producer_.teardownProducer();
167  LOG_DEBUG("Pipeline producer ended! <%s>", name_.c_str());
168  consumer_.stopConsumer();
169  running_ = false;
170  notifier_.stopped(name_);
171  }
172 
174  {
175  consumer_.setupConsumer();
176  unique_ptr<T> product;
177  while (running_)
178  {
179  // timeout was chosen because we should receive messages
180  // at roughly 125hz (every 8ms) and have to update
181  // the controllers (i.e. the consumer) with *at least* 125Hz
182  // So we update the consumer more frequently via onTimeout
183  if (!queue_.wait_dequeue_timed(product, std::chrono::milliseconds(8)))
184  {
185  consumer_.onTimeout();
186  continue;
187  }
188 
189  if (!consumer_.consume(std::move(product)))
190  break;
191  }
192  consumer_.teardownConsumer();
193  LOG_DEBUG("Pipeline consumer ended! <%s>", name_.c_str());
194  producer_.stopProducer();
195  running_ = false;
196  notifier_.stopped(name_);
197  }
198 
199 public:
200  Pipeline(IProducer<T>& producer, IConsumer<T>& consumer, std::string name, INotifier& notifier)
201  : producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{ false }
202  {
203  }
204 
205  void run()
206  {
207  if (running_)
208  return;
209 
210  running_ = true;
211  pThread_ = thread(&Pipeline::run_producer, this);
212  cThread_ = thread(&Pipeline::run_consumer, this);
213  notifier_.started(name_);
214  }
215 
216  void stop()
217  {
218  if (!running_)
219  return;
220 
221  LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
222 
223  consumer_.stopConsumer();
224  producer_.stopProducer();
225 
226  running_ = false;
227 
228  pThread_.join();
229  cThread_.join();
230  notifier_.stopped(name_);
231  }
232 };
MultiConsumer(std::vector< IConsumer< T > * > consumers)
Definition: pipeline.h:58
virtual void teardownProducer()
Definition: pipeline.h:110
virtual void onTimeout()
Definition: pipeline.h:44
virtual void setupProducer()
Definition: pipeline.h:107
IConsumer< T > & consumer_
Definition: pipeline.h:138
void run()
Definition: pipeline.h:205
bool wait_dequeue_timed(U &result, std::int64_t timeout_usecs)
bool consume(shared_ptr< T > product)
Definition: pipeline.h:91
virtual void onTimeout()
Definition: pipeline.h:83
#define LOG_DEBUG(format,...)
Definition: log.h:33
std::string name_
Definition: pipeline.h:139
void run_producer()
Definition: pipeline.h:145
virtual void setupConsumer()
Definition: pipeline.h:35
virtual void setupConsumer()
Definition: pipeline.h:62
virtual void stopConsumer()
Definition: pipeline.h:41
virtual bool tryGet(std::vector< unique_ptr< T >> &products)=0
void stop()
Definition: pipeline.h:216
std::chrono::high_resolution_clock Clock
Definition: pipeline.h:135
INotifier & notifier_
Definition: pipeline.h:140
virtual void stopProducer()
Definition: pipeline.h:113
virtual void stopConsumer()
Definition: pipeline.h:76
AE_FORCEINLINE bool try_enqueue(T const &element)
virtual void stopped(std::string name)
Definition: pipeline.h:126
virtual void started(std::string name)
Definition: pipeline.h:123
virtual void teardownConsumer()
Definition: pipeline.h:69
std::vector< IConsumer< T > * > consumers_
Definition: pipeline.h:55
IProducer< T > & producer_
Definition: pipeline.h:137
virtual void teardownConsumer()
Definition: pipeline.h:38
Pipeline(IProducer< T > &producer, IConsumer< T > &consumer, std::string name, INotifier &notifier)
Definition: pipeline.h:200
atomic< bool > running_
Definition: pipeline.h:142
Clock::time_point Time
Definition: pipeline.h:136
virtual bool consume(shared_ptr< T > product)=0
#define LOG_ERROR(format,...)
Definition: log.h:36
BlockingReaderWriterQueue< unique_ptr< T > > queue_
Definition: pipeline.h:141
void run_consumer()
Definition: pipeline.h:173
thread pThread_
Definition: pipeline.h:143


ur_modern_driver
Author(s): Thomas Timm Andersen, Simon Rasmussen
autogenerated on Fri Jun 26 2020 03:37:00