metric_pipeline_test.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 
17 #include <gtest/gtest.h>
18 #include <gmock/gmock.h>
19 
20 #include <chrono>
21 #include <condition_variable>
22 #include <mutex>
23 #include <random>
24 #include <string>
25 #include <thread>
26 #include <map>
27 
28 #include <aws/core/Aws.h>
29 #include <aws/core/utils/logging/ConsoleLogSystem.h>
30 
35 
37 
41 
42 using namespace Aws::CloudWatchMetrics;
43 using namespace Aws::CloudWatchMetrics::Utils;
44 using namespace Aws::FileManagement;
45 using namespace std::chrono_literals;
46 
50 class TestPublisher : public Publisher<MetricDatumCollection>, public Waiter
51 {
52 public:
54  force_failure = false;
55  force_invalid_data_failure = false;
56  last_upload_status = Aws::DataFlow::UploadStatus::UNKNOWN;
57  };
58 
59  ~TestPublisher() override = default ;
60 
61  bool start() override {
62  return Publisher::start();
63  }
64 
65  // notify just in case anyone is waiting
66  bool shutdown() override {
67  bool is_shutdown = Publisher::shutdown();
68  this->notify(); //don't leave anyone blocking
69  return is_shutdown;
70  };
71 
72  void setForceFailure(bool nv) {
73  force_failure = nv;
74  }
75 
77  force_invalid_data_failure = nv;
78  }
79 
81  return last_upload_status;
82  }
83 
84 protected:
85 
86  // override so we can notify when internal state changes, as attemptPublish sets state
88  last_upload_status = Publisher::attemptPublish(data);
89  {
90  this->notify();
91  }
92  return last_upload_status;
93  }
94 
96 
97  if (force_failure) {
98  return Aws::DataFlow::UploadStatus::FAIL;
99 
100  } else if (force_invalid_data_failure) {
101  return Aws::DataFlow::UploadStatus::INVALID_DATA;
102 
103  } else {
104  return Aws::DataFlow::UploadStatus::SUCCESS;
105  }
106  }
107 
111 };
112 
116 class TestMetricFileManager : public FileManager<MetricDatumCollection>, public Waiter
117 {
118 public:
119 
121  written_count.store(0);
122  }
123 
124  void write(const MetricDatumCollection & data) override {
125  last_data_size = data.size();
126  written_count++;
127  this->notify();
128  };
129 
130  FileObject<MetricDatumCollection> readBatch(size_t batch_size) override {
131  // do nothing
133  testFile.batch_size = batch_size;
134  return testFile;
135  }
136 
137  std::atomic<int> written_count{};
138  std::atomic<size_t> last_data_size{};
139  std::condition_variable cv; // todo think about adding this into the interface
140  mutable std::mutex mtx; // todo think about adding this into the interface
141 };
142 
143 
144 class PipelineTest : public ::testing::Test {
145 public:
146  void SetUp() override
147  {
148  test_publisher = std::make_shared<TestPublisher>();
149  batcher = std::make_shared<MetricBatcher>();
150 
151  // log service owns the streamer, batcher, and publisher
152  cw_service = std::make_shared<MetricService>(test_publisher, batcher);
153 
154  stream_data_queue = std::make_shared<TaskObservedQueue<MetricDatumCollection>>();
155  queue_monitor = std::make_shared<Aws::DataFlow::QueueMonitor<TaskPtr<MetricDatumCollection>>>();
156 
157  // create pipeline
158  batcher->setSink(stream_data_queue);
159  queue_monitor->addSource(stream_data_queue, Aws::DataFlow::PriorityOptions{Aws::DataFlow::HIGHEST_PRIORITY});
160  cw_service->setSource(queue_monitor);
161 
162  cw_service->start(); //this starts the worker thread
163  EXPECT_EQ(Aws::DataFlow::UploadStatus::UNKNOWN, test_publisher->getLastUploadStatus());
164  }
165 
166  void TearDown() override
167  {
168  if (cw_service) {
169  cw_service->shutdown();
170  cw_service->join(); // todo wait for shutdown is broken
171  }
172  }
173 
174 protected:
175  std::shared_ptr<TestPublisher> test_publisher;
176  std::shared_ptr<MetricBatcher> batcher;
177  std::shared_ptr<MetricService> cw_service;
178 
179  std::shared_ptr<TaskObservedQueue<MetricDatumCollection>> stream_data_queue;
180  std::shared_ptr<Aws::DataFlow::QueueMonitor<TaskPtr<MetricDatumCollection>>>queue_monitor;
181 };
182 
184  const std::string & name,
185  const double value = 2.42,
186  const std::string & unit = "gigawatts",
187  const int64_t timestamp = 1234,
188  const std::map<std::string, std::string> & dimensions = std::map<std::string, std::string>(),
189  const int storage_resolution = 60
190  ) {
191  return MetricObject {name, value, unit, timestamp, dimensions, storage_resolution};
192 }
193 
195  const std::string & name,
196  const double value = 2.42,
197  const std::string & unit = "gigawatts",
198  const int64_t timestamp = 1234,
199  const std::map<std::string, std::string> & dimensions = std::map<std::string, std::string>(),
200  const int storage_resolution = 60
201  ) {
202  std::map<StatisticValuesType, double> statistic_values = {
205  {StatisticValuesType::SUM, value},
207  };
208  return MetricObject{name, statistic_values, unit, timestamp, dimensions, storage_resolution};
209 }
210 
212  ASSERT_TRUE(true);
213 }
214 
215 TEST(MetricPipelineTest, TestCreateMetricObject) {
216 
217  std::string name("HeWhoShallNotBenamed");
218 
219  auto object = createTestMetricObjectWithValue(name);
220  EXPECT_EQ(name, object.metric_name);
221  EXPECT_EQ(2.42, object.value);
222  EXPECT_EQ("gigawatts", object.unit);
223  EXPECT_EQ(1234, object.timestamp);
224  EXPECT_TRUE(object.statistic_values.empty());
225  EXPECT_TRUE(object.dimensions.empty());
226  EXPECT_EQ(60, object.storage_resolution);
227 }
228 
229 TEST_F(PipelineTest, TestConvertToBatchedData) {
230 
231  const std::string metric_name = "test_object";
232  const std::string dimension_name = "blah";
233  const std::string dimension_value = "blah blah";
234 
235  auto object = createTestMetricObjectWithStatisticValues(metric_name);
236  object.unit = "percent";
237  object.statistic_values.clear();
238  object.statistic_values[StatisticValuesType::SUM] = 111.1;
239  object.statistic_values[StatisticValuesType::SAMPLE_COUNT] = 24;
240  object.dimensions[dimension_name] = dimension_value;
241 
242  auto datum = cw_service->convertInputToBatched(object);
243  EXPECT_EQ(metric_name, datum.GetMetricName().c_str());
244  EXPECT_EQ(Aws::CloudWatch::Model::StandardUnit::Percent, datum.GetUnit());
245  EXPECT_EQ(1234, datum.GetTimestamp().Millis());
246  EXPECT_EQ(60, datum.GetStorageResolution());
247 
248  const auto & statistic_values = datum.GetStatisticValues();
249  EXPECT_DOUBLE_EQ(24, statistic_values.GetSampleCount());
250  EXPECT_DOUBLE_EQ(111.1, statistic_values.GetSum());
251 
252  const auto & dimensions = datum.GetDimensions();
253  EXPECT_EQ(1u, dimensions.size());
254  EXPECT_EQ(dimension_name, dimensions[0].GetName().c_str());
255  EXPECT_EQ(dimension_value, dimensions[0].GetValue().c_str());
256 }
257 
261 TEST_F(PipelineTest, TestBatcherManualPublish) {
262 
263  auto toBatch = createTestMetricObjectWithValue(std::string("testMetric"));
264  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
265  bool is_batched = cw_service->batchData(toBatch);
266  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
267 
268  EXPECT_TRUE(is_batched);
269 
270  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
271  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
272  EXPECT_EQ(0, test_publisher->getPublishAttempts());
273  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
274 
275  // force a publish
276  bool b2 = cw_service->publishBatchedData();
277  test_publisher->wait_for(std::chrono::seconds(1));
278 
279  EXPECT_TRUE(b2);
280  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
281  EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
282  EXPECT_EQ(1, test_publisher->getPublishSuccesses());
283  EXPECT_EQ(1, test_publisher->getPublishAttempts());
284  EXPECT_EQ(100.0f, test_publisher->getPublishSuccessPercentage());
285  EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
286 }
287 
291 TEST_F(PipelineTest, TestBatcherManualPublishMultipleItems) {
292 
293  auto toBatch = createTestMetricObjectWithValue(std::string("TestBatcherManualPublish"));
294  bool is_batched = cw_service->batchData(toBatch);
295  EXPECT_TRUE(is_batched);
296 
297  for(int i=99; i>0; i--) {
298  auto batchedBottles = createTestMetricObjectWithValue(std::to_string(99) + std::string(" bottles of beer on the wall"));
299  is_batched = cw_service->batchData(batchedBottles);
300  EXPECT_TRUE(is_batched);
301  }
302 
303  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
304  EXPECT_EQ(100u, batcher->getCurrentBatchSize());
305 
306  // force a publish
307  is_batched = cw_service->publishBatchedData();
308  test_publisher->wait_for(std::chrono::seconds(1));
309 
310  EXPECT_TRUE(is_batched);
311  EXPECT_EQ(1, test_publisher->getPublishSuccesses());
312  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
313  EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
314  EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
315 }
316 
320 TEST_F(PipelineTest, TestBatcherSize) {
321 
322  EXPECT_EQ(SIZE_MAX, batcher->getTriggerBatchSize()); // not initialized
323  size_t size = 5;
324  batcher->setTriggerBatchSize(size); // setting the size will trigger a publish when the collection is full
325  EXPECT_EQ(size, batcher->getTriggerBatchSize());
326 
327  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
328 
329  for(size_t i=1; i<size; i++) {
330  auto toBatch = createTestMetricObjectWithValue(std::string("test message ") + std::to_string(i));
331  bool is_batched = cw_service->batchData(toBatch);
332 
333  EXPECT_TRUE(is_batched);
334  EXPECT_EQ(0, test_publisher->getPublishAttempts());
335  EXPECT_EQ(i, batcher->getCurrentBatchSize());
336  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
337  }
338 
339  ASSERT_EQ(size, batcher->getTriggerBatchSize());
340  auto toBatch = createTestMetricObjectWithValue(("test message " + std::to_string(size)));
341  bool is_batched = cw_service->batchData(toBatch);
342 
343  EXPECT_TRUE(is_batched);
344 
345  test_publisher->wait_for(std::chrono::seconds(1));
346 
347  EXPECT_EQ(1, test_publisher->getPublishAttempts());
348  EXPECT_EQ(1, test_publisher->getPublishSuccesses());
349  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
350  EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
351  EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
352 }
353 
354 TEST_F(PipelineTest, TestSinglePublisherFailureToFileManager) {
355 
356  std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
357  // hookup to the service
358  batcher->setMetricFileManager(fileManager);
359 
360  // batch
361  auto toBatch = createTestMetricObjectWithValue(std::string("TestBatcherManualPublish"));
362  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
363  bool is_batched = cw_service->batchData(toBatch);
364  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
365  EXPECT_EQ(true, is_batched);
366 
367  // force failure
368  test_publisher->setForceFailure(true);
369 
370  // publish
371  bool b2 = cw_service->publishBatchedData();
372  test_publisher->wait_for(std::chrono::seconds(1));
373 
374  EXPECT_TRUE(b2);
375  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
376  EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
377  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
378  EXPECT_EQ(1, test_publisher->getPublishAttempts());
379  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
380 
381  fileManager->wait_for(std::chrono::seconds(1));
382  //check that the filemanger callback worked
383  EXPECT_EQ(1, fileManager->written_count);
384  EXPECT_EQ(Aws::DataFlow::UploadStatus::FAIL, test_publisher->getLastUploadStatus());
385 }
386 
387 
388 TEST_F(PipelineTest, TestInvalidDataNotPassedToFileManager) {
389 
390  std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
391  // hookup to the service
392  batcher->setMetricFileManager(fileManager);
393 
394  // batch
395  auto toBatch = createTestMetricObjectWithValue(std::string("TestBatcherManualPublish"));
396  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
397  bool is_batched = cw_service->batchData(toBatch);
398  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
399  EXPECT_EQ(true, is_batched);
400 
401  // force failure
402  test_publisher->setForceInvalidDataFailure(true);
403 
404  // publish
405  bool b2 = cw_service->publishBatchedData();
406  test_publisher->wait_for(std::chrono::seconds(1));
407 
408  EXPECT_TRUE(b2);
409  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
410  EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
411  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
412  EXPECT_EQ(1, test_publisher->getPublishAttempts());
413  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
414 
415  fileManager->wait_for(std::chrono::seconds(1));
416  //check that the filemanger callback worked
417  EXPECT_EQ(0, fileManager->written_count);
418  EXPECT_EQ(Aws::DataFlow::UploadStatus::INVALID_DATA, test_publisher->getLastUploadStatus());
419 }
420 
421 TEST_F(PipelineTest, TestPublisherIntermittant) {
422 
423  std::random_device rd;
424  std::mt19937 gen(rd());
425  // 50-50 chance
426  std::bernoulli_distribution d(0.5);
427 
428  int expected_success = 0;
429 
430  for(int i=1; i<=50; i++) {
431 
432  bool force_failure = d(gen);
433 
434  std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
435  // hookup to the service
436  batcher->setMetricFileManager(fileManager);
437 
438  // batch
439  auto toBatch = createTestMetricObjectWithValue(std::string("TestPublisherIntermittant"));
440  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
441  bool is_batched = cw_service->batchData(toBatch);
442  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
443  EXPECT_EQ(true, is_batched);
444 
445  // force failure
446  test_publisher->setForceFailure(force_failure);
447 
448  if (!force_failure) {
449  expected_success++;
450  }
451  // publish
452  bool b2 = cw_service->publishBatchedData();
453  test_publisher->wait_for(std::chrono::seconds(1));
454 
455  EXPECT_TRUE(b2);
456  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
457 
458  auto expected_state = force_failure ? PublisherState::NOT_CONNECTED : PublisherState::CONNECTED;
459 
460  EXPECT_EQ(expected_state, test_publisher->getPublisherState());
461  EXPECT_EQ(expected_success, test_publisher->getPublishSuccesses());
462  EXPECT_EQ(i, test_publisher->getPublishAttempts());
463  EXPECT_EQ(force_failure ?
464  Aws::DataFlow::UploadStatus::FAIL: Aws::DataFlow::UploadStatus::SUCCESS,
465  test_publisher->getLastUploadStatus());
466 
467  float expected_percentage = static_cast<float>(expected_success) / static_cast<float>(i) * 100.0f;
468  EXPECT_FLOAT_EQ(expected_percentage, test_publisher->getPublishSuccessPercentage());
469 
470  fileManager->wait_for_millis(std::chrono::milliseconds(100));
471  //check that the filemanger callback worked
472  EXPECT_EQ(force_failure ? 1 : 0, fileManager->written_count);
473  }
474 }
475 
476 TEST_F(PipelineTest, TestBatchDataTooFast) {
477 
478  size_t max = 10;
479  std::shared_ptr<TestMetricFileManager> fileManager = std::make_shared<TestMetricFileManager>();
480  // hookup to the service
481  batcher->setMetricFileManager(fileManager);
482  batcher->setMaxAllowableBatchSize(max);
483 
484  EXPECT_EQ(max, batcher->getMaxAllowableBatchSize());
485 
486  for(size_t i=1; i<=max; i++) {
487  auto toBatch = createTestMetricObjectWithValue(std::string("test message " + std::to_string(i)));
488  bool is_batched = cw_service->batchData(toBatch);
489 
490  EXPECT_TRUE(is_batched);
491  EXPECT_EQ(0, test_publisher->getPublishAttempts());
492  EXPECT_EQ(i, batcher->getCurrentBatchSize());
493  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
494  }
495 
496  auto toBatch = createTestMetricObjectWithValue(std::string("iocaine powder"));
497  bool b = cw_service->batchData(toBatch);
498 
499  EXPECT_FALSE(b);
500  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
501  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState()); // hasn't changed since not attempted
502  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
503  EXPECT_EQ(0, test_publisher->getPublishAttempts());
504  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
505 
506  fileManager->wait_for_millis(std::chrono::milliseconds(200));
507 
508  // check that the filemanger callback worked
509  EXPECT_EQ(1, fileManager->written_count);
510  EXPECT_EQ(max + 1, fileManager->last_data_size);
511 }
512 
513 int main(int argc, char ** argv)
514 {
515  testing::InitGoogleTest(&argc, argv);
516  return RUN_ALL_TESTS();
517 }
void setForceInvalidDataFailure(bool nv)
void SetUp() override
std::list< Aws::CloudWatch::Model::MetricDatum > MetricDatumCollection
Definition: definitions.h:23
std::condition_variable cv
TEST(MetricPipelineTest, TestCreateMetricObject)
bool shutdown() override
MetricObject createTestMetricObjectWithStatisticValues(const std::string &name, const double value=2.42, const std::string &unit="gigawatts", const int64_t timestamp=1234, const std::map< std::string, std::string > &dimensions=std::map< std::string, std::string >(), const int storage_resolution=60)
int main(int argc, char **argv)
Aws::DataFlow::UploadStatus getLastUploadStatus()
std::shared_ptr< TaskObservedQueue< MetricDatumCollection > > stream_data_queue
Aws::DataFlow::UploadStatus last_upload_status
void write(const MetricDatumCollection &data) override
bool shutdown() override
Aws::DataFlow::UploadStatus attemptPublish(MetricDatumCollection &data) override
Aws::DataFlow::UploadStatus attemptPublish(T &data) override
std::shared_ptr< Aws::DataFlow::QueueMonitor< TaskPtr< MetricDatumCollection > > > queue_monitor
std::shared_ptr< MetricBatcher > batcher
bool start() override
std::shared_ptr< MetricService > cw_service
void setForceFailure(bool nv)
MetricObject createTestMetricObjectWithValue(const std::string &name, const double value=2.42, const std::string &unit="gigawatts", const int64_t timestamp=1234, const std::map< std::string, std::string > &dimensions=std::map< std::string, std::string >(), const int storage_resolution=60)
std::shared_ptr< TestPublisher > test_publisher
TEST_F(PipelineTest, Sanity)
virtual bool start()
Aws::DataFlow::UploadStatus publishData(MetricDatumCollection &) override
void TearDown() override
FileObject< MetricDatumCollection > readBatch(size_t batch_size) override


cloudwatch_metrics_common
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:25