18 #include <aws/core/Aws.h> 20 #include <file_management/file_upload/file_upload_streamer.h> 21 #include <file_management/file_upload/file_manager.h> 33 namespace CloudWatch {
37 static const std::chrono::milliseconds
kDequeueDuration = std::chrono::milliseconds(100);
47 template<
typename D,
typename T>
59 if (
nullptr == publisher) {
60 throw std::invalid_argument(
"Invalid argument: publisher cannot be null");
63 if (
nullptr == batcher) {
64 throw std::invalid_argument(
"Invalid argument: batcher cannot be null");
67 this->publisher_ = publisher;
68 this->batcher_ = batcher;
69 this->file_upload_streamer_ =
nullptr;
71 this->number_dequeued_.store(0);
84 started &= publisher_->start();
85 started &= batcher_->start();
87 if (file_upload_streamer_) {
88 started &= file_upload_streamer_->start();
108 is_shutdown &= publisher_->shutdown();
109 is_shutdown &= batcher_->shutdown();
111 if (file_upload_streamer_) {
112 is_shutdown &= file_upload_streamer_->shutdown();
114 file_upload_streamer_->join();
131 T t = convertInputToBatched(data_to_batch);
132 return batcher_->batchData(t);
142 virtual inline bool batchData(
const D &data_to_batch,
const std::chrono::milliseconds &milliseconds) {
145 T t = convertInputToBatched(data_to_batch, milliseconds);
146 return batcher_->batchData(t);
157 return batcher_->publishBatchedData();
168 return this->dequeue_duration_;
173 if (new_value.count() >= 0) {
174 this->dequeue_duration_ = new_value;
186 return this->number_dequeued_.load();
205 return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
210 virtual T convertInputToBatched(
const D &input,
const std::chrono::milliseconds &milliseconds) = 0;
211 virtual T convertInputToBatched(
const D &input) = 0;
219 TaskPtr<std::list<T>> task_to_publish;
221 ->dequeue(task_to_publish, dequeue_duration_);
225 if (task_to_publish) {
226 this->number_dequeued_++;
227 AWS_LOGSTREAM_DEBUG(__func__,
"Number of tasks dequeued = " << this->number_dequeued_++);
231 task_to_publish->run(publisher_);
234 task_to_publish->cancel();
virtual bool publishBatchedData()
virtual std::chrono::milliseconds getCurrentTimestamp()
std::shared_ptr< DataBatcher< T > > batcher_
virtual bool batchData(const D &data_to_batch, const std::chrono::milliseconds &milliseconds)
virtual bool batchData(const D &data_to_batch)
virtual std::chrono::milliseconds getDequeueDuration()
CloudWatchService(std::shared_ptr< Publisher< std::list< T >>> publisher, std::shared_ptr< DataBatcher< T >> batcher)
virtual bool isConnected()
std::shared_ptr< FileUploadStreamer< std::list< T > > > file_upload_streamer_
static const std::chrono::milliseconds kDequeueDuration
virtual bool start() override
std::atomic< int > number_dequeued_
virtual bool setDequeueDuration(std::chrono::milliseconds new_value)
std::shared_ptr< Publisher< std::list< T > > > publisher_
std::chrono::milliseconds dequeue_duration_