file_upload_streamer_test.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 
17 #include <tuple>
18 
19 #include <gtest/gtest.h>
20 #include <gmock/gmock.h>
21 
25 
26 using namespace Aws::FileManagement;
27 using namespace Aws::DataFlow;
28 
30  public DataReader<std::string>
31 {
32 public:
33  MOCK_METHOD0(start, bool());
34  MOCK_METHOD0(shutdown, bool());
35  MOCK_METHOD1(readBatch, FileObject<std::string>(size_t batch_size));
36  MOCK_METHOD2(fileUploadCompleteStatus,
37  void(const UploadStatus& upload_status, const FileObject<std::string> &log_messages));
38  MOCK_METHOD1(addStatusMonitor,
39  void(std::shared_ptr<StatusMonitor> monitor));
40  MOCK_METHOD0(isDataAvailableToRead, bool());
41 
42  void deleteStaleData() override {
43  return;
44  }
45 
51  inline void setStatusMonitor(std::shared_ptr<StatusMonitor> status_monitor) override {
52  status_monitor_ = status_monitor;
53  }
54 
58  std::shared_ptr<StatusMonitor> status_monitor_;
59 };
60 
61 using SharedFileUploadTask = std::shared_ptr<Task<std::string>>;
62 
63 class MockSink :
64 public Sink<SharedFileUploadTask>
65 {
66 public:
67  MOCK_METHOD1(enqueue_rvr, bool (SharedFileUploadTask data));
68  MOCK_METHOD1(enqueue, bool (SharedFileUploadTask& data));
69  MOCK_METHOD2(tryEnqueue,
70  bool (SharedFileUploadTask& data,
71  const std::chrono::microseconds &duration));
72  inline bool enqueue(SharedFileUploadTask&& value) override {
73  return enqueue_rvr(value);
74  }
75  inline bool tryEnqueue(
76  SharedFileUploadTask&& value,
77  const std::chrono::microseconds &duration) override
78  {
79  return tryEnqueue(value, duration);
80  }
81  MOCK_METHOD0(clear, void (void));
82 };
83 
85 
86 class FileStreamerTest : public ::testing::Test {
87 public:
88  void SetUp() override
89  {
90  file_manager = std::make_shared<::testing::StrictMock<MockDataReader>>();
91  file_upload_streamer = createFileUploadStreamer<std::string>(file_manager, kFileManagerOptions);
92  mock_sink = std::make_shared<::testing::StrictMock<MockSink>>();
93  }
94 
95  void TearDown() override
96  {
97  file_manager.reset();
98  file_upload_streamer.reset();
99  mock_sink.reset();
100  }
101 
102 protected:
103  std::shared_ptr<::testing::StrictMock<MockDataReader>> file_manager;
104  std::shared_ptr<FileUploadStreamer<std::string>> file_upload_streamer;
105  std::shared_ptr<MockSink> mock_sink;
106 };
107 
108 
109 TEST_F(FileStreamerTest, success_on_network_and_file) {
110  // Create the pipeline
111  file_upload_streamer->setSink(mock_sink);
112 
113  // Set the file and network available
114  file_manager->status_monitor_->setStatus(AVAILABLE);
115  file_upload_streamer->onPublisherStateChange(AVAILABLE);
116 
117  FileObject<std::string> test_file_object;
118  test_file_object.batch_data = "data";
119  test_file_object.batch_size = 1;
120  EXPECT_CALL(*mock_sink, tryEnqueue(testing::_, testing::_))
121  .WillOnce(testing::Return(true));
122  EXPECT_CALL(*file_manager, readBatch(testing::Eq(50u)))
123  .WillOnce(testing::Return(test_file_object));
124  // Expect a batch call and enqueue from the file upload streamer
125  file_upload_streamer->forceWork();
126 }
127 
128 TEST_F(FileStreamerTest, fail_enqueue_retry) {
129  // Create the pipeline
130  file_upload_streamer->setSink(mock_sink);
131 
132  // Set the file and network available
133  file_manager->status_monitor_->setStatus(AVAILABLE);
134  file_upload_streamer->onPublisherStateChange(AVAILABLE);
135 
136  FileObject<std::string> test_file_object;
137  test_file_object.batch_data = "data";
138  test_file_object.batch_size = 1;
140  // TODO(unknown): capture and test equivalence
141  EXPECT_CALL(*mock_sink, tryEnqueue(testing::_, testing::_))
142  .WillOnce(testing::Invoke([&task](SharedFileUploadTask& data,
143  const std::chrono::microseconds&){
144  task = data;
145  return false;
146  }))
147  .WillOnce(testing::Return(true));
148  EXPECT_CALL(*file_manager, readBatch(testing::Eq(50u)))
149  .WillOnce(testing::Return(test_file_object));
150  // Expect a batch call and enqueue from the file upload streamer
151  file_upload_streamer->forceWork();
152  file_upload_streamer->forceWork();
153 }
154 
155 TEST_F(FileStreamerTest, fail_task_clears_queue) {
156  // Create the pipeline
157  file_upload_streamer->setSink(mock_sink);
158 
159  // Set the file and network available
160  file_manager->status_monitor_->setStatus(AVAILABLE);
161  file_upload_streamer->onPublisherStateChange(AVAILABLE);
162 
163  FileObject<std::string> test_file_object;
164  test_file_object.batch_data = "data";
165  test_file_object.batch_size = 1;
167  // TODO(unknown): capture and test equivalence
168  EXPECT_CALL(*mock_sink, tryEnqueue(testing::_, testing::_))
169  .WillOnce(testing::Invoke([&task](SharedFileUploadTask& data,
170  const std::chrono::microseconds&){
171  task = data;
172  return true;
173  }));
174  EXPECT_CALL(*file_manager, readBatch(testing::Eq(50u)))
175  .WillOnce(testing::Return(test_file_object));
176  EXPECT_CALL(*mock_sink, clear());
177  EXPECT_CALL(*file_manager, fileUploadCompleteStatus(FAIL, testing::_));
178  // Expect a batch call and enqueue from the file upload streamer
179  file_upload_streamer->forceWork();
180  task->onComplete(FAIL);
181 }
182 
183 TEST_F(FileStreamerTest, success_task_does_not_clear_queue) {
184  // Create the pipeline
185  file_upload_streamer->setSink(mock_sink);
186 
187  // Set the file and network available
188  file_manager->status_monitor_->setStatus(AVAILABLE);
189  file_upload_streamer->onPublisherStateChange(AVAILABLE);
190 
191  FileObject<std::string> test_file_object;
192  test_file_object.batch_data = "data";
193  test_file_object.batch_size = 1;
195  EXPECT_CALL(*mock_sink, tryEnqueue(testing::_, testing::_))
196  .WillOnce(testing::Invoke([&task](SharedFileUploadTask& data,
197  const std::chrono::microseconds&){
198  task = data;
199  return true;
200  }));
201  EXPECT_CALL(*file_manager, readBatch(testing::Eq(50u)))
202  .WillOnce(testing::Return(test_file_object));
203  EXPECT_CALL(*file_manager, fileUploadCompleteStatus(SUCCESS, testing::_));
204  // Expect a batch call and enqueue from the file upload streamer
205  file_upload_streamer->forceWork();
206  task->onComplete(SUCCESS);
207 }
UploadStatus
std::shared_ptr< StatusMonitor > status_monitor_
AVAILABLE
void deleteStaleData() override
std::shared_ptr< Task< std::string >> SharedFileUploadTask
std::shared_ptr< MockSink > mock_sink
bool enqueue(SharedFileUploadTask &&value) override
TEST_F(FileStreamerTest, success_on_network_and_file)
FAIL
bool tryEnqueue(SharedFileUploadTask &&value, const std::chrono::microseconds &duration) override
void setStatusMonitor(std::shared_ptr< StatusMonitor > status_monitor) override
std::shared_ptr<::testing::StrictMock< MockDataReader > > file_manager
std::shared_ptr< FileUploadStreamer< std::string > > file_upload_streamer
SUCCESS
static const FileUploadStreamerOptions kFileManagerOptions


file_management
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:23