17 #include <gtest/gtest.h> 18 #include <gmock/gmock.h> 21 #include <condition_variable> 28 #include <aws/core/Aws.h> 29 #include <aws/core/utils/logging/ConsoleLogSystem.h> 54 force_failure =
false;
55 force_invalid_data_failure =
false;
56 last_upload_status = Aws::DataFlow::UploadStatus::UNKNOWN;
77 force_invalid_data_failure = nv;
81 return last_upload_status;
92 return last_upload_status;
98 return Aws::DataFlow::UploadStatus::FAIL;
100 }
else if (force_invalid_data_failure) {
101 return Aws::DataFlow::UploadStatus::INVALID_DATA;
104 return Aws::DataFlow::UploadStatus::SUCCESS;
121 written_count.store(0);
125 last_data_size = data.size();
137 std::atomic<int> written_count{};
138 std::atomic<size_t> last_data_size{};
139 std::condition_variable
cv;
148 test_publisher = std::make_shared<TestPublisher>();
149 batcher = std::make_shared<MetricBatcher>();
152 cw_service = std::make_shared<MetricService>(test_publisher, batcher);
154 stream_data_queue = std::make_shared<TaskObservedQueue<MetricDatumCollection>>();
155 queue_monitor = std::make_shared<Aws::DataFlow::QueueMonitor<TaskPtr<MetricDatumCollection>>>();
158 batcher->setSink(stream_data_queue);
160 cw_service->setSource(queue_monitor);
163 EXPECT_EQ(Aws::DataFlow::UploadStatus::UNKNOWN, test_publisher->getLastUploadStatus());
169 cw_service->shutdown();
180 std::shared_ptr<Aws::DataFlow::QueueMonitor<TaskPtr<MetricDatumCollection>>>
queue_monitor;
184 const std::string & name,
185 const double value = 2.42,
186 const std::string & unit =
"gigawatts",
187 const int64_t timestamp = 1234,
188 const std::map<std::string, std::string> & dimensions = std::map<std::string, std::string>(),
189 const int storage_resolution = 60
191 return MetricObject {name, value, unit, timestamp, dimensions, storage_resolution};
195 const std::string & name,
196 const double value = 2.42,
197 const std::string & unit =
"gigawatts",
198 const int64_t timestamp = 1234,
199 const std::map<std::string, std::string> & dimensions = std::map<std::string, std::string>(),
200 const int storage_resolution = 60
202 std::map<StatisticValuesType, double> statistic_values = {
208 return MetricObject{name, statistic_values, unit, timestamp, dimensions, storage_resolution};
215 TEST(MetricPipelineTest, TestCreateMetricObject) {
217 std::string name(
"HeWhoShallNotBenamed");
220 EXPECT_EQ(name,
object.metric_name);
221 EXPECT_EQ(2.42,
object.value);
222 EXPECT_EQ(
"gigawatts",
object.unit);
223 EXPECT_EQ(1234,
object.timestamp);
224 EXPECT_TRUE(
object.statistic_values.empty());
225 EXPECT_TRUE(
object.dimensions.empty());
226 EXPECT_EQ(60,
object.storage_resolution);
231 const std::string metric_name =
"test_object";
232 const std::string dimension_name =
"blah";
233 const std::string dimension_value =
"blah blah";
236 object.unit =
"percent";
237 object.statistic_values.clear();
240 object.dimensions[dimension_name] = dimension_value;
242 auto datum = cw_service->convertInputToBatched(
object);
243 EXPECT_EQ(metric_name, datum.GetMetricName().c_str());
244 EXPECT_EQ(Aws::CloudWatch::Model::StandardUnit::Percent, datum.GetUnit());
245 EXPECT_EQ(1234, datum.GetTimestamp().Millis());
246 EXPECT_EQ(60, datum.GetStorageResolution());
248 const auto & statistic_values = datum.GetStatisticValues();
249 EXPECT_DOUBLE_EQ(24, statistic_values.GetSampleCount());
250 EXPECT_DOUBLE_EQ(111.1, statistic_values.GetSum());
252 const auto & dimensions = datum.GetDimensions();
253 EXPECT_EQ(1u, dimensions.size());
254 EXPECT_EQ(dimension_name, dimensions[0].GetName().c_str());
255 EXPECT_EQ(dimension_value, dimensions[0].GetValue().c_str());
264 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
265 bool is_batched = cw_service->batchData(toBatch);
266 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
268 EXPECT_TRUE(is_batched);
270 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
271 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
272 EXPECT_EQ(0, test_publisher->getPublishAttempts());
273 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
276 bool b2 = cw_service->publishBatchedData();
277 test_publisher->wait_for(std::chrono::seconds(1));
280 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
281 EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
282 EXPECT_EQ(1, test_publisher->getPublishSuccesses());
283 EXPECT_EQ(1, test_publisher->getPublishAttempts());
284 EXPECT_EQ(100.0f, test_publisher->getPublishSuccessPercentage());
285 EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
294 bool is_batched = cw_service->batchData(toBatch);
295 EXPECT_TRUE(is_batched);
297 for(
int i=99; i>0; i--) {
299 is_batched = cw_service->batchData(batchedBottles);
300 EXPECT_TRUE(is_batched);
303 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
304 EXPECT_EQ(100u, batcher->getCurrentBatchSize());
307 is_batched = cw_service->publishBatchedData();
308 test_publisher->wait_for(std::chrono::seconds(1));
310 EXPECT_TRUE(is_batched);
311 EXPECT_EQ(1, test_publisher->getPublishSuccesses());
312 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
313 EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
314 EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
322 EXPECT_EQ(SIZE_MAX, batcher->getTriggerBatchSize());
324 batcher->setTriggerBatchSize(size);
325 EXPECT_EQ(size, batcher->getTriggerBatchSize());
327 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
329 for(
size_t i=1; i<size; i++) {
331 bool is_batched = cw_service->batchData(toBatch);
333 EXPECT_TRUE(is_batched);
334 EXPECT_EQ(0, test_publisher->getPublishAttempts());
335 EXPECT_EQ(i, batcher->getCurrentBatchSize());
336 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
339 ASSERT_EQ(size, batcher->getTriggerBatchSize());
341 bool is_batched = cw_service->batchData(toBatch);
343 EXPECT_TRUE(is_batched);
345 test_publisher->wait_for(std::chrono::seconds(1));
347 EXPECT_EQ(1, test_publisher->getPublishAttempts());
348 EXPECT_EQ(1, test_publisher->getPublishSuccesses());
349 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
350 EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
351 EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
356 std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
358 batcher->setMetricFileManager(fileManager);
362 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
363 bool is_batched = cw_service->batchData(toBatch);
364 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
365 EXPECT_EQ(
true, is_batched);
368 test_publisher->setForceFailure(
true);
371 bool b2 = cw_service->publishBatchedData();
372 test_publisher->wait_for(std::chrono::seconds(1));
375 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
376 EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
377 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
378 EXPECT_EQ(1, test_publisher->getPublishAttempts());
379 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
381 fileManager->wait_for(std::chrono::seconds(1));
383 EXPECT_EQ(1, fileManager->written_count);
384 EXPECT_EQ(Aws::DataFlow::UploadStatus::FAIL, test_publisher->getLastUploadStatus());
390 std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
392 batcher->setMetricFileManager(fileManager);
396 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
397 bool is_batched = cw_service->batchData(toBatch);
398 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
399 EXPECT_EQ(
true, is_batched);
402 test_publisher->setForceInvalidDataFailure(
true);
405 bool b2 = cw_service->publishBatchedData();
406 test_publisher->wait_for(std::chrono::seconds(1));
409 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
410 EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
411 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
412 EXPECT_EQ(1, test_publisher->getPublishAttempts());
413 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
415 fileManager->wait_for(std::chrono::seconds(1));
417 EXPECT_EQ(0, fileManager->written_count);
418 EXPECT_EQ(Aws::DataFlow::UploadStatus::INVALID_DATA, test_publisher->getLastUploadStatus());
423 std::random_device rd;
424 std::mt19937 gen(rd());
426 std::bernoulli_distribution d(0.5);
428 int expected_success = 0;
430 for(
int i=1; i<=50; i++) {
432 bool force_failure = d(gen);
434 std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
436 batcher->setMetricFileManager(fileManager);
440 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
441 bool is_batched = cw_service->batchData(toBatch);
442 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
443 EXPECT_EQ(
true, is_batched);
446 test_publisher->setForceFailure(force_failure);
448 if (!force_failure) {
452 bool b2 = cw_service->publishBatchedData();
453 test_publisher->wait_for(std::chrono::seconds(1));
456 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
458 auto expected_state = force_failure ? PublisherState::NOT_CONNECTED : PublisherState::CONNECTED;
460 EXPECT_EQ(expected_state, test_publisher->getPublisherState());
461 EXPECT_EQ(expected_success, test_publisher->getPublishSuccesses());
462 EXPECT_EQ(i, test_publisher->getPublishAttempts());
463 EXPECT_EQ(force_failure ?
464 Aws::DataFlow::UploadStatus::FAIL: Aws::DataFlow::UploadStatus::SUCCESS,
465 test_publisher->getLastUploadStatus());
467 float expected_percentage =
static_cast<float>(expected_success) / static_cast<float>(i) * 100.0f;
468 EXPECT_FLOAT_EQ(expected_percentage, test_publisher->getPublishSuccessPercentage());
470 fileManager->wait_for_millis(std::chrono::milliseconds(100));
472 EXPECT_EQ(force_failure ? 1 : 0, fileManager->written_count);
479 std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
481 batcher->setMetricFileManager(fileManager);
482 batcher->setMaxAllowableBatchSize(max);
484 EXPECT_EQ(max, batcher->getMaxAllowableBatchSize());
486 for(
size_t i=1; i<=max; i++) {
488 bool is_batched = cw_service->batchData(toBatch);
490 EXPECT_TRUE(is_batched);
491 EXPECT_EQ(0, test_publisher->getPublishAttempts());
492 EXPECT_EQ(i, batcher->getCurrentBatchSize());
493 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
497 bool b = cw_service->batchData(toBatch);
500 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
501 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
502 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
503 EXPECT_EQ(0, test_publisher->getPublishAttempts());
504 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
506 fileManager->wait_for_millis(std::chrono::milliseconds(200));
509 EXPECT_EQ(1, fileManager->written_count);
510 EXPECT_EQ(max + 1, fileManager->last_data_size);
513 int main(
int argc,
char ** argv)
515 testing::InitGoogleTest(&argc, argv);
516 return RUN_ALL_TESTS();
void setForceInvalidDataFailure(bool nv)
std::list< Aws::CloudWatch::Model::MetricDatum > MetricDatumCollection
std::condition_variable cv
TEST(MetricPipelineTest, TestCreateMetricObject)
MetricObject createTestMetricObjectWithStatisticValues(const std::string &name, const double value=2.42, const std::string &unit="gigawatts", const int64_t timestamp=1234, const std::map< std::string, std::string > &dimensions=std::map< std::string, std::string >(), const int storage_resolution=60)
int main(int argc, char **argv)
Aws::DataFlow::UploadStatus getLastUploadStatus()
std::shared_ptr< TaskObservedQueue< MetricDatumCollection > > stream_data_queue
Aws::DataFlow::UploadStatus last_upload_status
void write(const MetricDatumCollection &data) override
Aws::DataFlow::UploadStatus attemptPublish(MetricDatumCollection &data) override
Aws::DataFlow::UploadStatus attemptPublish(T &data) override
std::shared_ptr< Aws::DataFlow::QueueMonitor< TaskPtr< MetricDatumCollection > > > queue_monitor
std::shared_ptr< MetricBatcher > batcher
std::shared_ptr< MetricService > cw_service
bool force_invalid_data_failure
void setForceFailure(bool nv)
MetricObject createTestMetricObjectWithValue(const std::string &name, const double value=2.42, const std::string &unit="gigawatts", const int64_t timestamp=1234, const std::map< std::string, std::string > &dimensions=std::map< std::string, std::string >(), const int storage_resolution=60)
std::shared_ptr< TestPublisher > test_publisher
TEST_F(PipelineTest, Sanity)
Aws::DataFlow::UploadStatus publishData(MetricDatumCollection &) override
FileObject< MetricDatumCollection > readBatch(size_t batch_size) override