16 #include <aws/core/Aws.h> 17 #include <aws/core/utils/logging/LogMacros.h> 36 namespace CloudWatchLogs {
39 size_t publish_trigger_size)
40 :
DataBatcher(max_allowable_batch_size, publish_trigger_size) {
46 static const char *
const kFuncName = __func__;
48 std::lock_guard<std::recursive_mutex> lk(
mtx);
52 AWS_LOGSTREAM_DEBUG(__func__,
"LogBatcher: nothing batched to publish");
56 std::shared_ptr<LogCollection> log_type = this->
batched_data_;
57 std::shared_ptr<Aws::DataFlow::BasicTask<LogCollection>> log_task = std::make_shared<Aws::DataFlow::BasicTask<LogCollection>>(log_type);
66 if (!log_messages.empty()) {
68 if (DataFlow::UploadStatus::INVALID_DATA == upload_status) {
71 AWS_LOG_WARN(kFuncName,
"LogBatcher: Task failed due to invalid log data, dropping");
73 }
else if (DataFlow::UploadStatus::SUCCESS != upload_status) {
75 AWS_LOG_INFO(kFuncName,
"LogBatcher: Task failed to upload: writing logs to file. Status = %d", upload_status);
76 log_file_manager->write(log_messages);
79 AWS_LOG_DEBUG(kFuncName,
"LogBatcher: Task log upload successful");
82 AWS_LOG_INFO(kFuncName,
"LogBatcher: not publishing task as log_messages is empty");
86 log_task->setOnCompleteFunction(
function);
90 if(ServiceState::STARTED != this->
getState()) {
91 AWS_LOG_WARN(__func__,
"current service state is not Started, canceling task: %s",
Service::getStatusString().c_str());
96 bool enqueue_success =
false;
102 if (!enqueue_success) {
103 AWS_LOG_WARN(__func__,
"Unable to enqueue log data, canceling task");
108 AWS_LOGSTREAM_WARN(__func__,
"Unable to obtain queue, canceling task");
111 if (!enqueue_success) {
115 return enqueue_success;
119 std::lock_guard<std::recursive_mutex> lck(
mtx);
122 AWS_LOG_INFO(__func__,
"Writing data to file");
125 AWS_LOG_WARN(__func__,
"Dropping data");
133 AWS_LOGSTREAM_WARN(__func__,
"FileManager not found: data will be dropped on failure.");
140 if (
nullptr == log_file_manager) {
141 throw std::invalid_argument(
"input FileManager cannot be null");
bool publishBatchedData() override
Services the log manager by performing periodic tasks when called. Calling the Service function allow...
std::shared_ptr< Aws::FileManagement::FileManager< LogCollection > > log_file_manager_
~LogBatcher() override
Tears down a LogBatcher object.
LogBatcher(size_t max_allowable_batch_size=DataBatcher::kDefaultMaxBatchSize, size_t publish_trigger_size=DataBatcher::kDefaultTriggerSize)
Creates a new LogBatcher that will group/buffer logs. Note: logs are only au...
void emptyCollection() override
std::list< LogType > LogCollection
virtual void setLogFileManager(std::shared_ptr< Aws::FileManagement::FileManager< LogCollection >> log_file_manager)
std::shared_ptr< Sink< Aws::FileManagement::TaskPtr< LogCollection > > > getSink()
std::chrono::microseconds getTryEnqueueDuration()
virtual std::string getStatusString()
size_t getCurrentBatchSize()
std::shared_ptr< std::list< LogType > > batched_data_