17 #include <gtest/gtest.h> 18 #include <gmock/gmock.h> 21 #include <condition_variable> 27 #include <aws/core/Aws.h> 28 #include <aws/core/utils/logging/ConsoleLogSystem.h> 51 force_failure =
false;
52 force_invalid_data_failure =
false;
53 last_upload_status = Aws::DataFlow::UploadStatus::UNKNOWN;
73 force_invalid_data_failure = nv;
77 return last_upload_status;
88 return last_upload_status;
94 return Aws::DataFlow::UploadStatus::FAIL;
96 }
else if (force_invalid_data_failure) {
97 return Aws::DataFlow::UploadStatus::INVALID_DATA;
100 return Aws::DataFlow::UploadStatus::SUCCESS;
117 written_count.store(0);
121 last_data_size = data.size();
133 std::atomic<int> written_count{};
134 std::atomic<size_t> last_data_size{};
135 std::condition_variable
cv;
144 test_publisher = std::make_shared<TestPublisher>();
145 batcher = std::make_shared<LogBatcher>();
148 cw_service = std::make_shared<LogService>(test_publisher, batcher);
150 stream_data_queue = std::make_shared<TaskObservedQueue<LogCollection>>();
151 queue_monitor = std::make_shared<Aws::DataFlow::QueueMonitor<TaskPtr<LogCollection>>>();
154 batcher->setSink(stream_data_queue);
156 cw_service->setSource(queue_monitor);
159 EXPECT_EQ(Aws::DataFlow::UploadStatus::UNKNOWN, test_publisher->getLastUploadStatus());
165 cw_service->shutdown();
176 std::shared_ptr<Aws::DataFlow::QueueMonitor<TaskPtr<LogCollection>>>
queue_monitor;
188 std::string toBatch(
"TestBatcherManualPublish");
189 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
190 bool is_batched = cw_service->batchData(toBatch);
191 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
193 EXPECT_TRUE(is_batched);
195 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
196 EXPECT_FALSE(cw_service->isConnected());
197 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
198 EXPECT_EQ(0, test_publisher->getPublishAttempts());
199 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
202 bool b2 = cw_service->publishBatchedData();
203 test_publisher->wait_for(std::chrono::seconds(1));
206 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
207 EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
208 EXPECT_TRUE(cw_service->isConnected());
209 EXPECT_EQ(1, test_publisher->getPublishSuccesses());
210 EXPECT_EQ(1, test_publisher->getPublishAttempts());
211 EXPECT_EQ(100.0f, test_publisher->getPublishSuccessPercentage());
212 EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
220 std::string toBatch(
"TestBatcherManualPublish");
221 bool is_batched = cw_service->batchData(toBatch);
222 EXPECT_TRUE(is_batched);
224 for(
int i=99; i>0; i--) {
225 is_batched = cw_service->batchData(std::to_string(99) + std::string(
" bottles of beer on the wall"));
226 EXPECT_TRUE(is_batched);
229 EXPECT_TRUE(is_batched);
231 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
232 EXPECT_FALSE(cw_service->isConnected());
233 EXPECT_EQ(100u, batcher->getCurrentBatchSize());
236 bool b2 = cw_service->publishBatchedData();
237 test_publisher->wait_for(std::chrono::seconds(1));
240 EXPECT_EQ(1, test_publisher->getPublishSuccesses());
241 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
242 EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
243 EXPECT_TRUE(cw_service->isConnected());
244 EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
252 EXPECT_EQ(SIZE_MAX, batcher->getTriggerBatchSize());
254 batcher->setTriggerBatchSize(size);
255 EXPECT_EQ(size, batcher->getTriggerBatchSize());
257 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
258 EXPECT_FALSE(cw_service->isConnected());
260 for(
size_t i=1; i<size; i++) {
261 std::string toBatch(
"test message " + std::to_string(i));
262 bool is_batched = cw_service->batchData(toBatch);
264 EXPECT_TRUE(is_batched);
265 EXPECT_EQ(0, test_publisher->getPublishAttempts());
266 EXPECT_EQ(i, batcher->getCurrentBatchSize());
267 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
268 EXPECT_FALSE(cw_service->isConnected());
271 ASSERT_EQ(size, batcher->getTriggerBatchSize());
272 std::string toBatch(
"test message publish trigger");
273 bool is_batched = cw_service->batchData(toBatch);
275 EXPECT_TRUE(is_batched);
277 test_publisher->wait_for(std::chrono::seconds(1));
279 EXPECT_EQ(1, test_publisher->getPublishAttempts());
280 EXPECT_EQ(1, test_publisher->getPublishSuccesses());
281 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
282 EXPECT_EQ(PublisherState::CONNECTED, test_publisher->getPublisherState());
283 EXPECT_TRUE(cw_service->isConnected());
284 EXPECT_EQ(Aws::DataFlow::UploadStatus::SUCCESS, test_publisher->getLastUploadStatus());
289 std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
291 batcher->setLogFileManager(fileManager);
294 std::string toBatch(
"TestBatcherManualPublish");
295 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
296 bool is_batched = cw_service->batchData(toBatch);
297 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
298 EXPECT_EQ(
true, is_batched);
301 test_publisher->setForceFailure(
true);
304 bool b2 = cw_service->publishBatchedData();
305 test_publisher->wait_for(std::chrono::seconds(1));
308 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
309 EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
310 EXPECT_FALSE(cw_service->isConnected());
311 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
312 EXPECT_EQ(1, test_publisher->getPublishAttempts());
313 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
315 fileManager->wait_for(std::chrono::seconds(1));
317 EXPECT_EQ(1, fileManager->written_count);
318 EXPECT_EQ(Aws::DataFlow::UploadStatus::FAIL, test_publisher->getLastUploadStatus());
323 std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
325 batcher->setLogFileManager(fileManager);
328 std::string toBatch(
"TestBatcherManualPublish");
329 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
330 bool is_batched = cw_service->batchData(toBatch);
331 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
332 EXPECT_EQ(
true, is_batched);
335 test_publisher->setForceInvalidDataFailure(
true);
338 bool b2 = cw_service->publishBatchedData();
339 test_publisher->wait_for(std::chrono::seconds(1));
342 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
343 EXPECT_EQ(PublisherState::NOT_CONNECTED, test_publisher->getPublisherState());
344 EXPECT_FALSE(cw_service->isConnected());
345 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
346 EXPECT_EQ(1, test_publisher->getPublishAttempts());
347 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
349 fileManager->wait_for(std::chrono::seconds(1));
350 EXPECT_EQ(0, fileManager->written_count);
351 EXPECT_EQ(Aws::DataFlow::UploadStatus::INVALID_DATA, test_publisher->getLastUploadStatus());
356 std::random_device rd;
357 std::mt19937 gen(rd());
359 std::bernoulli_distribution d(0.5);
361 int expected_success = 0;
363 for(
int i=1; i<=50; i++) {
365 bool force_failure = d(gen);
367 std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
369 batcher->setLogFileManager(fileManager);
372 std::string toBatch(
"TestPublisherIntermittant");
373 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
374 bool is_batched = cw_service->batchData(toBatch);
375 EXPECT_EQ(1u, batcher->getCurrentBatchSize());
376 EXPECT_EQ(
true, is_batched);
379 test_publisher->setForceFailure(force_failure);
381 if (!force_failure) {
386 is_batched = cw_service->publishBatchedData();
387 test_publisher->wait_for(std::chrono::seconds(1));
389 EXPECT_TRUE(is_batched);
390 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
392 auto expected_state = force_failure ? PublisherState::NOT_CONNECTED : PublisherState::CONNECTED;
394 EXPECT_EQ(expected_state, test_publisher->getPublisherState());
395 EXPECT_EQ(!force_failure, cw_service->isConnected());
396 EXPECT_EQ(force_failure ?
397 Aws::DataFlow::UploadStatus::FAIL: Aws::DataFlow::UploadStatus::SUCCESS,
398 test_publisher->getLastUploadStatus());
400 EXPECT_EQ(expected_success, test_publisher->getPublishSuccesses());
401 EXPECT_EQ(i, test_publisher->getPublishAttempts());
403 float expected_percentage =
static_cast<float>(expected_success) / static_cast<float>(i) * 100.0f;
404 EXPECT_FLOAT_EQ(expected_percentage, test_publisher->getPublishSuccessPercentage());
406 fileManager->wait_for_millis(std::chrono::milliseconds(100));
408 EXPECT_EQ(force_failure ? 1 : 0, fileManager->written_count);
415 std::shared_ptr<TestLogFileManager> fileManager = std::make_shared<TestLogFileManager>();
417 batcher->setLogFileManager(fileManager);
418 batcher->setMaxAllowableBatchSize(max);
420 EXPECT_EQ(max, batcher->getMaxAllowableBatchSize());
422 for(
size_t i=1; i<=max; i++) {
423 std::string toBatch(
"test message " + std::to_string(i));
424 bool is_batched = cw_service->batchData(toBatch);
426 EXPECT_TRUE(is_batched);
427 EXPECT_EQ(0, test_publisher->getPublishAttempts());
428 EXPECT_EQ(i, batcher->getCurrentBatchSize());
429 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
432 std::string toBatch(
"iocaine powder");
433 bool is_batched = cw_service->batchData(toBatch);
435 EXPECT_FALSE(is_batched);
436 EXPECT_EQ(0u, batcher->getCurrentBatchSize());
437 EXPECT_EQ(PublisherState::UNKNOWN, test_publisher->getPublisherState());
438 EXPECT_FALSE(cw_service->isConnected());
439 EXPECT_EQ(0, test_publisher->getPublishSuccesses());
440 EXPECT_EQ(0, test_publisher->getPublishAttempts());
441 EXPECT_EQ(0.0f, test_publisher->getPublishSuccessPercentage());
443 fileManager->wait_for_millis(std::chrono::milliseconds(200));
446 EXPECT_EQ(1, fileManager->written_count);
447 EXPECT_EQ(max + 1, fileManager->last_data_size);
450 int main(
int argc,
char ** argv)
452 testing::InitGoogleTest(&argc, argv);
453 return RUN_ALL_TESTS();
void setForceInvalidDataFailure(bool nv)
FileObject< LogCollection > readBatch(size_t batch_size) override
std::shared_ptr< Aws::DataFlow::QueueMonitor< TaskPtr< LogCollection > > > queue_monitor
Aws::DataFlow::UploadStatus publishData(std::list< Aws::CloudWatchLogs::Model::InputLogEvent > &) override
Aws::DataFlow::UploadStatus getLastUploadStatus()
Aws::DataFlow::UploadStatus attemptPublish(std::list< Aws::CloudWatchLogs::Model::InputLogEvent > &data) override
Aws::DataFlow::UploadStatus last_upload_status
Aws::DataFlow::UploadStatus attemptPublish(T &data) override
int main(int argc, char **argv)
std::list< LogType > LogCollection
std::shared_ptr< LogService > cw_service
TEST_F(PipelineTest, Sanity)
std::condition_variable cv
bool force_invalid_data_failure
void setForceFailure(bool nv)
void write(const LogCollection &data) override
std::shared_ptr< LogBatcher > batcher
std::shared_ptr< TestPublisher > test_publisher
std::shared_ptr< TaskObservedQueue< LogCollection > > stream_data_queue