21 #include <aws/core/utils/logging/LogMacros.h> 25 #define KB_TO_BYTES(x) (static_cast<size_t>(x) << 10u) 27 namespace fs = std::experimental::filesystem;
30 namespace FileManagement {
// Fragment of a path-normalisation routine operating on a mutable std::string
// `path` (likely sanitizePath(std::string &path) -- confirm; the function
// header and several interior lines are missing from this extract).
// NOTE(review): back()/front() on an empty std::string is undefined behaviour;
// no emptiness guard is visible here -- confirm one exists upstream.
35 if (path.back() !=
'/') {
// A leading '~' is expanded from the environment.
38 if (path.front() ==
'~') {
39 char * home = getenv(
"HOME");
// Fall back to ROS_HOME when HOME is not set.
40 if (
nullptr == home) {
41 AWS_LOG_WARN(__func__,
"No HOME environment variable set. Attempting to use ROS_HOME instead.");
42 home = getenv(
"ROS_HOME");
// Replace only the single leading '~' character with the resolved directory.
44 if (
nullptr != home) {
45 path.replace(0, 1, home);
// Presumably the else-branch of the check above (the `else` line is not
// visible): neither variable yielded a value, so the path cannot be resolved.
47 throw std::runtime_error(
"The storage directory path uses '~' but no HOME environment variable set.");
// Fragment (likely initializeBackupDirectory() -- confirm): creates the token
// store backup directory, including missing parents, when it does not exist.
// The non-error_code filesystem overloads used here report failures by
// throwing; no handler is visible in this extract.
63 if (!std::experimental::filesystem::exists(backup_directory)) {
64 AWS_LOG_INFO(__func__,
"TokenStore backup directory %s does not exist, creating.", backup_directory.c_str());
65 std::experimental::filesystem::create_directories(backup_directory);
76 return file_token_info;
// Debug helper: dumps the three token caches to the debug log, one log
// statement per cache. (The third parameter and several body lines are missing
// from this extract.)
// NOTE(review): the maps are taken by value, so every call copies them;
// const references would avoid that without changing call sites.
79 void printCache(std::unordered_map<DataToken, FileTokenInfo> token_store,
80 std::unordered_map<std::string, std::list<DataToken>> file_tokens,
// token -> (file path, position)
84 for (
auto& token_info : token_store) {
85 ss << token_info.first <<
": " << token_info.second.file_path_ <<
", " << token_info.second.position_ << std::endl;
87 AWS_LOG_DEBUG(__func__,
88 "Cache Info: token_store \n %s", ss.str().c_str());
// file path -> list of tokens issued for that file
92 for (
auto &file_token : file_tokens) {
93 ss << file_token.first <<
": ";
94 for (
auto &tokens : file_token.second) {
99 AWS_LOG_DEBUG(__func__,
100 "Cache Info: file_tokens \n %s", ss.str().c_str());
// staged tokens: key -> (file path, position)
102 std::stringstream ss;
103 for (
auto& token_info : staged_tokens_) {
104 ss << token_info.first <<
": " << token_info.second.file_path_ <<
", " << token_info.second.position_ << std::endl;
106 AWS_LOG_DEBUG(__func__,
107 "Cache Info: staged_tokens \n %s", ss.str().c_str());
// Fragment (likely the entry of createToken() -- confirm): only the trace log
// is visible; the token-generation logic is not part of this extract.
112 AWS_LOG_DEBUG(__func__,
"Creating token");
// Fragment of the token-failure path (likely TokenStore::fail -- confirm).
// NOTE(review): "%i" expects an int; if DataToken is wider than int (its
// typedef is not visible here) this is a format/argument mismatch -- confirm.
124 AWS_LOG_DEBUG(__func__,
"Marking token %i as failed (data did not upload successfully)", token);
// Raised for an unknown token (the guarding lookup is not visible here).
126 throw std::runtime_error(
"DataToken not found");
131 const std::string &file_path = token_info.
file_path_;
// Fragment of the token-resolution path (likely TokenStore::resolve --
// confirm). Two distinct runtime_error throws are visible: one for an unknown
// token and one for a file that has no token list.
// NOTE(review): "%i" with a DataToken argument -- confirm DataToken is int-sized.
139 AWS_LOG_DEBUG(__func__,
140 "Resolving token %i", token);
143 throw std::runtime_error(
"DataToken not found");
146 const std::string &file_path = token_info.
file_path_;
149 throw std::runtime_error(
"Could not find token set for file: " + file_path);
// NOTE(review): if std::find returns list.end(), erase(end()) is undefined
// behaviour; no check is visible in this extract -- confirm the token is
// guaranteed to be present in the per-file list.
153 list.erase(std::find(list.begin(), list.end(), token));
// Fragment (likely backup() -- confirm): builds a vector of FileTokenInfo by
// pre-sizing it to vector_size and assigning through an iterator.
// NOTE(review): writes go through `it` without a bounds check, so vector_size
// must be at least the number of elements iterated; its computation is not
// visible here -- confirm.
165 std::vector<FileTokenInfo> token_backup(vector_size);
166 auto it = token_backup.begin();
168 *it++ = token.second;
// Fragment (presumably the routine that persists the token store to disk; its
// header is not visible): snapshots the tokens via backup(), removes any
// existing file at file_path, then writes one serialized token per line.
178 std::vector<FileTokenInfo> token_store_backup =
backup();
179 if (std::experimental::filesystem::exists(file_path)) {
180 std::experimental::filesystem::remove(file_path);
182 std::ofstream token_store_file;
183 token_store_file.open(file_path);
// NOTE(review): a failed open() sets failbit, not badbit, so bad() will not
// detect an unopenable file; is_open() or fail() is the reliable check.
184 if (token_store_file.bad()) {
185 AWS_LOG_WARN(__func__,
"Unable to open file %s to backup the token store", file_path.c_str());
// std::endl flushes the stream after every record.
189 token_store_file << token_info.serialize() << std::endl;
191 token_store_file.close();
195 for (
auto& file_token: file_tokens) {
// Fragment (presumably the routine that reloads a persisted token store; its
// header is not visible): reads file_path line by line, collects parsed tokens
// into file_tokens, and deletes the file afterwards.
205 if (!std::experimental::filesystem::exists(file_path)) {
208 AWS_LOG_INFO(__func__,
"Loading existing token store from: %s", file_path.c_str());
209 std::ifstream token_store_read_stream = std::ifstream(file_path);
210 std::vector<FileTokenInfo> file_tokens;
// NOTE(review): testing eof() before getline() typically yields one extra
// iteration with an empty `line` when the file ends with a newline; the usual
// form is `while (std::getline(stream, line))`.
212 while (!token_store_read_stream.eof()) {
213 std::getline(token_store_read_stream, line);
// Parse failures are logged.
// NOTE(review): whether a `continue` follows the log (so the push_back below
// is skipped for bad lines) is not visible in this extract -- confirm.
218 }
catch (
const std::runtime_error & e) {
219 AWS_LOG_ERROR(__func__,
"Unable to parse token backup line: %s. Skipping.", line.c_str());
222 file_tokens.push_back(token_info);
225 token_store_read_stream.close();
// The backup file is removed after it has been read.
227 std::experimental::filesystem::remove(file_path);
// Fragment (likely the FileManagerStrategy constructor body -- confirm):
// zeroes both size counters, then initialises the token store and discovers
// stored files. Intermediate lines are missing from this extract.
232 stored_files_size_ = 0;
233 active_write_file_size_ = 0;
240 initializeTokenStore();
241 discoverStoredFiles();
// Fragment (presumably the storage initialisation routine): ensures
// options_.storage_directory exists, creating it (with parents) when missing,
// and resets stored_files_size_ to zero in that case.
251 AWS_LOG_DEBUG(__func__,
"Initializing offline file storage in directory %s",
options_.storage_directory.c_str());
252 auto storage = std::experimental::filesystem::path(
options_.storage_directory);
253 if (!std::experimental::filesystem::exists(storage)) {
254 AWS_LOG_INFO(__func__,
"File storage directory %s does not exist, creating.", storage.c_str());
255 std::experimental::filesystem::create_directories(storage);
256 stored_files_size_ = 0;
// Fragment (likely initializeTokenStore() -- confirm): only the trace log is
// visible; the token store construction is not part of this extract.
261 AWS_LOG_DEBUG(__func__,
"Initializing token store in directory %s",
options_.storage_directory.c_str());
// Fragment (likely isDataAvailable() -- confirm): data is considered available
// when a read file is active, at least one stored file exists, or the active
// write file has received bytes. The three conditions are logged individually
// before being OR-ed together.
// NOTE(review): no lock is visible here, while other fragments in this listing
// guard the same members with active_read_file_mutex_ /
// active_write_file_mutex_ -- confirm whether unsynchronised reads are intended.
268 AWS_LOG_DEBUG(__func__,
269 "Is Data Available: %s, %s %s",
270 !active_read_file_.empty() ?
"true" :
"false",
271 !stored_files_.empty() ?
"true" :
"false",
272 active_write_file_size_ > 0 ?
"true" :
"false");
273 return !active_read_file_.empty() || !stored_files_.empty() || active_write_file_size_ > 0;
// Fragment (likely write(const std::string &data) -- confirm): appends one
// record to the active write file.
// Rotation and storage-limit checks run before the write lock is taken below.
282 checkIfWriteFileShouldRotate(data.size());
283 checkIfStorageLimitHasBeenReached(data.size());
285 std::lock_guard<std::mutex> write_lock(active_write_file_mutex_);
286 std::ofstream log_file;
// NOTE(review): no ';' follows this macro invocation in the extract; whether
// that compiles depends on how AWS_LOG_DEBUG expands -- confirm.
287 AWS_LOG_DEBUG(__func__,
"Writing data to file: %s", active_write_file_.c_str())
288 log_file.open(active_write_file_, std::ios_base::app);
// NOTE(review): a failed open() sets failbit, not badbit, so bad() will not
// detect an unopenable file; is_open() or fail() is the reliable check.
289 if (log_file.bad()) {
290 AWS_LOG_WARN(__func__,
"Unable to open file: %s", active_write_file_.c_str());
// The record is written with a trailing newline (std::endl also flushes).
292 log_file << data << std::endl;
// NOTE(review): only data.size() is accounted for, so the tracked size is one
// byte short per record (the newline) compared with the bytes written.
294 active_write_file_size_ += data.size();
// NOTE(review): streams throw ios_base::failure only when exceptions() has
// been enabled; no such call is visible in this extract -- confirm.
295 }
catch(
const std::ios_base::failure& e) {
296 AWS_LOG_WARN(__func__,
"FileManagerStrategy::write caught std::ios_base::failure");
// Fragment (likely read(std::string &data) -- confirm): reads the next line
// from the active read file into `data` and creates a token recording where
// the line started and whether it was the last one in the file.
301 std::lock_guard<std::mutex> read_lock(active_read_file_mutex_);
// Select a file to read if none is active yet.
302 if (active_read_file_.empty()) {
303 active_read_file_ = getFileToRead();
305 if (active_read_file_.empty()) {
308 active_read_file_stream_ = std::make_unique<std::ifstream>(active_read_file_);
310 AWS_LOG_DEBUG(__func__,
"Reading from active log file: %s", active_read_file_.c_str());
// If the token store holds a token for this file, seek to its recorded position.
312 if (
token_store_->isTokenAvailable(active_read_file_)) {
314 active_read_file_stream_->seekg(file_token.
position_);
// NOTE(review): tellg() returns a stream position type; storing it in `int`
// narrows and would truncate offsets beyond INT_MAX -- confirm file sizes are
// bounded accordingly.
316 int position = active_read_file_stream_->tellg();
// Determine the file size by seeking to the end, then restore the position.
317 auto file_size = active_read_file_stream_->seekg(0, std::ifstream::end).tellg();
318 active_read_file_stream_->seekg(position, std::ifstream::beg);
319 std::getline(*active_read_file_stream_, data);
320 int next_position = active_read_file_stream_->tellg();
// The third argument flags the token as end-of-file when the read reached the
// end of the file.
321 token =
token_store_->createToken(active_read_file_, position, next_position >= file_size);
// At end of file: drop the file from stored_files_ (if listed) and reset the
// reader so that the next call selects a new file.
323 if (next_position >= file_size) {
324 auto file_loc = std::find(stored_files_.begin(), stored_files_.end(), active_read_file_);
325 if (file_loc != stored_files_.end()) {
326 stored_files_.erase(file_loc);
328 active_read_file_.clear();
329 active_read_file_stream_ =
nullptr;
// Fragment (likely FileManagerStrategy::resolve(token, is_success) --
// confirm). Two paths are visible: one deletes the file once its end-of-file
// token is handled, the other re-queues the file when its last token failed.
339 if (file_info.eof_) {
340 deleteFile(file_info.file_path_);
// runtime_error is caught and logged as a warning.
342 }
catch(std::runtime_error& exception) {
343 AWS_LOG_WARN(__func__,
344 "FileManagerStrategy resolve caught runtime_error attempting to resolve token %i",
// The file is put back on stored_files_ when its end-of-file token failed.
350 if (file_info.eof_) {
351 AWS_LOG_DEBUG(__func__,
352 "Failed last token %d, pushing file to stored: %s", token, file_info.file_path_.c_str());
353 stored_files_.push_back(file_info.file_path_);
// NOTE(review): this handler logs the same "attempting to resolve" message as
// the one above although it follows the failure path -- confirm the wording
// is intended.
355 }
catch(std::runtime_error& exception) {
356 AWS_LOG_WARN(__func__,
357 "FileManagerStrategy resolve caught runtime_error attempting to resolve token %i",
// Fragment (likely discoverStoredFiles() -- confirm): iterates over
// options_.storage_directory and registers every entry whose file name matches
// the timestamped naming pattern (parts of the pattern are missing here).
371 for (
const auto &entry : fs::directory_iterator(
options_.storage_directory)) {
372 const fs::path &path = entry.path();
// NOTE(review): the regex appears to be constructed once per directory entry;
// it looks loop-invariant and could be built once before the loop.
373 std::regex name_expr(
375 "[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{1}" +
377 if (std::regex_match(path.filename().string(), name_expr)) {
378 addFilePathToStorage(path);
// Fragment (likely deleteFile(const std::string &file_path) -- confirm):
// removes the file and subtracts its size from stored_files_size_.
384 AWS_LOG_DEBUG(__func__,
"Deleting file: %s", file_path.c_str());
// The size is queried before removal. The non-error_code overload throws if
// the size cannot be determined; no handler is visible in this extract.
385 const uintmax_t file_size = fs::file_size(file_path);
386 fs::remove(file_path);
387 stored_files_size_ -= file_size;
// Fragment (likely getFileToRead() -- confirm): picks the next file to read.
// Stored files take precedence: after sorting, the last (greatest name, hence
// named newest_file) entry is taken and removed from the list.
393 if (!stored_files_.empty()) {
394 stored_files_.sort();
395 const std::string newest_file = stored_files_.back();
396 stored_files_.pop_back();
// Otherwise fall back to the active write file, if it has received any data.
400 std::lock_guard<std::mutex> write_lock(active_write_file_mutex_);
401 if (active_write_file_size_ > 0) {
402 const std::string file_path = active_write_file_;
// Raised when neither source yields a file.
407 throw std::runtime_error(
"No files available for reading");
// Fragment (likely addFilePathToStorage() -- confirm): registers the file and
// adds its on-disk size to stored_files_size_.
411 stored_files_.push_back(file_path);
412 stored_files_size_ += fs::file_size(file_path);
// Fragment (presumably the write-file rotation routine; header not visible):
// derives a new, unused file path from the current local time and makes it the
// active write file, moving the previous one (if any) to stored_files_.
416 AWS_LOG_DEBUG(__func__,
"Rotating offline storage file");
417 using std::chrono::system_clock;
418 time_t tt = system_clock::to_time_t (system_clock::now());
419 std::ostringstream oss;
// NOTE(review): std::localtime returns a pointer to shared static storage and
// is not guaranteed to be thread-safe; the result is copied immediately here.
420 auto tm = *std::localtime(&tt);
421 oss << std::put_time(&tm,
"%F_%H-%M-%S");
// File name layout: <prefix><timestamp>-<count><extension>. The declaration
// and initial value of `count` are not visible in this extract.
423 std::string original_file_name =
options_.file_prefix + oss.str();
424 std::string file_name = original_file_name +
"-" + std::to_string(count);
// Plain string concatenation: relies on storage_directory ending with a path
// separator.
425 std::string file_path =
options_.storage_directory + file_name +
options_.file_extension;
// Recompute the candidate name until it does not collide with an existing
// file (the statement that changes `count` is not visible in this extract).
426 while (fs::exists(file_path)) {
428 file_name = original_file_name +
"-" + std::to_string(count);
429 file_path =
options_.storage_directory + file_name +
options_.file_extension;
// The previous write file becomes a stored file; its tracked size is added to
// the stored total.
432 if (!active_write_file_.empty()) {
433 stored_files_.push_back(active_write_file_);
434 stored_files_size_ += active_write_file_size_;
437 AWS_LOG_DEBUG(__func__,
"New active offline storage file is: %s", file_path.c_str());
438 active_write_file_ = file_path;
439 active_write_file_size_ = 0;
// Fragment (likely checkIfWriteFileShouldRotate(new_data_size) -- confirm):
// under the write lock, compares the projected size of the active write file
// against max_file_size_in_bytes (its definition is not visible here).
443 std::lock_guard<std::mutex> write_lock(active_write_file_mutex_);
444 const uintmax_t new_file_size = active_write_file_size_ + new_data_size;
446 if (new_file_size > max_file_size_in_bytes) {
// NOTE(review): new_file_size is uintmax_t but is formatted with "%d", a
// format/argument mismatch for printf-style formatting.
447 AWS_LOG_DEBUG(__func__,
"New file size %d is larger than max file size %d", new_file_size, max_file_size_in_bytes);
// Fragment (likely checkIfStorageLimitHasBeenReached(new_data_size) --
// confirm): projected total = stored files + active write file + incoming
// data. Exceeding max_storage_size_in_bytes is logged as a warning; the
// deletion announced in the message is not visible in this extract.
453 const uintmax_t new_storage_size = stored_files_size_ + active_write_file_size_ + new_data_size;
455 if (new_storage_size > max_storage_size_in_bytes) {
456 AWS_LOG_WARN(__func__,
"Maximum offline storage limit has been reached. Deleting oldest log file.");
// Fragment (presumably the oldest-file eviction routine; header not visible):
// under the read lock, sorts stored_files_ and deletes the first entry.
462 if (!stored_files_.empty()) {
463 std::lock_guard<std::mutex> read_lock(active_read_file_mutex_);
464 stored_files_.sort();
465 const std::string oldest_file = stored_files_.front();
// If the file being evicted is the one currently being read, reset the reader
// state before the file is deleted.
466 if (oldest_file == active_read_file_) {
467 active_read_file_.clear();
468 active_read_file_stream_ =
nullptr;
470 stored_files_.pop_front();
471 AWS_LOG_INFO(__func__,
"Deleting oldest file: %s", oldest_file.c_str());
472 deleteFile(oldest_file);
std::unordered_map< std::string, std::list< DataToken > > file_tokens_
void printCache(std::unordered_map< DataToken, FileTokenInfo > token_store, std::unordered_map< std::string, std::list< DataToken >> file_tokens, std::unordered_map< std::string, FileTokenInfo > staged_tokens_)
std::string backup_directory
void restore(const std::vector< FileTokenInfo > &file_tokens)
std::random_device rand_device
bool isTokenAvailable(const std::string &file_name) const
void deleteFile(const std::string &file_path)
DataToken read(std::string &data) override
void initializeTokenStore()
void discoverStoredFiles()
DataToken createToken(const std::string &file_name, const long streampos, bool is_eof)
std::unordered_map< std::string, FileTokenInfo > staged_tokens_
std::string getFileToRead()
FileTokenInfo popAvailableToken(const std::string &file_name)
void initializeBackupDirectory()
std::unordered_map< DataToken, FileTokenInfo > token_store_
bool isDeleteStaleData() override
static const std::string kTokenStoreFile("token_store.info")
bool isDataAvailable() override
TokenStoreOptions options_
void sanitizePath(std::string &path)
void addFilePathToStorage(const std::experimental::filesystem::path &file_path)
FileManagerStrategy(const FileManagerStrategyOptions &options)
FileTokenInfo fail(const DataToken &token)
void deserialize(const std::string &token_info_json)
void checkIfWriteFileShouldRotate(const uintmax_t &new_data_size)
std::vector< FileTokenInfo > backup()
void checkIfStorageLimitHasBeenReached(const uintmax_t &new_data_size)
void write(const std::string &data) override
void resolve(const DataToken &token, bool is_success) override
FileTokenInfo resolve(const DataToken &token)