file_manager_strategy.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #include <chrono>
17 #include <iostream>
18 #include <regex>
19 #include <fstream>
21 #include <aws/core/utils/logging/LogMacros.h>
22 #include <iomanip>
24 
// Convert a size given in kilobytes (units of 1024 bytes) into a byte count of type size_t.
#define KB_TO_BYTES(x) (static_cast<size_t>(x) * static_cast<size_t>(1024))
26 
27 namespace fs = std::experimental::filesystem;
28 
29 namespace Aws {
30 namespace FileManagement {
31 
// File name, appended to the backup directory, under which the token store is persisted.
static const std::string kTokenStoreFile{"token_store.info"};
33 
34 void sanitizePath(std::string & path) {
35  if (path.back() != '/') {
36  path += '/';
37  }
38  if (path.front() == '~') {
39  char * home = getenv("HOME");
40  if (nullptr == home) {
41  AWS_LOG_WARN(__func__, "No HOME environment variable set. Attempting to use ROS_HOME instead.");
42  home = getenv("ROS_HOME");
43  }
44  if (nullptr != home) {
45  path.replace(0, 1, home);
46  } else {
47  throw std::runtime_error("The storage directory path uses '~' but no HOME environment variable set.");
48  }
49  }
50 }
51 
52 TokenStore::TokenStore(const TokenStoreOptions &options) : options_{options}{
55 }
56 
59 }
60 
62  auto backup_directory = std::experimental::filesystem::path(options_.backup_directory);
63  if (!std::experimental::filesystem::exists(backup_directory)) {
64  AWS_LOG_INFO(__func__, "TokenStore backup directory %s does not exist, creating.", backup_directory.c_str());
65  std::experimental::filesystem::create_directories(backup_directory);
66  }
67 }
68 
69 bool TokenStore::isTokenAvailable(const std::string &file_name) const {
70  return !(staged_tokens_.find(file_name) == staged_tokens_.end());
71 }
72 
73 FileTokenInfo TokenStore::popAvailableToken(const std::string &file_name) {
74  auto file_token_info = staged_tokens_[file_name];
75  staged_tokens_.erase(file_name);
76  return file_token_info;
77 }
78 
79 void printCache(std::unordered_map<DataToken, FileTokenInfo> token_store,
80  std::unordered_map<std::string, std::list<DataToken>> file_tokens,
81  std::unordered_map<std::string, FileTokenInfo> staged_tokens_) {
82  {
83  std::stringstream ss;
84  for (auto& token_info : token_store) {
85  ss << token_info.first << ": " << token_info.second.file_path_ << ", " << token_info.second.position_ << std::endl;
86  }
87  AWS_LOG_DEBUG(__func__,
88  "Cache Info: token_store \n %s", ss.str().c_str());
89  }
90  {
91  std::stringstream ss;
92  for (auto &file_token : file_tokens) {
93  ss << file_token.first << ": ";
94  for (auto &tokens : file_token.second) {
95  ss << tokens;
96  }
97  ss << std::endl;
98  }
99  AWS_LOG_DEBUG(__func__,
100  "Cache Info: file_tokens \n %s", ss.str().c_str());
101  }
102  std::stringstream ss;
103  for (auto& token_info : staged_tokens_) {
104  ss << token_info.first << ": " << token_info.second.file_path_ << ", " << token_info.second.position_ << std::endl;
105  }
106  AWS_LOG_DEBUG(__func__,
107  "Cache Info: staged_tokens \n %s", ss.str().c_str());
108 }
109 
110 // NOLINTNEXTLINE(google-runtime-int)
111 DataToken TokenStore::createToken(const std::string &file_name, const long streampos, bool is_eof) {
112  AWS_LOG_DEBUG(__func__, "Creating token");
113  std::mt19937_64 rand( rand_device() );
114  DataToken token = rand();
115  token_store_.emplace(token, FileTokenInfo(file_name, streampos, is_eof));
116  if (file_tokens_.find(file_name) == file_tokens_.end()) {
117  file_tokens_[file_name] = std::list<DataToken>();
118  }
119  file_tokens_[file_name].push_back(token);
120  return token;
121 }
122 
124  AWS_LOG_DEBUG(__func__, "Marking token %i as failed (data did not upload successfully)", token);
125  if (token_store_.find(token) == token_store_.end()) {
126  throw std::runtime_error("DataToken not found");
127  }
128  FileTokenInfo token_info = token_store_[token];
129  token_store_.erase(token);
130  if (file_tokens_.find(token_info.file_path_) != file_tokens_.end()) {
131  const std::string &file_path = token_info.file_path_;
132  staged_tokens_[file_path] = token_info;
133  file_tokens_.erase(file_path);
134  }
135  return token_info;
136 }
137 
139  AWS_LOG_DEBUG(__func__,
140  "Resolving token %i", token);
141 
142  if (token_store_.find(token) == token_store_.end()) {
143  throw std::runtime_error("DataToken not found");
144  }
145  FileTokenInfo token_info = token_store_[token];
146  const std::string &file_path = token_info.file_path_;
147 
148  if (file_tokens_.find(file_path) == file_tokens_.end()) {
149  throw std::runtime_error("Could not find token set for file: " + file_path);
150  }
151  // this find should be O(1), as we expect data to be resolved in order
152  auto list = file_tokens_[file_path];
153  list.erase(std::find(list.begin(), list.end(), token));
154 
155  if (file_tokens_[file_path].empty()) {
156  file_tokens_.erase(file_path);
157  }
158  token_store_.erase(token);
159 // printCache(token_store_, file_tokens_, staged_tokens_);
160  return token_info;
161 }
162 
163 std::vector<FileTokenInfo> TokenStore::backup() {
164  auto vector_size = file_tokens_.size() + staged_tokens_.size();
165  std::vector<FileTokenInfo> token_backup(vector_size);
166  auto it = token_backup.begin();
167  for (auto& token : staged_tokens_) {
168  *it++ = token.second;
169  }
170  for (auto& token : file_tokens_) {
171  *it++ = token_store_[*token.second.begin()];
172  }
173  return token_backup;
174 }
175 
177  auto file_path = std::experimental::filesystem::path(options_.backup_directory + kTokenStoreFile);
178  std::vector<FileTokenInfo> token_store_backup = backup();
179  if (std::experimental::filesystem::exists(file_path)) {
180  std::experimental::filesystem::remove(file_path);
181  }
182  std::ofstream token_store_file;
183  token_store_file.open(file_path);
184  if (token_store_file.bad()) {
185  AWS_LOG_WARN(__func__, "Unable to open file %s to backup the token store", file_path.c_str());
186  return;
187  }
188  for (const FileTokenInfo &token_info : token_store_backup) {
189  token_store_file << token_info.serialize() << std::endl;
190  }
191  token_store_file.close();
192 }
193 
194 void TokenStore::restore(const std::vector<FileTokenInfo> &file_tokens) {
195  for (auto& file_token: file_tokens) {
196  staged_tokens_[file_token.file_path_] = file_token;
197  }
198 }
199 
201  // read through each line.
202  // For each line the first 4 bytes are position, next byte is eof, the remainder are a string of file path
203  // Will this change depending on OS / platform? Will that matter? Should I use another serialization library.
204  auto file_path = std::experimental::filesystem::path(options_.backup_directory + kTokenStoreFile);
205  if (!std::experimental::filesystem::exists(file_path)) {
206  return;
207  }
208  AWS_LOG_INFO(__func__, "Loading existing token store from: %s", file_path.c_str());
209  std::ifstream token_store_read_stream = std::ifstream(file_path);
210  std::vector<FileTokenInfo> file_tokens;
211  std::string line;
212  while (!token_store_read_stream.eof()) {
213  std::getline(token_store_read_stream, line);
214  if (!line.empty()) {
215  FileTokenInfo token_info;
216  try {
217  token_info.deserialize(line);
218  } catch (const std::runtime_error & e) {
219  AWS_LOG_ERROR(__func__, "Unable to parse token backup line: %s. Skipping.", line.c_str());
220  continue;
221  }
222  file_tokens.push_back(token_info);
223  }
224  }
225  token_store_read_stream.close();
226  restore(file_tokens);
227  std::experimental::filesystem::remove(file_path);
228 }
229 
230 
232  stored_files_size_ = 0;
233  active_write_file_size_ = 0;
234  options_ = options;
235  validateOptions();
236 }
237 
239  initializeStorage();
240  initializeTokenStore();
241  discoverStoredFiles();
242  rotateWriteFile();
243  return Service::start();
244 }
245 
247  sanitizePath(options_.storage_directory);
248 }
249 
251  AWS_LOG_DEBUG(__func__, "Initializing offline file storage in directory %s", options_.storage_directory.c_str());
252  auto storage = std::experimental::filesystem::path(options_.storage_directory);
253  if (!std::experimental::filesystem::exists(storage)) {
254  AWS_LOG_INFO(__func__, "File storage directory %s does not exist, creating.", storage.c_str());
255  std::experimental::filesystem::create_directories(storage);
256  stored_files_size_ = 0;
257  }
258 }
259 
261  AWS_LOG_DEBUG(__func__, "Initializing token store in directory %s", options_.storage_directory.c_str());
262  TokenStoreOptions options{options_.storage_directory};
263  token_store_ = std::make_unique<TokenStore>(options);
264  token_store_->restoreFromDisk();
265 }
266 
268  AWS_LOG_DEBUG(__func__,
269  "Is Data Available: %s, %s %s",
270  !active_read_file_.empty() ? "true" : "false",
271  !stored_files_.empty() ? "true" : "false",
272  active_write_file_size_ > 0 ? "true" : "false");
273  return !active_read_file_.empty() || !stored_files_.empty() || active_write_file_size_ > 0;
274 }
275 
277  return options_.delete_stale_data;
278 }
279 
280 void FileManagerStrategy::write(const std::string &data) {
281  try {
282  checkIfWriteFileShouldRotate(data.size());
283  checkIfStorageLimitHasBeenReached(data.size());
284 
285  std::lock_guard<std::mutex> write_lock(active_write_file_mutex_);
286  std::ofstream log_file;
287  AWS_LOG_DEBUG(__func__, "Writing data to file: %s", active_write_file_.c_str())
288  log_file.open(active_write_file_, std::ios_base::app);
289  if (log_file.bad()) {
290  AWS_LOG_WARN(__func__, "Unable to open file: %s", active_write_file_.c_str());
291  }
292  log_file << data << std::endl;
293  log_file.close();
294  active_write_file_size_ += data.size();
295  } catch(const std::ios_base::failure& e) {
296  AWS_LOG_WARN(__func__, "FileManagerStrategy::write caught std::ios_base::failure");
297  }
298 }
299 
301  std::lock_guard<std::mutex> read_lock(active_read_file_mutex_);
302  if (active_read_file_.empty()) {
303  active_read_file_ = getFileToRead();
304  // if the file is still empty, return an empty token.
305  if (active_read_file_.empty()) {
306  return 0;
307  }
308  active_read_file_stream_ = std::make_unique<std::ifstream>(active_read_file_);
309  }
310  AWS_LOG_DEBUG(__func__, "Reading from active log file: %s", active_read_file_.c_str());
311  DataToken token;
312  if (token_store_->isTokenAvailable(active_read_file_)) {
313  FileTokenInfo file_token = token_store_->popAvailableToken(active_read_file_);
314  active_read_file_stream_->seekg(file_token.position_);
315  }
316  int position = active_read_file_stream_->tellg();
317  auto file_size = active_read_file_stream_->seekg(0, std::ifstream::end).tellg();
318  active_read_file_stream_->seekg(position, std::ifstream::beg);
319  std::getline(*active_read_file_stream_, data);
320  int next_position = active_read_file_stream_->tellg();
321  token = token_store_->createToken(active_read_file_, position, next_position >= file_size);
322 
323  if (next_position >= file_size) {
324  auto file_loc = std::find(stored_files_.begin(), stored_files_.end(), active_read_file_);
325  if (file_loc != stored_files_.end()) {
326  stored_files_.erase(file_loc);
327  }
328  active_read_file_.clear();
329  active_read_file_stream_ = nullptr;
330 
331  }
332  return token;
333 }
334 
335 void FileManagerStrategy::resolve(const DataToken &token, bool is_success) {
336  if (is_success) {
337  try {
338  auto file_info = token_store_->resolve(token);
339  if (file_info.eof_) {
340  deleteFile(file_info.file_path_);
341  }
342  } catch(std::runtime_error& exception) {
343  AWS_LOG_WARN(__func__,
344  "FileManagerStrategy resolve caught runtime_error attempting to resolve token %i",
345  token);
346  }
347  } else {
348  try {
349  auto file_info = token_store_->fail(token);
350  if (file_info.eof_) {
351  AWS_LOG_DEBUG(__func__,
352  "Failed last token %d, pushing file to stored: %s", token, file_info.file_path_.c_str());
353  stored_files_.push_back(file_info.file_path_);
354  }
355  } catch(std::runtime_error& exception) {
356  AWS_LOG_WARN(__func__,
357  "FileManagerStrategy resolve caught runtime_error attempting to resolve token %i",
358  token);
359  }
360  }
361 }
362 
364  bool b = Service::shutdown();
365  token_store_->backupToDisk();
366  return b;
367 }
368 
369 
371  for (const auto &entry : fs::directory_iterator(options_.storage_directory)) {
372  const fs::path &path = entry.path();
373  std::regex name_expr(
374  options_.file_prefix +
375  "[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{1}" +
376  options_.file_extension);
377  if (std::regex_match(path.filename().string(), name_expr)) {
378  addFilePathToStorage(path);
379  }
380  }
381 }
382 
383 void FileManagerStrategy::deleteFile(const std::string &file_path) {
384  AWS_LOG_DEBUG(__func__, "Deleting file: %s", file_path.c_str());
385  const uintmax_t file_size = fs::file_size(file_path);
386  fs::remove(file_path);
387  stored_files_size_ -= file_size;
388 }
389 
391  // if we have stored files, pop from the end of the list and return that filename
392  // if we do not have stored files, and the active file has data, switch active file and return the existing active file.
393  if (!stored_files_.empty()) {
394  stored_files_.sort();
395  const std::string newest_file = stored_files_.back();
396  stored_files_.pop_back();
397  return newest_file;
398  }
399 
400  std::lock_guard<std::mutex> write_lock(active_write_file_mutex_);
401  if (active_write_file_size_ > 0) {
402  const std::string file_path = active_write_file_;
403  rotateWriteFile();
404  return file_path;
405  }
406 
407  throw std::runtime_error("No files available for reading");
408 }
409 
410 void FileManagerStrategy::addFilePathToStorage(const fs::path &file_path) {
411  stored_files_.push_back(file_path);
412  stored_files_size_ += fs::file_size(file_path);
413 }
414 
416  AWS_LOG_DEBUG(__func__, "Rotating offline storage file");
417  using std::chrono::system_clock;
418  time_t tt = system_clock::to_time_t (system_clock::now());
419  std::ostringstream oss;
420  auto tm = *std::localtime(&tt);
421  oss << std::put_time(&tm, "%F_%H-%M-%S");
422  uint count = 0;
423  std::string original_file_name = options_.file_prefix + oss.str();
424  std::string file_name = original_file_name + "-" + std::to_string(count);
425  std::string file_path = options_.storage_directory + file_name + options_.file_extension;
426  while (fs::exists(file_path)) {
427  ++count;
428  file_name = original_file_name + "-" + std::to_string(count);
429  file_path = options_.storage_directory + file_name + options_.file_extension;
430  }
431 
432  if (!active_write_file_.empty()) {
433  stored_files_.push_back(active_write_file_);
434  stored_files_size_ += active_write_file_size_;
435  }
436 
437  AWS_LOG_DEBUG(__func__, "New active offline storage file is: %s", file_path.c_str());
438  active_write_file_ = file_path;
439  active_write_file_size_ = 0;
440 }
441 
442 void FileManagerStrategy::checkIfWriteFileShouldRotate(const uintmax_t &new_data_size) {
443  std::lock_guard<std::mutex> write_lock(active_write_file_mutex_);
444  const uintmax_t new_file_size = active_write_file_size_ + new_data_size;
445  const uintmax_t max_file_size_in_bytes = KB_TO_BYTES(options_.maximum_file_size_in_kb);
446  if (new_file_size > max_file_size_in_bytes) {
447  AWS_LOG_DEBUG(__func__, "New file size %d is larger than max file size %d", new_file_size, max_file_size_in_bytes);
448  rotateWriteFile();
449  }
450 }
451 
452 void FileManagerStrategy::checkIfStorageLimitHasBeenReached(const uintmax_t &new_data_size) {
453  const uintmax_t new_storage_size = stored_files_size_ + active_write_file_size_ + new_data_size;
454  const uintmax_t max_storage_size_in_bytes = KB_TO_BYTES(options_.storage_limit_in_kb);
455  if (new_storage_size > max_storage_size_in_bytes) {
456  AWS_LOG_WARN(__func__, "Maximum offline storage limit has been reached. Deleting oldest log file.");
457  deleteOldestFile();
458  }
459 }
460 
462  if (!stored_files_.empty()) {
463  std::lock_guard<std::mutex> read_lock(active_read_file_mutex_);
464  stored_files_.sort();
465  const std::string oldest_file = stored_files_.front();
466  if (oldest_file == active_read_file_) {
467  active_read_file_.clear();
468  active_read_file_stream_ = nullptr;
469  }
470  stored_files_.pop_front();
471  AWS_LOG_INFO(__func__, "Deleting oldest file: %s", oldest_file.c_str());
472  deleteFile(oldest_file);
473  }
474 }
475 
476 } // namespace FileManagement
477 } // namespace Aws
std::unordered_map< std::string, std::list< DataToken > > file_tokens_
void printCache(std::unordered_map< DataToken, FileTokenInfo > token_store, std::unordered_map< std::string, std::list< DataToken >> file_tokens, std::unordered_map< std::string, FileTokenInfo > staged_tokens_)
void restore(const std::vector< FileTokenInfo > &file_tokens)
bool isTokenAvailable(const std::string &file_name) const
void deleteFile(const std::string &file_path)
DataToken read(std::string &data) override
DataToken createToken(const std::string &file_name, const long streampos, bool is_eof)
std::unordered_map< std::string, FileTokenInfo > staged_tokens_
FileTokenInfo popAvailableToken(const std::string &file_name)
std::unordered_map< DataToken, FileTokenInfo > token_store_
static const std::string kTokenStoreFile("token_store.info")
virtual bool shutdown()
void sanitizePath(std::string &path)
#define KB_TO_BYTES(x)
void addFilePathToStorage(const std::experimental::filesystem::path &file_path)
FileManagerStrategy(const FileManagerStrategyOptions &options)
FileTokenInfo fail(const DataToken &token)
void deserialize(const std::string &token_info_json)
void checkIfWriteFileShouldRotate(const uintmax_t &new_data_size)
std::vector< FileTokenInfo > backup()
void checkIfStorageLimitHasBeenReached(const uintmax_t &new_data_size)
void write(const std::string &data) override
void resolve(const DataToken &token, bool is_success) override
virtual bool start()
FileTokenInfo resolve(const DataToken &token)


file_management
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:23