78 virtual bool consume(std::shared_ptr<T> product) = 0;
108 for (
auto& con : consumers_)
110 con->setupConsumer();
118 for (
auto& con : consumers_)
120 con->teardownConsumer();
128 for (
auto& con : consumers_)
138 for (
auto& con : consumers_)
154 for (
auto& con : consumers_)
156 if (!con->consume(product))
168 template <
typename T>
203 virtual bool tryGet(std::vector<std::unique_ptr<T>>& products) = 0;
233 template <
typename T>
237 typedef std::chrono::high_resolution_clock
Clock;
238 typedef Clock::time_point
Time;
249 : producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{
false }
261 : producer_(producer), consumer_(nullptr), name_(name), notifier_(notifier), queue_{ 32 }, running_{
false }
276 producer_.setupProducer();
277 if (consumer_ !=
nullptr)
278 consumer_->setupConsumer();
290 producer_.startProducer();
292 if (consumer_ !=
nullptr)
294 notifier_.started(name_);
309 producer_.stopProducer();
310 if (pThread_.joinable())
314 if (cThread_.joinable())
318 notifier_.stopped(name_);
331 return queue_.waitDequeTimed(product, timeout);
346 std::ifstream realtime_file(
"/sys/kernel/realtime", std::ios::in);
348 realtime_file >> has_realtime;
351 const int max_thread_priority = sched_get_priority_max(SCHED_FIFO);
352 if (max_thread_priority != -1)
355 pthread_t this_thread = pthread_self();
358 struct sched_param params;
361 params.sched_priority = max_thread_priority;
363 int ret = pthread_setschedparam(this_thread, SCHED_FIFO, &params);
366 URCL_LOG_ERROR(
"Unsuccessful in setting producer thread realtime priority. Error code: %d", ret);
370 ret = pthread_getschedparam(this_thread, &policy, &params);
373 std::cout <<
"Couldn't retrieve real-time scheduling paramers" << std::endl;
377 if (policy != SCHED_FIFO)
387 URCL_LOG_INFO(
"Thread priority is %d", params.sched_priority);
391 URCL_LOG_ERROR(
"Could not get maximum thread priority for producer thread");
396 URCL_LOG_WARN(
"No realtime capabilities found. Consider using a realtime system for better performance");
398 std::vector<std::unique_ptr<T>> products;
401 if (!producer_.
tryGet(products))
408 for (
auto& p : products)
412 URCL_LOG_ERROR(
"Pipeline producer overflowed! <%s>", name_.c_str());
424 std::unique_ptr<T> product;
431 if (!queue_.
waitDequeTimed(product, std::chrono::milliseconds(8)))
437 if (!consumer_->
consume(std::move(product)))
virtual void onTimeout()
Functionality for handling consumer timeouts.
Pipeline(IProducer< T > &producer, IConsumer< T > *consumer, std::string name, INotifier &notifier)
Creates a new Pipeline object, registering producer, consumer and notifier. Additionally, an empty queue is initialized.
#define URCL_LOG_ERROR(...)
Consumer that allows one product to be consumed by multiple arbitrary consumers. ...
virtual void teardownProducer()
Fully tears down the producer - by default no difference to stopping it.
virtual void started(std::string name)
Start notification.
virtual ~Pipeline()
The Pipeline object's destructor, stopping the pipeline and joining all running threads.
void run()
Starts the producer and, if existing, the consumer in new threads.
void stop()
Stops the pipeline and all running threads.
virtual void onTimeout()
Triggers timeout functionality for all registered consumers.
virtual bool consume(std::shared_ptr< T > product)=0
Consumes a product, utilizing its contents.
bool consume(std::shared_ptr< T > product)
Consumes a given product with all registered consumers.
virtual void stopConsumer()
Stops the consumer.
virtual void setupProducer()
Set-up functionality of the producers.
virtual void teardownConsumer()
Fully tears down the consumer - by default no difference to stopping it.
virtual void setupConsumer()
Sets up all registered consumers.
virtual void stopConsumer()
Stops all registered consumers.
moodycamel::BlockingReaderWriterQueue< std::unique_ptr< T > > queue_
std::chrono::high_resolution_clock Clock
Parent class for arbitrary consumers.
AE_FORCEINLINE bool tryEnqueue(T const &element)
virtual bool tryGet(std::vector< std::unique_ptr< T >> &products)=0
Reads packages from some source and produces corresponding objects.
Parent class for notifiers.
IConsumer< T > * consumer_
bool getLatestProduct(std::unique_ptr< T > &product, std::chrono::milliseconds timeout)
Returns the next package in the queue. Can be used instead of registering a consumer.
std::vector< IConsumer< T > * > consumers_
std::atomic< bool > running_
#define URCL_LOG_DEBUG(...)
IProducer< T > & producer_
virtual void startProducer()
#define URCL_LOG_WARN(...)
virtual void teardownConsumer()
Tears down all registered consumers.
Parent class for arbitrary producers of packages.
virtual void stopProducer()
Stops the producer.
bool waitDequeTimed(U &result, std::int64_t timeout_usecs)
Pipeline(IProducer< T > &producer, std::string name, INotifier &notifier)
Creates a new Pipeline object, registering producer and notifier while no consumer is used...
virtual void stopped(std::string name)
Stop notification.
The Pipeline manages the production and optionally consumption of packages. Cyclically the producer ...
#define URCL_LOG_INFO(...)
MultiConsumer(std::vector< IConsumer< T > * > consumers)
Creates a new MultiConsumer object.
virtual void setupConsumer()
Set-up functionality of the consumer.