pipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019, FZI Forschungszentrum Informatik (templating)
3  *
4  * Copyright 2017, 2018 Simon Rasmussen (refactor)
5  *
6  * Copyright 2015, 2016 Thomas Timm Andersen (original version)
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 
24 #include "ur_client_library/log.h"
27 #include <atomic>
28 #include <chrono>
29 #include <thread>
30 #include <vector>
31 #include <fstream>
32 #include <mutex>
33 #include <algorithm>
34 
35 namespace urcl
36 {
37 namespace comm
38 {
/*!
 * \brief Base interface for anything that consumes products of type T.
 *
 * Every hook except consume() has a default implementation, so a concrete
 * consumer only has to override what it actually needs.
 *
 * \tparam T Type of the consumed products
 */
template <typename T>
class IConsumer
{
public:
  virtual ~IConsumer() = default;

  /*!
   * \brief Hook for set-up work. The default implementation does nothing.
   */
  virtual void setupConsumer() {}

  /*!
   * \brief Fully tears down the consumer.
   *
   * The default implementation simply forwards to stopConsumer().
   */
  virtual void teardownConsumer() { stopConsumer(); }

  /*!
   * \brief Hook for stopping the consumer. The default implementation does nothing.
   */
  virtual void stopConsumer() {}

  /*!
   * \brief Hook that is invoked when no product arrived in time.
   *
   * The default implementation does nothing.
   */
  virtual void onTimeout() {}

  /*!
   * \brief Consumes a single product.
   *
   * \param product Shared pointer to the product to be consumed
   *
   * \returns Success flag of the consumption. Pipeline tears the consumer down
   * and stops its consumer loop when this returns false.
   */
  virtual bool consume(std::shared_ptr<T> product) = 0;
};
85 
92 template <typename T>
93 class MultiConsumer : public IConsumer<T>
94 {
95 private:
96  std::vector<std::shared_ptr<IConsumer<T>>> consumers_;
97 
98 public:
104  MultiConsumer(std::vector<std::shared_ptr<IConsumer<T>>> consumers) : consumers_(consumers)
105  {
106  }
107 
113  void addConsumer(std::shared_ptr<IConsumer<T>> consumer)
114  {
115  std::lock_guard<std::mutex> lk(consumer_list_);
116  consumers_.push_back(consumer);
117  }
118 
124  void removeConsumer(std::shared_ptr<IConsumer<T>> consumer)
125  {
126  std::lock_guard<std::mutex> lk(consumer_list_);
127  auto it = std::find(consumers_.begin(), consumers_.end(), consumer);
128  if (it == consumers_.end())
129  {
130  URCL_LOG_ERROR("Unable to remove consumer as it is not part of the consumer list");
131  return;
132  }
133  consumers_.erase(it);
134  }
135 
139  virtual void setupConsumer()
140  {
141  for (auto& con : consumers_)
142  {
143  con->setupConsumer();
144  }
145  }
149  virtual void teardownConsumer()
150  {
151  for (auto& con : consumers_)
152  {
153  con->teardownConsumer();
154  }
155  }
159  virtual void stopConsumer()
160  {
161  for (auto& con : consumers_)
162  {
163  con->stopConsumer();
164  }
165  }
169  virtual void onTimeout()
170  {
171  for (auto& con : consumers_)
172  {
173  con->onTimeout();
174  }
175  }
176 
184  bool consume(std::shared_ptr<T> product)
185  {
186  std::lock_guard<std::mutex> lk(consumer_list_);
187  bool res = true;
188  for (auto& con : consumers_)
189  {
190  if (!con->consume(product))
191  res = false;
192  }
193  return res;
194  }
195 
196 private:
197  std::mutex consumer_list_;
198 };
199 
/*!
 * \brief Base interface for arbitrary producers of packages.
 *
 * Every hook except tryGet() has a default implementation.
 *
 * \tparam T Type of the produced products
 */
template <typename T>
class IProducer
{
public:
  virtual ~IProducer() = default;

  /*!
   * \brief Hook for set-up work. The default implementation does nothing and
   * ignores both parameters.
   *
   * \param max_num_tries Presumably an upper bound on connection attempts made
   * by concrete producers (TODO confirm; not used in this header)
   * \param reconnection_time Presumably the delay between such attempts
   * (TODO confirm; not used in this header)
   */
  virtual void setupProducer(const size_t max_num_tries = 0,
                             const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
  {
  }

  /*!
   * \brief Fully tears down the producer.
   *
   * The default implementation simply forwards to stopProducer().
   */
  virtual void teardownProducer()
  {
    stopProducer();
  }

  /*!
   * \brief Hook for stopping the producer. The default implementation does nothing.
   */
  virtual void stopProducer()
  {
  }

  /*!
   * \brief Hook for starting the producer. The default implementation does nothing.
   */
  virtual void startProducer()
  {
  }

  /*!
   * \brief Reads packages from some source and produces corresponding objects.
   *
   * \param products Vector that the produced objects are appended to
   *
   * \returns Success flag of the read. Pipeline tears the producer down and
   * ends its producer loop when this returns false.
   */
  virtual bool tryGet(std::vector<std::unique_ptr<T>>& products) = 0;
};
249 
/*!
 * \brief Base class for notifiers.
 *
 * Both notifications default to doing nothing; derived classes override the
 * ones they are interested in.
 */
class INotifier
{
public:
  // Polymorphic base: make destruction through a base pointer well-defined.
  virtual ~INotifier() = default;

  /*!
   * \brief Start notification. The default implementation does nothing.
   *
   * \param name The name passed along with the notification
   */
  virtual void started(std::string name)
  {
  }

  /*!
   * \brief Stop notification. The default implementation does nothing.
   *
   * \param name The name passed along with the notification
   */
  virtual void stopped(std::string name)
  {
  }
};
269 
277 template <typename T>
278 class Pipeline
279 {
280 public:
281  typedef std::chrono::high_resolution_clock Clock;
282  typedef Clock::time_point Time;
293  Pipeline(IProducer<T>& producer, IConsumer<T>* consumer, std::string name, INotifier& notifier,
294  const bool producer_fifo_scheduling = false)
295  : producer_(producer)
296  , consumer_(consumer)
297  , name_(name)
298  , notifier_(notifier)
299  , queue_{ 32 }
300  , running_{ false }
301  , producer_fifo_scheduling_(producer_fifo_scheduling)
302  {
303  }
313  Pipeline(IProducer<T>& producer, std::string name, INotifier& notifier, const bool producer_fifo_scheduling = false)
314  : producer_(producer)
315  , consumer_(nullptr)
316  , name_(name)
317  , notifier_(notifier)
318  , queue_{ 32 }
319  , running_{ false }
320  , producer_fifo_scheduling_(producer_fifo_scheduling)
321  {
322  }
323 
327  virtual ~Pipeline()
328  {
329  URCL_LOG_DEBUG("Destructing pipeline");
330  stop();
331  }
332 
340  void init(const size_t max_num_tries = 0,
341  const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
342  {
343  producer_.setupProducer(max_num_tries, reconnection_time);
344  if (consumer_ != nullptr)
345  consumer_->setupConsumer();
346  }
347 
351  void run()
352  {
353  if (running_)
354  return;
355 
356  running_ = true;
357  producer_.startProducer();
358  pThread_ = std::thread(&Pipeline::runProducer, this);
359  if (consumer_ != nullptr)
360  cThread_ = std::thread(&Pipeline::runConsumer, this);
362  }
363 
367  void stop()
368  {
369  if (!running_)
370  return;
371 
372  URCL_LOG_DEBUG("Stopping pipeline! <%s>", name_.c_str());
373 
374  running_ = false;
375 
376  producer_.stopProducer();
377  if (pThread_.joinable())
378  {
379  pThread_.join();
380  }
381  if (cThread_.joinable())
382  {
383  cThread_.join();
384  }
386  }
387 
398  bool getLatestProduct(std::unique_ptr<T>& product, std::chrono::milliseconds timeout)
399  {
400  // If the queue has more than one package, get the latest one.
401  bool res = false;
402  while (queue_.tryDequeue(product))
403  {
404  res = true;
405  }
406 
407  // If the queue is empty, wait for a package.
408  return res || queue_.waitDequeTimed(product, timeout);
409  }
410 
411 private:
414  std::string name_;
417  std::atomic<bool> running_;
418  std::thread pThread_, cThread_;
420 
421  void runProducer()
422  {
423  URCL_LOG_DEBUG("Starting up producer");
425  {
426  pthread_t this_thread = pthread_self();
427  const int max_thread_priority = sched_get_priority_max(SCHED_FIFO);
428  setFiFoScheduling(this_thread, max_thread_priority);
429  }
430  std::vector<std::unique_ptr<T>> products;
431  while (running_)
432  {
433  if (!producer_.tryGet(products))
434  {
435  producer_.teardownProducer();
436  running_ = false;
437  break;
438  }
439 
440  for (auto& p : products)
441  {
442  if (!queue_.tryEnqueue(std::move(p)))
443  {
444  URCL_LOG_ERROR("Pipeline producer overflowed! <%s>", name_.c_str());
445  }
446  }
447 
448  products.clear();
449  }
450  URCL_LOG_DEBUG("Pipeline producer ended! <%s>", name_.c_str());
452  }
453 
454  void runConsumer()
455  {
456  std::unique_ptr<T> product;
457  while (running_)
458  {
459  // timeout was chosen because we should receive messages
460  // at roughly 125hz (every 8ms) and have to update
461  // the controllers (i.e. the consumer) with *at least* 125Hz
462  // So we update the consumer more frequently via onTimeout
463  if (!queue_.waitDequeTimed(product, std::chrono::milliseconds(8)))
464  {
465  consumer_->onTimeout();
466  continue;
467  }
468 
469  if (!consumer_->consume(std::move(product)))
470  {
471  consumer_->teardownConsumer();
472  running_ = false;
473  break;
474  }
475  }
476  consumer_->stopConsumer();
477  URCL_LOG_DEBUG("Pipeline consumer ended! <%s>", name_.c_str());
479  }
480 };
481 } // namespace comm
482 } // namespace urcl
urcl::setFiFoScheduling
bool setFiFoScheduling(pthread_t &thread, const int priority)
Definition: helpers.cpp:46
urcl::comm::INotifier::stopped
virtual void stopped(std::string name)
Stop notification.
Definition: pipeline.h:265
urcl::comm::IConsumer::teardownConsumer
virtual void teardownConsumer()
Fully tears down the consumer - by default no difference to stopping it.
Definition: pipeline.h:59
urcl::comm::Pipeline::Pipeline
Pipeline(IProducer< T > &producer, std::string name, INotifier &notifier, const bool producer_fifo_scheduling=false)
Creates a new Pipeline object, registering producer and notifier while no consumer is used....
Definition: pipeline.h:313
urcl::comm::MultiConsumer
Consumer that allows one product to be consumed by multiple arbitrary consumers.
Definition: pipeline.h:93
moodycamel::BlockingReaderWriterQueue::waitDequeTimed
bool waitDequeTimed(U &result, std::int64_t timeout_usecs)
Definition: readerwriterqueue.h:788
moodycamel::BlockingReaderWriterQueue
Definition: readerwriterqueue.h:691
urcl::comm::IConsumer::consume
virtual bool consume(std::shared_ptr< T > product)=0
Consumes a product, utilizing its contents.
urcl
Definition: bin_parser.h:36
urcl::comm::IConsumer::~IConsumer
virtual ~IConsumer()=default
urcl::comm::MultiConsumer::consume
bool consume(std::shared_ptr< T > product)
Consumes a given product with all registered consumers.
Definition: pipeline.h:184
URCL_LOG_ERROR
#define URCL_LOG_ERROR(...)
Definition: log.h:26
urcl::comm::Pipeline::Time
Clock::time_point Time
Definition: pipeline.h:282
urcl::comm::IConsumer::setupConsumer
virtual void setupConsumer()
Set-up functionality of the consumer.
Definition: pipeline.h:53
urcl::comm::Pipeline::stop
void stop()
Stops the pipeline and all running threads.
Definition: pipeline.h:367
urcl::comm::MultiConsumer::teardownConsumer
virtual void teardownConsumer()
Tears down all registered consumers.
Definition: pipeline.h:149
urcl::comm::IProducer::startProducer
virtual void startProducer()
Definition: pipeline.h:236
urcl::comm::Pipeline::producer_
IProducer< T > & producer_
Definition: pipeline.h:412
URCL_LOG_DEBUG
#define URCL_LOG_DEBUG(...)
Definition: log.h:23
urcl::comm::Pipeline::consumer_
IConsumer< T > * consumer_
Definition: pipeline.h:413
urcl::comm::Pipeline::runConsumer
void runConsumer()
Definition: pipeline.h:454
urcl::comm::IProducer
Parent class for arbitrary producers of packages.
Definition: pipeline.h:206
urcl::comm::Pipeline::name_
std::string name_
Definition: pipeline.h:414
urcl::comm::IProducer::tryGet
virtual bool tryGet(std::vector< std::unique_ptr< T >> &products)=0
Reads packages from some source and produces corresponding objects.
urcl::comm::Pipeline::getLatestProduct
bool getLatestProduct(std::unique_ptr< T > &product, std::chrono::milliseconds timeout)
Returns the most recent package in the queue. Can be used instead of registering a consumer....
Definition: pipeline.h:398
urcl::comm::Pipeline::Pipeline
Pipeline(IProducer< T > &producer, IConsumer< T > *consumer, std::string name, INotifier &notifier, const bool producer_fifo_scheduling=false)
Creates a new Pipeline object, registering producer, consumer and notifier. Additionally,...
Definition: pipeline.h:293
urcl::comm::IConsumer::stopConsumer
virtual void stopConsumer()
Stops the consumer.
Definition: pipeline.h:66
urcl::comm::INotifier
Parent class for notifiers.
Definition: pipeline.h:253
urcl::comm::IProducer::~IProducer
virtual ~IProducer()=default
urcl::comm::IProducer::stopProducer
virtual void stopProducer()
Stops the producer.
Definition: pipeline.h:232
urcl::comm::MultiConsumer::consumer_list_
std::mutex consumer_list_
Definition: pipeline.h:197
urcl::comm::Pipeline::runProducer
void runProducer()
Definition: pipeline.h:421
urcl::comm::Pipeline
The Pipeline manages the production and optionally consumption of packages. Cyclically the producer i...
Definition: pipeline.h:278
helpers.h
urcl::comm::Pipeline::producer_fifo_scheduling_
bool producer_fifo_scheduling_
Definition: pipeline.h:419
urcl::comm::MultiConsumer::stopConsumer
virtual void stopConsumer()
Stops all registered consumers.
Definition: pipeline.h:159
urcl::comm::Pipeline::Clock
std::chrono::high_resolution_clock Clock
Definition: pipeline.h:281
moodycamel::BlockingReaderWriterQueue::tryDequeue
bool tryDequeue(U &result)
Definition: readerwriterqueue.h:757
urcl::comm::Pipeline::notifier_
INotifier & notifier_
Definition: pipeline.h:415
log.h
urcl::comm::MultiConsumer::MultiConsumer
MultiConsumer(std::vector< std::shared_ptr< IConsumer< T >>> consumers)
Creates a new MultiConsumer object.
Definition: pipeline.h:104
moodycamel::BlockingReaderWriterQueue::tryEnqueue
AE_FORCEINLINE bool tryEnqueue(T const &element)
Definition: readerwriterqueue.h:704
urcl::comm::IConsumer
Parent class for arbitrary consumers.
Definition: pipeline.h:45
urcl::comm::IProducer::teardownProducer
virtual void teardownProducer()
Fully tears down the producer - by default no difference to stopping it.
Definition: pipeline.h:225
urcl::comm::MultiConsumer::onTimeout
virtual void onTimeout()
Triggers timeout functionality for all registered consumers.
Definition: pipeline.h:169
urcl::comm::Pipeline::~Pipeline
virtual ~Pipeline()
The Pipeline object's destructor, stopping the pipeline and joining all running threads.
Definition: pipeline.h:327
urcl::comm::Pipeline::queue_
moodycamel::BlockingReaderWriterQueue< std::unique_ptr< T > > queue_
Definition: pipeline.h:416
urcl::comm::IConsumer::onTimeout
virtual void onTimeout()
Functionality for handling consumer timeouts.
Definition: pipeline.h:72
package.h
readerwriterqueue.h
urcl::comm::Pipeline::pThread_
std::thread pThread_
Definition: pipeline.h:418
urcl::comm::Pipeline::cThread_
std::thread cThread_
Definition: pipeline.h:418
urcl::comm::Pipeline::init
void init(const size_t max_num_tries=0, const std::chrono::milliseconds reconnection_time=std::chrono::seconds(10))
Initialize the pipeline. Internally calls setup of producer and consumer.
Definition: pipeline.h:340
urcl::comm::IProducer::setupProducer
virtual void setupProducer(const size_t max_num_tries=0, const std::chrono::milliseconds reconnection_time=std::chrono::seconds(10))
Set-up functionality of the producers.
Definition: pipeline.h:218
urcl::comm::Pipeline::run
void run()
Starts the producer and, if existing, the consumer in new threads.
Definition: pipeline.h:351
urcl::comm::Pipeline::running_
std::atomic< bool > running_
Definition: pipeline.h:417
urcl::comm::INotifier::started
virtual void started(std::string name)
Start notification.
Definition: pipeline.h:259
urcl::comm::MultiConsumer::removeConsumer
void removeConsumer(std::shared_ptr< IConsumer< T >> consumer)
Remove a consumer from the list of consumers.
Definition: pipeline.h:124
urcl::comm::MultiConsumer::setupConsumer
virtual void setupConsumer()
Sets up all registered consumers.
Definition: pipeline.h:139
urcl::comm::MultiConsumer::addConsumer
void addConsumer(std::shared_ptr< IConsumer< T >> consumer)
Adds a new consumer to the list of consumers.
Definition: pipeline.h:113
urcl::comm::MultiConsumer::consumers_
std::vector< std::shared_ptr< IConsumer< T > > > consumers_
Definition: pipeline.h:96


ur_client_library
Author(s): Thomas Timm Andersen, Simon Rasmussen, Felix Exner, Lea Steffen, Tristan Schnell
autogenerated on Mon May 26 2025 02:35:58