metric_batcher.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #include <aws/core/Aws.h>
17 #include <aws/core/utils/logging/LogMacros.h>
18 
20 
22 
24 
26 
27 #include <chrono>
28 #include <iostream>
29 #include <list>
30 #include <memory>
31 #include <mutex>
32 #include <string>
33 #include <stdexcept>
34 
35 
36 namespace Aws {
37 namespace CloudWatchMetrics {
38 
39 MetricBatcher::MetricBatcher(size_t max_allowable_batch_size,
40  size_t publish_trigger_size)
41  : DataBatcher(max_allowable_batch_size, publish_trigger_size) {
42 }
43 
45 
47  static const char * const kFuncName = __func__;
48 
49  std::lock_guard<std::recursive_mutex> lk(mtx);
50 
51  // is there anything to send?
52  if (getCurrentBatchSize() == 0) {
53  AWS_LOGSTREAM_DEBUG(__func__, "Nothing batched to publish");
54  return false;
55  }
56 
57  auto metrics_to_publish = this->batched_data_;
58  std::shared_ptr<Aws::DataFlow::BasicTask<MetricDatumCollection>> metric_task = std::make_shared<Aws::DataFlow::BasicTask<MetricDatumCollection>>(metrics_to_publish);
59 
60  // connect to the file manager to write to disk on fail / cancel
61  if (metric_file_manager_ ) {
62 
63  // register the task failure function
64  auto function = [&metric_file_manager = this->metric_file_manager_](const DataFlow::UploadStatus &upload_status,
65  const MetricDatumCollection &metrics_to_publish)
66  {
67  if (!metrics_to_publish.empty()) {
68 
69  if (DataFlow::UploadStatus::INVALID_DATA == upload_status) {
70 
71  // publish indicated the task data was bad, this task should be discarded
72  AWS_LOG_WARN(kFuncName, "MetricBatcher: Task failed due to invalid metric data, dropping");
73 
74  } else if (DataFlow::UploadStatus::SUCCESS != upload_status) {
75 
76  AWS_LOG_INFO(kFuncName, "MetricBatcher: Task failed: writing metrics to file");
77  metric_file_manager->write(metrics_to_publish);
78 
79  } else {
80  AWS_LOG_DEBUG(kFuncName, "MetricBatcher: Task metric upload successful");
81  }
82  } else {
83  AWS_LOG_INFO(kFuncName, "MetricBatcher: not publishing task as metrics_to_publish is empty");
84  }
85  };
86 
87  metric_task->setOnCompleteFunction(function);
88  }
89 
90  // dont attempt to queue if not started
91  if(ServiceState::STARTED != this->getState()) {
92  AWS_LOG_WARN(__func__, "current service state is not Started, canceling task: %s", Service::getStatusString().c_str());
93  metric_task->cancel();
94  return false;
95  }
96 
97  bool enqueue_success = false;
98 
99  // try to enqueue
100  if (getSink()) {
101 
102  enqueue_success = getSink()->tryEnqueue(metric_task, this->getTryEnqueueDuration());
103 
104  if (!enqueue_success) {
105  AWS_LOG_WARN(__func__, "Unable to enqueue data, canceling task");
106  }
107 
108  } else {
109  AWS_LOGSTREAM_WARN(__func__, "Unable to obtain queue, canceling task");
110  }
111 
112  if (!enqueue_success) {
113  metric_task->cancel();
114  }
115  this->resetBatchedData();
116  return enqueue_success;
117 }
118 
120  std::lock_guard<std::recursive_mutex> lk(mtx);
121 
122  if (this->metric_file_manager_) {
123  AWS_LOG_INFO(__func__, "Writing data to file");
124  metric_file_manager_->write(*this->batched_data_);
125  } else {
126  AWS_LOG_WARN(__func__, "Dropping data");
127  }
128  this->resetBatchedData();
129 }
130 
131 
133  if (metric_file_manager_ == nullptr) {
134  AWS_LOGSTREAM_WARN(__func__, "FileManager not found: data will be dropped on failure.");
135  }
136  return Service::start();
137 }
138 
140 {
141  if (nullptr == metric_file_manager) {
142  throw std::invalid_argument("input FileManager cannot be null");
143  }
144  this->metric_file_manager_ = std::move(metric_file_manager);
145 }
146 
147 } // namespace CloudWatchMetrics
148 } // namespace Aws
std::list< Aws::CloudWatch::Model::MetricDatum > MetricDatumCollection
Definition: definitions.h:23
std::recursive_mutex mtx
~MetricBatcher() override
Tears down a MetricBatcher object.
MetricBatcher(size_t max_allowable_batch_size=DataBatcher::kDefaultMaxBatchSize, size_t publish_trigger_size=DataBatcher::kDefaultTriggerSize)
Creates a new MetricBatcher. Creates a new MetricBatcher that will group/buffer metrics. Note: metrics are only published automatically if the size is set; otherwise publishBatchedData must be called to push the batched data to be published.
virtual void setMetricFileManager(std::shared_ptr< Aws::FileManagement::FileManager< MetricDatumCollection >> file_manager)
std::shared_ptr< Aws::FileManagement::FileManager< MetricDatumCollection > > metric_file_manager_
std::shared_ptr< Sink< Aws::FileManagement::TaskPtr< MetricDatumCollection > > > getSink()
std::chrono::microseconds getTryEnqueueDuration()
virtual std::string getStatusString()
std::shared_ptr< std::list< MetricDatum > > batched_data_
ServiceState getState()
virtual bool start()


cloudwatch_metrics_common
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:25