79 virtual bool consume(std::shared_ptr<T> product) = 0;
109 for (
auto& con : consumers_)
111 con->setupConsumer();
119 for (
auto& con : consumers_)
121 con->teardownConsumer();
129 for (
auto& con : consumers_)
139 for (
auto& con : consumers_)
155 for (
auto& con : consumers_)
157 if (!con->consume(product))
169 template <
typename T>
204 virtual bool tryGet(std::vector<std::unique_ptr<T>>& products) = 0;
234 template <
typename T>
238 typedef std::chrono::high_resolution_clock
Clock;
239 typedef Clock::time_point
Time;
251 const bool producer_fifo_scheduling =
false)
252 : producer_(producer)
253 , consumer_(consumer)
255 , notifier_(notifier)
258 , producer_fifo_scheduling_(producer_fifo_scheduling)
271 : producer_(producer)
274 , notifier_(notifier)
277 , producer_fifo_scheduling_(producer_fifo_scheduling)
292 producer_.setupProducer();
293 if (consumer_ !=
nullptr)
294 consumer_->setupConsumer();
306 producer_.startProducer();
308 if (consumer_ !=
nullptr)
310 notifier_.started(name_);
325 producer_.stopProducer();
326 if (pThread_.joinable())
330 if (cThread_.joinable())
334 notifier_.stopped(name_);
351 while (queue_.tryDequeue(product))
357 return res || queue_.waitDequeTimed(product, timeout);
373 if (producer_fifo_scheduling_)
375 pthread_t this_thread = pthread_self();
376 const int max_thread_priority = sched_get_priority_max(SCHED_FIFO);
379 std::vector<std::unique_ptr<T>> products;
382 if (!producer_.
tryGet(products))
389 for (
auto& p : products)
393 URCL_LOG_ERROR(
"Pipeline producer overflowed! <%s>", name_.c_str());
405 std::unique_ptr<T> product;
412 if (!queue_.
waitDequeTimed(product, std::chrono::milliseconds(8)))
418 if (!consumer_->
consume(std::move(product)))
virtual void onTimeout()
Functionality for handling consumer timeouts.
bool producer_fifo_scheduling_
#define URCL_LOG_ERROR(...)
Consumer that allows one product to be consumed by multiple arbitrary consumers. ...
virtual void teardownProducer()
Fully tears down the producer - by default no difference to stopping it.
virtual void started(std::string name)
Start notification.
virtual ~Pipeline()
The Pipeline object's destructor, stopping the pipeline and joining all running threads.
void run()
Starts the producer and, if existing, the consumer in new threads.
MultiConsumer(std::vector< IConsumer< T > *> consumers)
Creates a new MultiConsumer object.
void stop()
Stops the pipeline and all running threads.
virtual void onTimeout()
Triggers timeout functionality for all registered consumers.
virtual bool consume(std::shared_ptr< T > product)=0
Consumes a product, utilizing its contents.
bool consume(std::shared_ptr< T > product)
Consumes a given product with all registered consumers.
virtual void stopConsumer()
Stops the consumer.
virtual void setupProducer()
Set-up functionality of the producers.
Pipeline(IProducer< T > &producer, std::string name, INotifier &notifier, const bool producer_fifo_scheduling=false)
Creates a new Pipeline object, registering producer and notifier while no consumer is used...
bool setFiFoScheduling(pthread_t &thread, const int priority)
virtual void teardownConsumer()
Fully tears down the consumer - by default no difference to stopping it.
virtual void setupConsumer()
Sets up all registered consumers.
virtual void stopConsumer()
Stops all registered consumers.
Pipeline(IProducer< T > &producer, IConsumer< T > *consumer, std::string name, INotifier &notifier, const bool producer_fifo_scheduling=false)
Creates a new Pipeline object, registering producer, consumer and notifier. Additionally, an empty queue is initialized.
moodycamel::BlockingReaderWriterQueue< std::unique_ptr< T > > queue_
std::chrono::high_resolution_clock Clock
Parent class for arbitrary consumers.
AE_FORCEINLINE bool tryEnqueue(T const &element)
virtual bool tryGet(std::vector< std::unique_ptr< T >> &products)=0
Reads packages from some source and produces corresponding objects.
Parent class for notifiers.
IConsumer< T > * consumer_
bool getLatestProduct(std::unique_ptr< T > &product, std::chrono::milliseconds timeout)
Returns the most recent package in the queue. Can be used instead of registering a consumer...
std::vector< IConsumer< T > * > consumers_
std::atomic< bool > running_
#define URCL_LOG_DEBUG(...)
IProducer< T > & producer_
virtual void startProducer()
virtual void teardownConsumer()
Tears down all registered consumers.
Parent class for arbitrary producers of packages.
virtual void stopProducer()
Stops the producer.
bool waitDequeTimed(U &result, std::int64_t timeout_usecs)
virtual void stopped(std::string name)
Stop notification.
The Pipeline manages the production and optionally consumption of packages. Cyclically the producer ...
virtual void setupConsumer()
Set-up functionality of the consumer.