21 #include <aws/core/utils/logging/LogMacros.h> 34 namespace FileManagement {
39 static constexpr std::chrono::milliseconds
kTimeout = std::chrono::minutes(5);
75 data_reader_ = data_reader;
76 auto data_status_monitor = std::make_shared<StatusMonitor>();
77 addStatusMonitor(data_status_monitor);
78 network_monitor_ = std::make_shared<StatusMonitor>();
79 addStatusMonitor(network_monitor_);
81 data_reader_->setStatusMonitor(data_status_monitor);
94 status_condition_monitor_.addStatusMonitor(status_monitor);
98 bool is_shutdown =
true;
100 is_shutdown &= data_reader_->shutdown();
105 AWS_LOG_INFO(__func__,
106 "Publisher state has changed to: %s",
107 (status == Aws::DataFlow::Status::AVAILABLE) ?
"available" :
"unavailable");
108 network_monitor_->setStatus(status);
116 if (upload_status == Aws::DataFlow::UploadStatus::FAIL) {
119 data_reader_->fileUploadCompleteStatus(upload_status, message);
126 bool is_started =
true;
127 is_started &= data_reader_->start();
138 status_monitor_timeout_ = new_timeout;
154 AWS_LOG_DEBUG(__func__,
155 "Waiting for files and work.");
156 auto wait_result = status_condition_monitor_.waitForWork(status_monitor_timeout_);
160 if (wait_result == std::cv_status::timeout) {
162 if (!data_reader_->isDataAvailableToRead()) {
163 AWS_LOG_DEBUG(__func__,
"Timed out when waiting for work, no data available to read");
166 AWS_LOG_DEBUG(__func__,
"Timed out when waiting for work, but data available to read: attempting to publish");
171 AWS_LOG_WARN(__func__,
172 "No Sink Configured");
175 AWS_LOG_DEBUG(__func__,
176 "Found work, batching");
177 FileObject<T> file_object = data_reader_->readBatch(batch_size_);
178 total_logs_uploaded += file_object.
batch_size;
179 stored_task_ = std::make_shared<FileUploadTask<T>>(
180 std::move(file_object),
184 std::placeholders::_1,
185 std::placeholders::_2));
187 AWS_LOG_DEBUG(__func__,
188 "Previous task found, retrying upload.");
192 AWS_LOG_DEBUG(__func__,
194 stored_task_ =
nullptr;
196 AWS_LOG_DEBUG(__func__,
199 data_reader_->deleteStaleData();
216 size_t total_logs_uploaded = 0;
MultiStatusConditionMonitor status_condition_monitor_
void setStatusMonitorTimeout(std::chrono::milliseconds new_timeout)
std::shared_ptr< FileUploadTask< T > > stored_task_
void onPublisherStateChange(const Aws::DataFlow::Status &status)
std::shared_ptr< StatusMonitor > network_monitor_
std::shared_ptr< DataReader< T > > data_reader_
static constexpr std::chrono::milliseconds kTimeout
void addStatusMonitor(std::shared_ptr< StatusMonitor > &status_monitor)
std::chrono::milliseconds status_monitor_timeout_
FileUploadStreamer(std::shared_ptr< DataReader< T >> data_reader, FileUploadStreamerOptions options)
std::shared_ptr< Aws::DataFlow::Task< T >> TaskPtr
void onComplete(const Aws::DataFlow::UploadStatus &upload_status, const FileObject< T > &message)