file_manager.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 #include <list>
18 #include <memory>
19 #include <stdexcept>
20 #include <thread>
21 
22 #include <aws/core/utils/logging/LogMacros.h>
23 
25 
29 
30 namespace Aws {
31 namespace FileManagement {
32 
34 
35 template <typename T>
36 class FileObject {
37 public:
39  size_t batch_size{};
40  std::list<DataToken> data_tokens;
41 };
42 
43 
44 
45 template <typename T>
46 class DataReader : public Service {
47 public:
53  virtual FileObject<T> readBatch(size_t batch_size) = 0;
54 
59  virtual bool isDataAvailableToRead() = 0;
60 
67  virtual void fileUploadCompleteStatus(
68  const Aws::DataFlow::UploadStatus &upload_status,
69  const FileObject<T> &log_messages) = 0;
70 
71  virtual void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) = 0;
72 
73  virtual void deleteStaleData() = 0;
74 };
75 
80 template <typename T>
81 class FileManager :
82  public DataReader <T>
83 {
84 public:
85 
90  file_manager_strategy_ = std::make_shared<FileManagerStrategy>(Aws::FileManagement::kDefaultFileManagerStrategyOptions);
91  }
92 
98  // NOLINTNEXTLINE(google-explicit-constructor, hicpp-explicit-conversions)
100  file_manager_strategy_ = std::make_shared<FileManagerStrategy>(options);
101  }
102 
109  explicit FileManager(std::shared_ptr<DataManagerStrategy> file_manager_strategy) {
110 
111  if(file_manager_strategy) {
112  file_manager_strategy_ = file_manager_strategy;
113  }
114  }
115 
116  bool start() override {
117  bool started = true;
118  if(file_manager_strategy_) {
119  started &= file_manager_strategy_->start();
120  if (file_manager_strategy_->isDataAvailable()) {
121  file_status_monitor_->setStatus(Aws::DataFlow::Status::AVAILABLE);
122  }
123  }
124  started &= Service::start();
125  return started;
126  }
127 
128  bool shutdown() override {
129  bool is_shutdown = Service::shutdown();
130  if(file_manager_strategy_) {
131  file_status_monitor_->setStatus(Aws::DataFlow::Status::UNAVAILABLE);
132  is_shutdown &= file_manager_strategy_->shutdown();
133  }
134  return is_shutdown;
135  }
136 
137  virtual ~FileManager() = default;
138 
145  inline DataToken read(std::string &data) {
146  DataToken token = file_manager_strategy_->read(data);
147  if (!file_manager_strategy_->isDataAvailable()) {
148  AWS_LOG_INFO(__func__,
149  "Data is no longer available to read.");
150  file_status_monitor_->setStatus(Aws::DataFlow::Status::UNAVAILABLE);
151  }
152  return token;
153  }
154 
159  virtual void write(const T & data) = 0;
160 
168  const Aws::DataFlow::UploadStatus& upload_status,
169  const FileObject<T> &log_messages) override {
170  if (Aws::DataFlow::UploadStatus::SUCCESS == upload_status) {
171  total_logs_uploaded_ += log_messages.batch_size;
172  AWS_LOG_INFO(__func__,
173  "Total items uploaded: %i",
174  total_logs_uploaded_);
175  }
176 
177  // Delete file if empty log_messages.file_location.
178  for (const auto &token : log_messages.data_tokens) {
179  // this may block, file IO can be expensive
180  try {
181 
182  file_manager_strategy_->resolve(token, upload_status == Aws::DataFlow::UploadStatus::SUCCESS);
183  if (upload_status != Aws::DataFlow::UploadStatus::SUCCESS) {
184  file_status_monitor_->setStatus(Aws::DataFlow::Status::AVAILABLE);
185  }
186  } catch(std::runtime_error& exception) {
187  AWS_LOG_WARN(__func__,
188  "caught runtime_error attempting to resolve token %i",
189  token);
190  }
191  }
192  }
193 
198  void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) override {
199  file_status_monitor_ = status_monitor;
200  }
201 
207  bool isDataAvailableToRead() override {
208 
209  if (file_status_monitor_) {
210  return file_status_monitor_->getStatus() == Aws::DataFlow::Status::AVAILABLE;
211  }
212  return false;
213  }
214 
215  /*
216  If the user cfg options for delete_stale_data and a log is over 14 days old from
217  the latest time,it will be deleted. This is because the AWS API for PutLogEvents
218  rejects batches with log events older than 14 days.
219  */
221  std::lock_guard<std::mutex> lock(active_delete_stale_data_mutex_);
222 
223  if (stale_data_.empty()) {
224  return;
225  }
226 
227  AWS_LOG_INFO(__func__, "Deleting stale data from Logbatch");
228 
229  std::list<FileManagement::DataToken> data_tokens;
230  int logsDeleted = 0;
231  while(!stale_data_.empty()){
232  file_manager_strategy_->resolve(stale_data_.back(), true);
233  logsDeleted++;
234  stale_data_.pop_back();
235  }
236 
237  if(logsDeleted > 0){
238  AWS_LOG_INFO(__func__, "%d logs were deleted since the time"
239  " difference was > 14 days.", logsDeleted
240  );
241  }
242  }
243 
244 protected:
249  size_t total_logs_uploaded_ = 0;
250 
254  std::shared_ptr<DataManagerStrategy> file_manager_strategy_;
255 
259  std::shared_ptr<StatusMonitor> file_status_monitor_;
260 
266 
271  std::vector<FileManagement::DataToken> stale_data_;
272 
273 };
274 
275 
276 } // namespace FileManagement
277 } // namespace Aws
void fileUploadCompleteStatus(const Aws::DataFlow::UploadStatus &upload_status, const FileObject< T > &log_messages) override
Definition: file_manager.h:167
std::shared_ptr< StatusMonitor > file_status_monitor_
Definition: file_manager.h:259
FileManager(std::shared_ptr< DataManagerStrategy > file_manager_strategy)
Definition: file_manager.h:109
void setStatusMonitor(std::shared_ptr< StatusMonitor > status_monitor) override
Definition: file_manager.h:198
bool isDataAvailableToRead() override
Definition: file_manager.h:207
std::vector< FileManagement::DataToken > stale_data_
Definition: file_manager.h:271
DataToken read(std::string &data)
Definition: file_manager.h:145
static const FileManagerStrategyOptions kDefaultFileManagerStrategyOptions
virtual bool shutdown()
FileManager(const FileManagerStrategyOptions &options)
Definition: file_manager.h:99
std::list< DataToken > data_tokens
Definition: file_manager.h:40
virtual bool start()
std::shared_ptr< DataManagerStrategy > file_manager_strategy_
Definition: file_manager.h:254


file_management
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:23