16 #include <aws/core/Aws.h> 17 #include <aws/core/utils/logging/LogMacros.h> 37 namespace CloudWatchMetrics {
40 size_t publish_trigger_size)
41 :
DataBatcher(max_allowable_batch_size, publish_trigger_size) {
47 static const char *
const kFuncName = __func__;
49 std::lock_guard<std::recursive_mutex> lk(
mtx);
53 AWS_LOGSTREAM_DEBUG(__func__,
"Nothing batched to publish");
58 std::shared_ptr<Aws::DataFlow::BasicTask<MetricDatumCollection>> metric_task = std::make_shared<Aws::DataFlow::BasicTask<MetricDatumCollection>>(metrics_to_publish);
67 if (!metrics_to_publish.empty()) {
69 if (DataFlow::UploadStatus::INVALID_DATA == upload_status) {
72 AWS_LOG_WARN(kFuncName,
"MetricBatcher: Task failed due to invalid metric data, dropping");
74 }
else if (DataFlow::UploadStatus::SUCCESS != upload_status) {
76 AWS_LOG_INFO(kFuncName,
"MetricBatcher: Task failed: writing metrics to file");
77 metric_file_manager->write(metrics_to_publish);
80 AWS_LOG_DEBUG(kFuncName,
"MetricBatcher: Task metric upload successful");
83 AWS_LOG_INFO(kFuncName,
"MetricBatcher: not publishing task as metrics_to_publish is empty");
87 metric_task->setOnCompleteFunction(
function);
91 if(ServiceState::STARTED != this->
getState()) {
92 AWS_LOG_WARN(__func__,
"current service state is not Started, canceling task: %s",
Service::getStatusString().c_str());
93 metric_task->cancel();
97 bool enqueue_success =
false;
104 if (!enqueue_success) {
105 AWS_LOG_WARN(__func__,
"Unable to enqueue data, canceling task");
109 AWS_LOGSTREAM_WARN(__func__,
"Unable to obtain queue, canceling task");
112 if (!enqueue_success) {
113 metric_task->cancel();
116 return enqueue_success;
120 std::lock_guard<std::recursive_mutex> lk(
mtx);
123 AWS_LOG_INFO(__func__,
"Writing data to file");
126 AWS_LOG_WARN(__func__,
"Dropping data");
134 AWS_LOGSTREAM_WARN(__func__,
"FileManager not found: data will be dropped on failure.");
141 if (
nullptr == metric_file_manager) {
142 throw std::invalid_argument(
"input FileManager cannot be null");
std::list< Aws::CloudWatch::Model::MetricDatum > MetricDatumCollection
bool publishBatchedData() override
void emptyCollection() override
~MetricBatcher() override
Tears down a MetricBatcher object.
MetricBatcher(size_t max_allowable_batch_size=DataBatcher::kDefaultMaxBatchSize, size_t publish_trigger_size=DataBatcher::kDefaultTriggerSize)
Creates a new MetricBatcher Creates a new MetricBatcher that will group/buffer metrics. Note: metrics are only automatically published if the size is set, otherwise the publishBatchedData is necessary to push data to be published.
virtual void setMetricFileManager(std::shared_ptr< Aws::FileManagement::FileManager< MetricDatumCollection >> file_manager)
std::shared_ptr< Aws::FileManagement::FileManager< MetricDatumCollection > > metric_file_manager_
std::shared_ptr< Sink< Aws::FileManagement::TaskPtr< MetricDatumCollection > > > getSink()
std::chrono::microseconds getTryEnqueueDuration()
virtual std::string getStatusString()
size_t getCurrentBatchSize()
std::shared_ptr< std::list< MetricDatum > > batched_data_