48 virtual bool consume(shared_ptr<T> product) = 0;
64 for (
auto& con : consumers_)
71 for (
auto& con : consumers_)
73 con->teardownConsumer();
78 for (
auto& con : consumers_)
85 for (
auto& con : consumers_)
94 for (
auto& con : consumers_)
96 if (!con->consume(product))
103 template <
typename T>
117 virtual bool tryGet(std::vector<unique_ptr<T>>& products) = 0;
131 template <
typename T>
135 typedef std::chrono::high_resolution_clock
Clock;
136 typedef Clock::time_point
Time;
148 std::vector<unique_ptr<T>> products;
151 if (!producer_.
tryGet(products))
156 for (
auto& p : products)
160 LOG_ERROR(
"Pipeline producer overflowed! <%s>", name_.c_str());
167 LOG_DEBUG(
"Pipeline producer ended! <%s>", name_.c_str());
176 unique_ptr<T> product;
189 if (!consumer_.
consume(std::move(product)))
193 LOG_DEBUG(
"Pipeline consumer ended! <%s>", name_.c_str());
201 : producer_(producer), consumer_(consumer), name_(name), notifier_(notifier), queue_{ 32 }, running_{
false }
221 LOG_DEBUG(
"Stopping pipeline! <%s>", name_.c_str());
MultiConsumer(std::vector< IConsumer< T > * > consumers)
virtual void teardownProducer()
virtual void setupProducer()
IConsumer< T > & consumer_
bool wait_dequeue_timed(U &result, std::int64_t timeout_usecs)
bool consume(shared_ptr< T > product)
#define LOG_DEBUG(format,...)
virtual void setupConsumer()
virtual void setupConsumer()
virtual void stopConsumer()
virtual bool tryGet(std::vector< unique_ptr< T >> &products)=0
std::chrono::high_resolution_clock Clock
virtual void stopProducer()
virtual void stopConsumer()
AE_FORCEINLINE bool try_enqueue(T const &element)
virtual void stopped(std::string name)
virtual void started(std::string name)
virtual void teardownConsumer()
std::vector< IConsumer< T > * > consumers_
IProducer< T > & producer_
virtual void teardownConsumer()
Pipeline(IProducer< T > &producer, IConsumer< T > &consumer, std::string name, INotifier ¬ifier)
virtual bool consume(shared_ptr< T > product)=0
#define LOG_ERROR(format,...)
BlockingReaderWriterQueue< unique_ptr< T > > queue_