27 #include <aws/core/utils/json/JsonSerializer.h> 28 #include <aws/core/utils/logging/LogMacros.h> 32 namespace CloudWatchLogs {
40 AWS_LOG_INFO(__func__,
"Reading Logbatch");
42 std::priority_queue<std::tuple<Timestamp, std::string, FileManagement::DataToken>> pq;
43 for (
size_t i = 0; i < batch_size; ++i) {
45 if (!file_manager_strategy_->isDataAvailable()) {
48 data_token = read(line);
49 Aws::String aws_line(line.c_str());
50 Aws::Utils::Json::JsonValue value(aws_line);
51 Aws::CloudWatchLogs::Model::InputLogEvent input_event(value);
52 pq.push(std::make_tuple(input_event.GetTimestamp(), line, data_token));
57 std::list<FileManagement::DataToken> data_tokens;
59 Timestamp curTime = std::get<0>(pq.top());
60 std::string line = std::get<1>(pq.top());
63 Aws::String aws_line(line.c_str());
64 Aws::Utils::Json::JsonValue value(aws_line);
65 Aws::CloudWatchLogs::Model::InputLogEvent input_event(value);
66 log_data.push_front(input_event);
67 data_tokens.push_back(new_data_token);
71 std::lock_guard<std::mutex> lock(active_delete_stale_data_mutex_);
72 stale_data_.push_back(new_data_token);
78 if(batch_size != log_data.size()){
79 AWS_LOG_WARN(__func__,
"%d logs were not batched since the time" 80 " difference was > 24 hours. Will try again in a separate batch." 81 , batch_size - log_data.size()
85 FileObject<LogCollection> file_object;
86 file_object.batch_data = log_data;
87 file_object.batch_size = log_data.size();
88 file_object.data_tokens = data_tokens;
93 for (
const Model::InputLogEvent &log: data) {
94 auto aws_str = log.Jsonize().View().WriteCompact();
95 std::string str(aws_str.c_str());
96 file_manager_strategy_->write(str);
98 if (FileManager::file_status_monitor_) {
99 AWS_LOG_INFO(__func__,
100 "Set file status available");
101 FileManager::file_status_monitor_->setStatus(Aws::DataFlow::Status::AVAILABLE);
void write(const LogCollection &data) override
const long TWO_WEEK_IN_MILLISEC
std::list< LogType > LogCollection
FileObject< LogCollection > readBatch(size_t batch_size) override
const long ONE_DAY_IN_MILLISEC