pipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019, FZI Forschungszentrum Informatik (templating)
3  *
4  * Copyright 2017, 2018 Simon Rasmussen (refactor)
5  *
6  * Copyright 2015, 2016 Thomas Timm Andersen (original version)
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 
24 #include "ur_client_library/log.h"
27 #include <atomic>
28 #include <chrono>
29 #include <thread>
30 #include <vector>
31 #include <fstream>
32 
33 namespace urcl
34 {
35 namespace comm
36 {
/*!
 * \brief Parent class for arbitrary consumers.
 *
 * All hooks except consume() have empty (or forwarding) default implementations, so a
 * concrete consumer only needs to implement consume().
 *
 * @tparam T Type of the consumed products
 */
template <typename T>
class IConsumer
{
public:
  /*!
   * \brief Virtual destructor, so that a consumer can be destroyed safely through a pointer
   * to this base class.
   */
  virtual ~IConsumer() = default;

  /*!
   * \brief Set-up functionality of the consumer. Does nothing by default.
   */
  virtual void setupConsumer()
  {
  }
  /*!
   * \brief Fully tears down the consumer - by default no difference to stopping it.
   */
  virtual void teardownConsumer()
  {
    stopConsumer();
  }
  /*!
   * \brief Stops the consumer. Does nothing by default.
   */
  virtual void stopConsumer()
  {
  }
  /*!
   * \brief Functionality for handling consumer timeouts. Does nothing by default.
   */
  virtual void onTimeout()
  {
  }

  /*!
   * \brief Consumes a product, utilizing its contents.
   *
   * \param product Shared pointer to the product to be consumed.
   *
   * \returns Success of the consumption.
   */
  virtual bool consume(std::shared_ptr<T> product) = 0;
};
81 
88 template <typename T>
89 class MultiConsumer : public IConsumer<T>
90 {
91 private:
92  std::vector<IConsumer<T>*> consumers_;
93 
94 public:
100  MultiConsumer(std::vector<IConsumer<T>*> consumers) : consumers_(consumers)
101  {
102  }
103 
107  virtual void setupConsumer()
108  {
109  for (auto& con : consumers_)
110  {
111  con->setupConsumer();
112  }
113  }
117  virtual void teardownConsumer()
118  {
119  for (auto& con : consumers_)
120  {
121  con->teardownConsumer();
122  }
123  }
127  virtual void stopConsumer()
128  {
129  for (auto& con : consumers_)
130  {
131  con->stopConsumer();
132  }
133  }
137  virtual void onTimeout()
138  {
139  for (auto& con : consumers_)
140  {
141  con->onTimeout();
142  }
143  }
144 
152  bool consume(std::shared_ptr<T> product)
153  {
154  bool res = true;
155  for (auto& con : consumers_)
156  {
157  if (!con->consume(product))
158  res = false;
159  }
160  return res;
161  }
162 };
163 
/*!
 * \brief Parent class for arbitrary producers of packages.
 *
 * All hooks except tryGet() have empty (or forwarding) default implementations.
 *
 * @tparam T Type of the produced products
 */
template <typename T>
class IProducer
{
public:
  /*!
   * \brief Virtual destructor, so that a producer can be destroyed safely through a pointer
   * to this base class.
   */
  virtual ~IProducer() = default;

  /*!
   * \brief Set-up functionality of the producers. Does nothing by default.
   */
  virtual void setupProducer()
  {
  }
  /*!
   * \brief Fully tears down the producer - by default no difference to stopping it.
   */
  virtual void teardownProducer()
  {
    stopProducer();
  }
  /*!
   * \brief Stops the producer. Does nothing by default.
   */
  virtual void stopProducer()
  {
  }

  /*!
   * \brief Starts the producer. Does nothing by default.
   */
  virtual void startProducer()
  {
  }

  /*!
   * \brief Reads packages from some source and produces corresponding objects.
   *
   * \param products Vector of unique pointers to be filled with the produced packages.
   *
   * \returns Success of the package production.
   */
  virtual bool tryGet(std::vector<std::unique_ptr<T>>& products) = 0;
};
206 
/*!
 * \brief Parent class for notifiers.
 *
 * Both notifications do nothing by default; derive and override to react to them.
 */
class INotifier
{
public:
  /*!
   * \brief Virtual destructor, so that a notifier can be destroyed safely through a pointer
   * to this base class.
   */
  virtual ~INotifier() = default;

  /*!
   * \brief Start notification.
   *
   * \param name The name of the started object
   */
  virtual void started(std::string name)
  {
  }
  /*!
   * \brief Stop notification.
   *
   * \param name The name of the stopped object
   */
  virtual void stopped(std::string name)
  {
  }
};
226 
234 template <typename T>
235 class Pipeline
236 {
237 public:
238  typedef std::chrono::high_resolution_clock Clock;
239  typedef Clock::time_point Time;
250  Pipeline(IProducer<T>& producer, IConsumer<T>* consumer, std::string name, INotifier& notifier,
251  const bool producer_fifo_scheduling = false)
252  : producer_(producer)
253  , consumer_(consumer)
254  , name_(name)
255  , notifier_(notifier)
256  , queue_{ 32 }
257  , running_{ false }
258  , producer_fifo_scheduling_(producer_fifo_scheduling)
259  {
260  }
270  Pipeline(IProducer<T>& producer, std::string name, INotifier& notifier, const bool producer_fifo_scheduling = false)
271  : producer_(producer)
272  , consumer_(nullptr)
273  , name_(name)
274  , notifier_(notifier)
275  , queue_{ 32 }
276  , running_{ false }
277  , producer_fifo_scheduling_(producer_fifo_scheduling)
278  {
279  }
280 
284  virtual ~Pipeline()
285  {
286  URCL_LOG_DEBUG("Destructing pipeline");
287  stop();
288  }
289 
290  void init()
291  {
292  producer_.setupProducer();
293  if (consumer_ != nullptr)
294  consumer_->setupConsumer();
295  }
296 
300  void run()
301  {
302  if (running_)
303  return;
304 
305  running_ = true;
306  producer_.startProducer();
307  pThread_ = std::thread(&Pipeline::runProducer, this);
308  if (consumer_ != nullptr)
309  cThread_ = std::thread(&Pipeline::runConsumer, this);
310  notifier_.started(name_);
311  }
312 
316  void stop()
317  {
318  if (!running_)
319  return;
320 
321  URCL_LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
322 
323  running_ = false;
324 
325  producer_.stopProducer();
326  if (pThread_.joinable())
327  {
328  pThread_.join();
329  }
330  if (cThread_.joinable())
331  {
332  cThread_.join();
333  }
334  notifier_.stopped(name_);
335  }
336 
347  bool getLatestProduct(std::unique_ptr<T>& product, std::chrono::milliseconds timeout)
348  {
349  // If the queue has more than one package, get the latest one.
350  bool res = false;
351  while (queue_.tryDequeue(product))
352  {
353  res = true;
354  }
355 
356  // If the queue is empty, wait for a package.
357  return res || queue_.waitDequeTimed(product, timeout);
358  }
359 
360 private:
363  std::string name_;
366  std::atomic<bool> running_;
367  std::thread pThread_, cThread_;
369 
370  void runProducer()
371  {
372  URCL_LOG_DEBUG("Starting up producer");
373  if (producer_fifo_scheduling_)
374  {
375  pthread_t this_thread = pthread_self();
376  const int max_thread_priority = sched_get_priority_max(SCHED_FIFO);
377  setFiFoScheduling(this_thread, max_thread_priority);
378  }
379  std::vector<std::unique_ptr<T>> products;
380  while (running_)
381  {
382  if (!producer_.tryGet(products))
383  {
384  producer_.teardownProducer();
385  running_ = false;
386  break;
387  }
388 
389  for (auto& p : products)
390  {
391  if (!queue_.tryEnqueue(std::move(p)))
392  {
393  URCL_LOG_ERROR("Pipeline producer overflowed! <%s>", name_.c_str());
394  }
395  }
396 
397  products.clear();
398  }
399  URCL_LOG_DEBUG("Pipeline producer ended! <%s>", name_.c_str());
400  notifier_.stopped(name_);
401  }
402 
403  void runConsumer()
404  {
405  std::unique_ptr<T> product;
406  while (running_)
407  {
408  // timeout was chosen because we should receive messages
409  // at roughly 125hz (every 8ms) and have to update
410  // the controllers (i.e. the consumer) with *at least* 125Hz
411  // So we update the consumer more frequently via onTimeout
412  if (!queue_.waitDequeTimed(product, std::chrono::milliseconds(8)))
413  {
414  consumer_->onTimeout();
415  continue;
416  }
417 
418  if (!consumer_->consume(std::move(product)))
419  {
420  consumer_->teardownConsumer();
421  running_ = false;
422  break;
423  }
424  }
425  consumer_->stopConsumer();
426  URCL_LOG_DEBUG("Pipeline consumer ended! <%s>", name_.c_str());
427  notifier_.stopped(name_);
428  }
429 };
430 } // namespace comm
431 } // namespace urcl
virtual void onTimeout()
Functionality for handling consumer timeouts.
Definition: pipeline.h:68
bool producer_fifo_scheduling_
Definition: pipeline.h:368
#define URCL_LOG_ERROR(...)
Definition: log.h:26
Consumer that allows one product to be consumed by multiple arbitrary consumers. ...
Definition: pipeline.h:89
virtual void teardownProducer()
Fully tears down the producer - by default no difference to stopping it.
Definition: pipeline.h:182
virtual void started(std::string name)
Start notification.
Definition: pipeline.h:216
virtual ~Pipeline()
The Pipeline object's destructor, stopping the pipeline and joining all running threads.
Definition: pipeline.h:284
void run()
Starts the producer and, if existing, the consumer in new threads.
Definition: pipeline.h:300
MultiConsumer(std::vector< IConsumer< T > *> consumers)
Creates a new MultiConsumer object.
Definition: pipeline.h:100
void stop()
Stops the pipeline and all running threads.
Definition: pipeline.h:316
virtual void onTimeout()
Triggers timeout functionality for all registered consumers.
Definition: pipeline.h:137
virtual bool consume(std::shared_ptr< T > product)=0
Consumes a product, utilizing its contents.
Clock::time_point Time
Definition: pipeline.h:239
bool consume(std::shared_ptr< T > product)
Consumes a given product with all registered consumers.
Definition: pipeline.h:152
virtual void stopConsumer()
Stops the consumer.
Definition: pipeline.h:62
virtual void setupProducer()
Set-up functionality of the producers.
Definition: pipeline.h:176
Pipeline(IProducer< T > &producer, std::string name, INotifier &notifier, const bool producer_fifo_scheduling=false)
Creates a new Pipeline object, registering producer and notifier while no consumer is used...
Definition: pipeline.h:270
bool setFiFoScheduling(pthread_t &thread, const int priority)
Definition: helpers.cpp:38
virtual void teardownConsumer()
Fully tears down the consumer - by default no difference to stopping it.
Definition: pipeline.h:55
virtual void setupConsumer()
Sets up all registered consumers.
Definition: pipeline.h:107
virtual void stopConsumer()
Stops all registered consumers.
Definition: pipeline.h:127
Pipeline(IProducer< T > &producer, IConsumer< T > *consumer, std::string name, INotifier &notifier, const bool producer_fifo_scheduling=false)
Creates a new Pipeline object, registering producer, consumer and notifier. Additionally, an empty queue is initialized.
Definition: pipeline.h:250
moodycamel::BlockingReaderWriterQueue< std::unique_ptr< T > > queue_
Definition: pipeline.h:365
std::chrono::high_resolution_clock Clock
Definition: pipeline.h:238
Parent class for arbitrary consumers.
Definition: pipeline.h:43
AE_FORCEINLINE bool tryEnqueue(T const &element)
virtual bool tryGet(std::vector< std::unique_ptr< T >> &products)=0
Reads packages from some source and produces corresponding objects.
Parent class for notifiers.
Definition: pipeline.h:210
IConsumer< T > * consumer_
Definition: pipeline.h:362
bool getLatestProduct(std::unique_ptr< T > &product, std::chrono::milliseconds timeout)
Returns the most recent package in the queue. Can be used instead of registering a consumer...
Definition: pipeline.h:347
std::vector< IConsumer< T > * > consumers_
Definition: pipeline.h:92
std::atomic< bool > running_
Definition: pipeline.h:366
#define URCL_LOG_DEBUG(...)
Definition: log.h:23
std::thread pThread_
Definition: pipeline.h:367
std::string name_
Definition: pipeline.h:363
IProducer< T > & producer_
Definition: pipeline.h:361
virtual void startProducer()
Definition: pipeline.h:193
virtual void teardownConsumer()
Tears down all registered consumers.
Definition: pipeline.h:117
Parent class for arbitrary producers of packages.
Definition: pipeline.h:170
virtual void stopProducer()
Stops the producer.
Definition: pipeline.h:189
bool waitDequeTimed(U &result, std::int64_t timeout_usecs)
INotifier & notifier_
Definition: pipeline.h:364
virtual void stopped(std::string name)
Stop notification.
Definition: pipeline.h:222
The Pipeline manages the production and optionally consumption of packages. Cyclically the producer ...
Definition: pipeline.h:235
virtual void setupConsumer()
Set-up functionality of the consumer.
Definition: pipeline.h:49


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Tue Jul 4 2023 02:09:47