22 #include <aws/core/utils/logging/LogMacros.h> 31 namespace FileManagement {
59 virtual bool isDataAvailableToRead() = 0;
67 virtual void fileUploadCompleteStatus(
71 virtual void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) = 0;
73 virtual void deleteStaleData() = 0;
100 file_manager_strategy_ = std::make_shared<FileManagerStrategy>(options);
109 explicit FileManager(std::shared_ptr<DataManagerStrategy> file_manager_strategy) {
111 if(file_manager_strategy) {
112 file_manager_strategy_ = file_manager_strategy;
118 if(file_manager_strategy_) {
119 started &= file_manager_strategy_->start();
120 if (file_manager_strategy_->isDataAvailable()) {
121 file_status_monitor_->setStatus(Aws::DataFlow::Status::AVAILABLE);
130 if(file_manager_strategy_) {
131 file_status_monitor_->setStatus(Aws::DataFlow::Status::UNAVAILABLE);
132 is_shutdown &= file_manager_strategy_->shutdown();
146 DataToken token = file_manager_strategy_->read(data);
147 if (!file_manager_strategy_->isDataAvailable()) {
148 AWS_LOG_INFO(__func__,
149 "Data is no longer available to read.");
150 file_status_monitor_->setStatus(Aws::DataFlow::Status::UNAVAILABLE);
159 virtual void write(
const T & data) = 0;
170 if (Aws::DataFlow::UploadStatus::SUCCESS == upload_status) {
171 total_logs_uploaded_ += log_messages.
batch_size;
172 AWS_LOG_INFO(__func__,
173 "Total items uploaded: %i",
174 total_logs_uploaded_);
178 for (
const auto &token : log_messages.
data_tokens) {
182 file_manager_strategy_->resolve(token, upload_status == Aws::DataFlow::UploadStatus::SUCCESS);
183 if (upload_status != Aws::DataFlow::UploadStatus::SUCCESS) {
184 file_status_monitor_->setStatus(Aws::DataFlow::Status::AVAILABLE);
186 }
catch(std::runtime_error& exception) {
187 AWS_LOG_WARN(__func__,
188 "caught runtime_error attempting to resolve token %i",
199 file_status_monitor_ = status_monitor;
209 if (file_status_monitor_) {
210 return file_status_monitor_->getStatus() == Aws::DataFlow::Status::AVAILABLE;
221 std::lock_guard<std::mutex> lock(active_delete_stale_data_mutex_);
223 if (stale_data_.empty()) {
227 AWS_LOG_INFO(__func__,
"Deleting stale data from Logbatch");
231 while(!stale_data_.empty()){
232 file_manager_strategy_->resolve(stale_data_.back(),
true);
234 stale_data_.pop_back();
238 AWS_LOG_INFO(__func__,
"%d logs were deleted since the time" 239 " difference was > 14 days.", logsDeleted
249 size_t total_logs_uploaded_ = 0;
void fileUploadCompleteStatus(const Aws::DataFlow::UploadStatus &upload_status, const FileObject< T > &log_messages) override
std::shared_ptr< StatusMonitor > file_status_monitor_
std::mutex active_delete_stale_data_mutex_
FileManager(std::shared_ptr< DataManagerStrategy > file_manager_strategy)
void setStatusMonitor(std::shared_ptr< StatusMonitor > status_monitor) override
bool isDataAvailableToRead() override
std::vector< FileManagement::DataToken > stale_data_
DataToken read(std::string &data)
static const FileManagerStrategyOptions kDefaultFileManagerStrategyOptions
FileManager(const FileManagerStrategyOptions &options)
std::list< DataToken > data_tokens
std::shared_ptr< DataManagerStrategy > file_manager_strategy_