log_batcher.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #include <aws/core/Aws.h>
17 #include <aws/core/utils/logging/LogMacros.h>
19 
21 
25 
26 #include <chrono>
27 #include <iostream>
28 #include <list>
29 #include <memory>
30 #include <mutex>
31 #include <string>
32 #include <stdexcept>
33 
34 
35 namespace Aws {
36 namespace CloudWatchLogs {
37 
38 LogBatcher::LogBatcher(size_t max_allowable_batch_size,
39  size_t publish_trigger_size)
40  : DataBatcher(max_allowable_batch_size, publish_trigger_size) {
41 }
42 
43 LogBatcher::~LogBatcher() = default;
44 
46  static const char * const kFuncName = __func__;
47 
48  std::lock_guard<std::recursive_mutex> lk(mtx);
49 
50  // is there anything to send?
51  if (getCurrentBatchSize() == 0) {
52  AWS_LOGSTREAM_DEBUG(__func__, "LogBatcher: nothing batched to publish");
53  return false;
54  }
55 
56  std::shared_ptr<LogCollection> log_type = this->batched_data_;
57  std::shared_ptr<Aws::DataFlow::BasicTask<LogCollection>> log_task = std::make_shared<Aws::DataFlow::BasicTask<LogCollection>>(log_type);
58 
59  // connect to the log_file_manager_ to write to disk on task failure
60  if (log_file_manager_) {
61 
62  // register the task failure function
63  auto function = [&log_file_manager = this->log_file_manager_](const DataFlow::UploadStatus &upload_status,
64  const LogCollection &log_messages)
65  {
66  if (!log_messages.empty()) {
67 
68  if (DataFlow::UploadStatus::INVALID_DATA == upload_status) {
69 
70  // publish indicated the task data was bad, this task should be discarded
71  AWS_LOG_WARN(kFuncName, "LogBatcher: Task failed due to invalid log data, dropping");
72 
73  } else if (DataFlow::UploadStatus::SUCCESS != upload_status) {
74 
75  AWS_LOG_INFO(kFuncName, "LogBatcher: Task failed to upload: writing logs to file. Status = %d", upload_status);
76  log_file_manager->write(log_messages);
77 
78  } else {
79  AWS_LOG_DEBUG(kFuncName, "LogBatcher: Task log upload successful");
80  }
81  } else {
82  AWS_LOG_INFO(kFuncName, "LogBatcher: not publishing task as log_messages is empty");
83  }
84  };
85 
86  log_task->setOnCompleteFunction(function);
87  }
88 
89  // dont attempt to queue if not started
90  if(ServiceState::STARTED != this->getState()) {
91  AWS_LOG_WARN(__func__, "current service state is not Started, canceling task: %s", Service::getStatusString().c_str());
92  log_task->cancel();
93  return false;
94  }
95 
96  bool enqueue_success = false;
97 
98  if (getSink()) {
99 
100  enqueue_success = getSink()->tryEnqueue(log_task, this->getTryEnqueueDuration());
101 
102  if (!enqueue_success) {
103  AWS_LOG_WARN(__func__, "Unable to enqueue log data, canceling task");
104  }
105 
106  } else {
107  // if we can't queue, then cancel (write to disk)
108  AWS_LOGSTREAM_WARN(__func__, "Unable to obtain queue, canceling task");
109  }
110 
111  if (!enqueue_success) {
112  log_task->cancel();
113  }
114  this->resetBatchedData();
115  return enqueue_success;
116 }
117 
119  std::lock_guard<std::recursive_mutex> lck(mtx);
120 
121  if (this->log_file_manager_) {
122  AWS_LOG_INFO(__func__, "Writing data to file");
123  log_file_manager_->write(*this->batched_data_);
124  } else {
125  AWS_LOG_WARN(__func__, "Dropping data");
126  }
127  this->resetBatchedData();
128 }
129 
130 
132  if (log_file_manager_ == nullptr) {
133  AWS_LOGSTREAM_WARN(__func__, "FileManager not found: data will be dropped on failure.");
134  }
135  return Service::start();
136 }
137 
139 {
140  if (nullptr == log_file_manager) {
141  throw std::invalid_argument("input FileManager cannot be null");
142  }
143  this->log_file_manager_ = std::move(log_file_manager);
144 }
145 
146 } // namespace CloudWatchLogs
147 } // namespace Aws
bool publishBatchedData() override
Services the log manager by performing periodic tasks when called. Calling the Service function allow...
Definition: log_batcher.cpp:45
std::recursive_mutex mtx
std::shared_ptr< Aws::FileManagement::FileManager< LogCollection > > log_file_manager_
Definition: log_batcher.h:87
~LogBatcher() override
Tears down a LogBatcher object.
LogBatcher(size_t max_allowable_batch_size=DataBatcher::kDefaultMaxBatchSize, size_t publish_trigger_size=DataBatcher::kDefaultTriggerSize)
Creates a new LogBatcher Creates a new LogBatcher that will group/buffer logs. Note: logs are only au...
Definition: log_batcher.cpp:38
std::list< LogType > LogCollection
Definition: definitions.h:29
virtual void setLogFileManager(std::shared_ptr< Aws::FileManagement::FileManager< LogCollection >> log_file_manager)
std::shared_ptr< Sink< Aws::FileManagement::TaskPtr< LogCollection > > > getSink()
std::chrono::microseconds getTryEnqueueDuration()
virtual std::string getStatusString()
std::shared_ptr< std::list< LogType > > batched_data_
ServiceState getState()
virtual bool start()


cloudwatch_logs_common
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:24