pipeline_test.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 
17 #include <gtest/gtest.h>
18 #include <gmock/gmock.h>
19 
20 #include <chrono>
21 #include <condition_variable>
22 #include <mutex>
23 #include <random>
24 #include <string>
25 #include <thread>
26 
27 #include <aws/core/Aws.h>
28 #include <aws/core/utils/logging/ConsoleLogSystem.h>
29 
33 
35 
38 
39 using namespace Aws::CloudWatchLogs;
40 using namespace Aws::CloudWatchLogs::Utils;
41 using namespace Aws::FileManagement;
42 using namespace std::chrono_literals;
43 
/**
 * Test double for the log publisher used by the pipeline tests.
 *
 * publishData() performs no upload; it returns FAIL, INVALID_DATA or SUCCESS depending on the
 * force_failure / force_invalid_data_failure flags. attemptPublish() stores the status returned
 * by the base class in last_upload_status and then calls notify() (presumably inherited from
 * Waiter - confirm) so a test blocked in wait_for() can continue.
 *
 * NOTE(review): this listing is incomplete. Going by the embedded line numbers, original lines
 * 50, 72, 76 and 104-106 are absent, so the class as shown here does not compile. Each gap is
 * marked below; restore the lines from the real source file.
 */
47 class TestPublisher : public Publisher<std::list<Aws::CloudWatchLogs::Model::InputLogEvent>>, public Waiter
48 {
49 public:
// NOTE(review): original line 50 is absent. It presumably opened the constructor whose body
// follows (the fixture creates this class with std::make_shared<TestPublisher>(), i.e. with no
// arguments) - confirm the exact header and any base-class initializer.
// The body resets both force flags and marks the last upload status as UNKNOWN.
51  force_failure = false;
52  force_invalid_data_failure = false;
53  last_upload_status = Aws::DataFlow::UploadStatus::UNKNOWN;
54  };
55  ~TestPublisher() override = default;
56 
// Delegates straight to the base class implementation.
57  bool start() override {
58  return Publisher::start();
59  }
60 
61  // notify just in case anyone is waiting
// Returns whatever the base class shutdown() returned.
62  bool shutdown() override {
63  bool is_shutdown = Publisher::shutdown();
64  this->notify(); //don't leave anyone blocking
65  return is_shutdown;
66  };
67 
// When set to true, publishData() returns UploadStatus::FAIL.
68  void setForceFailure(bool nv) {
69  force_failure = nv;
70  }
71 
// NOTE(review): original line 72 is absent. The member index at the end of this page lists
// "void setForceInvalidDataFailure(bool nv)", which matches this body and the call made in
// TestInvalidDataNotPassedToFileManager.
// When set to true (and force_failure is false), publishData() returns UploadStatus::INVALID_DATA.
73  force_invalid_data_failure = nv;
74  }
75 
// NOTE(review): original line 76 is absent. The member index at the end of this page lists
// "Aws::DataFlow::UploadStatus getLastUploadStatus()", which matches this body.
77  return last_upload_status;
78  }
79 
80 protected:
81 
82  // override so we can notify when internal state changes, as attemptPublish sets state
// Records the base class result before notifying, and returns that same status.
83  Aws::DataFlow::UploadStatus attemptPublish(std::list<Aws::CloudWatchLogs::Model::InputLogEvent> &data) override {
84  last_upload_status = Publisher::attemptPublish(data);
85  {
86  this->notify();
87  }
88  return last_upload_status;
89  }
90 
// The data argument is ignored; the returned status is chosen only by the force flags.
// force_failure is checked first, so it wins when both flags are set.
91  Aws::DataFlow::UploadStatus publishData(std::list<Aws::CloudWatchLogs::Model::InputLogEvent>&) override {
92 
93  if (force_failure) {
94  return Aws::DataFlow::UploadStatus::FAIL;
95 
96  } else if (force_invalid_data_failure) {
97  return Aws::DataFlow::UploadStatus::INVALID_DATA;
98 
99  } else {
100  return Aws::DataFlow::UploadStatus::SUCCESS;
101  }
102  }
103 
// NOTE(review): original lines 104-106 are absent. They presumably declared the three members
// used above; the member index at the end of this page lists "bool force_invalid_data_failure"
// and "Aws::DataFlow::UploadStatus last_upload_status". The type of force_failure is not shown
// (setForceFailure assigns a bool to it) - confirm against the real source.
107 };
108 
/**
 * Test double for the log file manager.
 *
 * write() does not persist anything: it records the size of the collection it was given,
 * increments written_count, and calls notify() (presumably inherited from Waiter - confirm)
 * so a test blocked in wait_for()/wait_for_millis() can continue. readBatch() returns an
 * otherwise default-constructed FileObject carrying only the requested batch size.
 *
 * NOTE(review): this listing is incomplete. Going by the embedded line numbers, original
 * line 116 is absent, so the class as shown here does not compile.
 */
112 class TestLogFileManager : public FileManager<LogCollection>, public Waiter
113 {
114 public:
115 
// NOTE(review): original line 116 is absent. It presumably opened the constructor whose body
// follows (tests create this class with std::make_shared<TestLogFileManager>(), i.e. with no
// arguments). The base-class initializer, if any, is not visible here - restore it from the
// real source rather than guessing.
117  written_count.store(0);
118  }
119 
// Records the size of the most recent collection and counts the call; the data itself is dropped.
120  void write(const LogCollection & data) override {
121  last_data_size = data.size();
122  written_count++;
123  this->notify();
124  };
125 
// Returns a FileObject whose batch_size echoes the argument; no other field is set here.
126  FileObject<LogCollection> readBatch(size_t batch_size) override {
127  // do nothing
128  FileObject<LogCollection> testFile;
129  testFile.batch_size = batch_size;
130  return testFile;
131  }
132 
// Number of write() calls observed so far.
133  std::atomic<int> written_count{};
// Size of the collection passed to the most recent write() call.
134  std::atomic<size_t> last_data_size{};
// NOTE(review): cv and mtx are not referenced anywhere in the visible code of this class.
135  std::condition_variable cv;
136  mutable std::mutex mtx;
137 };
138 
139 
140 class PipelineTest : public ::testing::Test {
141 public:
142  void SetUp() override
143  {
144  test_publisher = std::make_shared<TestPublisher>();
145  batcher = std::make_shared<LogBatcher>();
146 
147  // log service owns the streamer, batcher, and publisher
148  cw_service = std::make_shared<LogService>(test_publisher, batcher);
149 
150  stream_data_queue = std::make_shared<TaskObservedQueue<LogCollection>>();
151  queue_monitor = std::make_shared<Aws::DataFlow::QueueMonitor<TaskPtr<LogCollection>>>();
152 
153  // create pipeline
154  batcher->setSink(stream_data_queue);
155  queue_monitor->addSource(stream_data_queue, Aws::DataFlow::PriorityOptions{Aws::DataFlow::HIGHEST_PRIORITY});
156  cw_service->setSource(queue_monitor);
157 
158  cw_service->start(); //this starts the worker thread
159  EXPECT_EQ(Aws::DataFlow::UploadStatus::UNKNOWN, test_publisher->getLastUploadStatus());
160  }
161 
162  void TearDown() override
163  {
164  if (cw_service) {
165  cw_service->shutdown();
166  cw_service->join();
167  }
168  }
169 
170 protected:
171  std::shared_ptr<TestPublisher> test_publisher;
172  std::shared_ptr<LogBatcher> batcher;
173  std::shared_ptr<LogService> cw_service;
174 
175  std::shared_ptr<TaskObservedQueue<LogCollection>> stream_data_queue;
176  std::shared_ptr<Aws::DataFlow::QueueMonitor<TaskPtr<LogCollection>>>queue_monitor;
177 };
178 
180  ASSERT_TRUE(true);
181 }
182 
186 TEST_F(PipelineTest, TestBatcherManualPublish) {
187 
188  std::string toBatch("TestBatcherManualPublish");
189  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
190  bool is_batched = cw_service->batchData(toBatch);
191  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
192 
193  EXPECT_TRUE(is_batched);
194 
195  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
196  EXPECT_FALSE(cw_service->isConnected());
197  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
198  EXPECT_EQ(0, test_publisher->getPublishAttempts());
199  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
200 
201  // force a publish
202  bool b2 = cw_service->publishBatchedData();
203  test_publisher->wait_for(std::chrono::seconds(1));
204 
205  EXPECT_TRUE(b2);
206  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
207  EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
208  EXPECT_TRUE(cw_service->isConnected());
209  EXPECT_EQ(1, test_publisher->getPublishSuccesses());
210  EXPECT_EQ(1, test_publisher->getPublishAttempts());
211  EXPECT_EQ(100.0f, test_publisher->getPublishSuccessPercentage());
212  EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
213 }
214 
218 TEST_F(PipelineTest, TestBatcherManualPublishMultipleItems) {
219 
220  std::string toBatch("TestBatcherManualPublish");
221  bool is_batched = cw_service->batchData(toBatch);
222  EXPECT_TRUE(is_batched);
223 
224  for(int i=99; i>0; i--) {
225  is_batched = cw_service->batchData(std::to_string(99) + std::string(" bottles of beer on the wall"));
226  EXPECT_TRUE(is_batched);
227  }
228 
229  EXPECT_TRUE(is_batched);
230 
231  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
232  EXPECT_FALSE(cw_service->isConnected());
233  EXPECT_EQ(100u, batcher->getCurrentBatchSize());
234 
235  // force a publish
236  bool b2 = cw_service->publishBatchedData();
237  test_publisher->wait_for(std::chrono::seconds(1));
238 
239  EXPECT_TRUE(b2);
240  EXPECT_EQ(1, test_publisher->getPublishSuccesses());
241  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
242  EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
243  EXPECT_TRUE(cw_service->isConnected());
244  EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
245 }
246 
250 TEST_F(PipelineTest, TestBatcherSize) {
251 
252  EXPECT_EQ(SIZE_MAX, batcher->getTriggerBatchSize()); // not initialized
253  size_t size = 5;
254  batcher->setTriggerBatchSize(size); // setting the size will trigger a publish when the collection is full
255  EXPECT_EQ(size, batcher->getTriggerBatchSize());
256 
257  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
258  EXPECT_FALSE(cw_service->isConnected());
259 
260  for(size_t i=1; i<size; i++) {
261  std::string toBatch("test message " + std::to_string(i));
262  bool is_batched = cw_service->batchData(toBatch);
263 
264  EXPECT_TRUE(is_batched);
265  EXPECT_EQ(0, test_publisher->getPublishAttempts());
266  EXPECT_EQ(i, batcher->getCurrentBatchSize());
267  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
268  EXPECT_FALSE(cw_service->isConnected());
269  }
270 
271  ASSERT_EQ(size, batcher->getTriggerBatchSize());
272  std::string toBatch("test message publish trigger");
273  bool is_batched = cw_service->batchData(toBatch);
274 
275  EXPECT_TRUE(is_batched);
276 
277  test_publisher->wait_for(std::chrono::seconds(1));
278 
279  EXPECT_EQ(1, test_publisher->getPublishAttempts());
280  EXPECT_EQ(1, test_publisher->getPublishSuccesses());
281  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
282  EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
283  EXPECT_TRUE(cw_service->isConnected());
284  EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
285 }
286 
287 TEST_F(PipelineTest, TestSinglePublisherFailureToFileManager) {
288 
289  std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
290  // hookup to the service
291  batcher->setLogFileManager(fileManager);
292 
293  // batch
294  std::string toBatch("TestBatcherManualPublish");
295  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
296  bool is_batched = cw_service->batchData(toBatch);
297  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
298  EXPECT_EQ(true, is_batched);
299 
300  // force failure
301  test_publisher->setForceFailure(true);
302 
303  // publish
304  bool b2 = cw_service->publishBatchedData();
305  test_publisher->wait_for(std::chrono::seconds(1));
306 
307  EXPECT_TRUE(b2);
308  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
309  EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
310  EXPECT_FALSE(cw_service->isConnected());
311  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
312  EXPECT_EQ(1, test_publisher->getPublishAttempts());
313  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
314 
315  fileManager->wait_for(std::chrono::seconds(1));
316  //check that the filemanger callback worked
317  EXPECT_EQ(1, fileManager->written_count);
318  EXPECT_EQ(Aws::DataFlow::UploadStatus::FAIL, test_publisher->getLastUploadStatus());
319 }
320 
321 TEST_F(PipelineTest, TestInvalidDataNotPassedToFileManager) {
322 
323  std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
324  // hookup to the service
325  batcher->setLogFileManager(fileManager);
326 
327  // batch
328  std::string toBatch("TestBatcherManualPublish");
329  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
330  bool is_batched = cw_service->batchData(toBatch);
331  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
332  EXPECT_EQ(true, is_batched);
333 
334  // force failure
335  test_publisher->setForceInvalidDataFailure(true);
336 
337  // publish
338  bool b2 = cw_service->publishBatchedData();
339  test_publisher->wait_for(std::chrono::seconds(1));
340 
341  EXPECT_TRUE(b2);
342  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
343  EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
344  EXPECT_FALSE(cw_service->isConnected());
345  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
346  EXPECT_EQ(1, test_publisher->getPublishAttempts());
347  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
348 
349  fileManager->wait_for(std::chrono::seconds(1)); // wait just in case for writes
350  EXPECT_EQ(0, fileManager->written_count);
351  EXPECT_EQ(Aws::DataFlow::UploadStatus::INVALID_DATA, test_publisher->getLastUploadStatus());
352 }
353 
354 TEST_F(PipelineTest, TestPublisherIntermittant) {
355 
356  std::random_device rd;
357  std::mt19937 gen(rd());
358  // 50-50 chance
359  std::bernoulli_distribution d(0.5);
360 
361  int expected_success = 0;
362 
363  for(int i=1; i<=50; i++) {
364 
365  bool force_failure = d(gen);
366 
367  std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
368  // hookup to the service
369  batcher->setLogFileManager(fileManager);
370 
371  // batch
372  std::string toBatch("TestPublisherIntermittant");
373  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
374  bool is_batched = cw_service->batchData(toBatch);
375  EXPECT_EQ(1u, batcher->getCurrentBatchSize());
376  EXPECT_EQ(true, is_batched);
377 
378  // force failure
379  test_publisher->setForceFailure(force_failure);
380 
381  if (!force_failure) {
382  expected_success++;
383  }
384 
385  // publish
386  is_batched = cw_service->publishBatchedData();
387  test_publisher->wait_for(std::chrono::seconds(1));
388 
389  EXPECT_TRUE(is_batched);
390  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
391 
392  auto expected_state = force_failure ? PublisherState::NOT_CONNECTED : PublisherState::CONNECTED;
393 
394  EXPECT_EQ(expected_state, test_publisher->getPublisherState());
395  EXPECT_EQ(!force_failure, cw_service->isConnected()); // if failure forced then not connected
396  EXPECT_EQ(force_failure ?
397  Aws::DataFlow::UploadStatus::FAIL: Aws::DataFlow::UploadStatus::SUCCESS,
398  test_publisher->getLastUploadStatus());
399 
400  EXPECT_EQ(expected_success, test_publisher->getPublishSuccesses());
401  EXPECT_EQ(i, test_publisher->getPublishAttempts());
402 
403  float expected_percentage = static_cast<float>(expected_success) / static_cast<float>(i) * 100.0f;
404  EXPECT_FLOAT_EQ(expected_percentage, test_publisher->getPublishSuccessPercentage());
405 
406  fileManager->wait_for_millis(std::chrono::milliseconds(100));
407  //check that the filemanger callback worked
408  EXPECT_EQ(force_failure ? 1 : 0, fileManager->written_count);
409  }
410 }
411 
412 TEST_F(PipelineTest, TestBatchDataTooFast) {
413 
414  size_t max = 10;
415  std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
416  // hookup to the service
417  batcher->setLogFileManager(fileManager);
418  batcher->setMaxAllowableBatchSize(max);
419 
420  EXPECT_EQ(max, batcher->getMaxAllowableBatchSize());
421 
422  for(size_t i=1; i<=max; i++) {
423  std::string toBatch("test message " + std::to_string(i));
424  bool is_batched = cw_service->batchData(toBatch);
425 
426  EXPECT_TRUE(is_batched);
427  EXPECT_EQ(0, test_publisher->getPublishAttempts());
428  EXPECT_EQ(i, batcher->getCurrentBatchSize());
429  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
430  }
431 
432  std::string toBatch("iocaine powder");
433  bool is_batched = cw_service->batchData(toBatch);
434 
435  EXPECT_FALSE(is_batched);
436  EXPECT_EQ(0u, batcher->getCurrentBatchSize());
437  EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState()); // hasn't changed since not attempted
438  EXPECT_FALSE(cw_service->isConnected());
439  EXPECT_EQ(0, test_publisher->getPublishSuccesses());
440  EXPECT_EQ(0, test_publisher->getPublishAttempts());
441  EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
442 
443  fileManager->wait_for_millis(std::chrono::milliseconds(200));
444 
445  // check that the filemanger callback worked
446  EXPECT_EQ(1, fileManager->written_count);
447  EXPECT_EQ(max + 1, fileManager->last_data_size);
448 }
449 
450 int main(int argc, char ** argv)
451 {
452  testing::InitGoogleTest(&argc, argv);
453  return RUN_ALL_TESTS();
454 }
void setForceInvalidDataFailure(bool nv)
FileObject< LogCollection > readBatch(size_t batch_size) override
std::shared_ptr< Aws::DataFlow::QueueMonitor< TaskPtr< LogCollection > > > queue_monitor
void SetUp() override
bool shutdown() override
Aws::DataFlow::UploadStatus publishData(std::list< Aws::CloudWatchLogs::Model::InputLogEvent > &) override
Aws::DataFlow::UploadStatus getLastUploadStatus()
Aws::DataFlow::UploadStatus attemptPublish(std::list< Aws::CloudWatchLogs::Model::InputLogEvent > &data) override
Aws::DataFlow::UploadStatus last_upload_status
bool shutdown() override
Aws::DataFlow::UploadStatus attemptPublish(T &data) override
int main(int argc, char **argv)
std::list< LogType > LogCollection
Definition: definitions.h:29
std::shared_ptr< LogService > cw_service
bool start() override
TEST_F(PipelineTest, Sanity)
std::condition_variable cv
bool force_invalid_data_failure
void setForceFailure(bool nv)
void write(const LogCollection &data) override
std::shared_ptr< LogBatcher > batcher
std::shared_ptr< TestPublisher > test_publisher
virtual bool start()
void TearDown() override
std::shared_ptr< TaskObservedQueue< LogCollection > > stream_data_queue


cloudwatch_logs_common
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:24