file_upload_streamer.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <thread>
19 #include <memory>
20 
21 #include <aws/core/utils/logging/LogMacros.h>
22 
29 
32 
33 namespace Aws {
34 namespace FileManagement {
35 
38 
/// Default timeout (five minutes). It is the initial value of the streamer's
/// status-monitor wait and the wait used when enqueueing upload tasks into the sink.
static constexpr std::chrono::milliseconds kTimeout{std::chrono::minutes{5}};
40 
42 
46  size_t batch_size;
47 
51  size_t queue_size;
52 };
53 
59 template<typename T>
61  public OutputStage<TaskPtr<T>>, public RunnableService {
62 public:
72  std::shared_ptr<DataReader<T>> data_reader,
74  {
75  data_reader_ = data_reader;
76  auto data_status_monitor = std::make_shared<StatusMonitor>();
77  addStatusMonitor(data_status_monitor);
78  network_monitor_ = std::make_shared<StatusMonitor>();
79  addStatusMonitor(network_monitor_);
80 
81  data_reader_->setStatusMonitor(data_status_monitor);
82  batch_size_ = options.batch_size;
83  status_monitor_timeout_ = kTimeout;
84  }
85 
// Defaulted destructor: it performs no cleanup of its own and, in particular,
// does not call shutdown(). NOTE(review): presumably callers must call
// shutdown() explicitly (or RunnableService's destructor stops the worker) —
// confirm.
86  ~FileUploadStreamer() override = default;
87 
93  inline void addStatusMonitor(std::shared_ptr<StatusMonitor> &status_monitor) {
94  status_condition_monitor_.addStatusMonitor(status_monitor);
95  }
96 
97  inline bool shutdown() override {
98  bool is_shutdown = true;
99  is_shutdown &= RunnableService::shutdown();
100  is_shutdown &= data_reader_->shutdown();
101  return is_shutdown;
102  }
103 
105  AWS_LOG_INFO(__func__,
106  "Publisher state has changed to: %s",
107  (status == Aws::DataFlow::Status::AVAILABLE) ? "available" : "unavailable");
108  network_monitor_->setStatus(status);
109  }
110 
/**
 * Initialization hook. It currently does no work.
 *
 * @return always true
 */
bool initialize() { return true; }
114 
115  void onComplete(const Aws::DataFlow::UploadStatus & upload_status, const FileObject<T> &message) {
116  if (upload_status == Aws::DataFlow::UploadStatus::FAIL) {
117  OutputStage<TaskPtr<T>>::getSink()->clear();
118  }
119  data_reader_->fileUploadCompleteStatus(upload_status, message);
120  }
121 
125  bool start() override {
126  bool is_started = true;
127  is_started &= data_reader_->start();
128  is_started &= RunnableService::start();
129  return is_started;
130  }
131 
132  // todo this is a hack. Should just implement an extension in test
133  inline void forceWork() {
134  this->work();
135  }
136 
137  void setStatusMonitorTimeout(std::chrono::milliseconds new_timeout) {
138  status_monitor_timeout_ = new_timeout;
139  }
140 
141 protected:
142 
// Performs one pass of the upload loop.
//
// When no task is pending (stored_task_ is empty), it:
//   - waits on status_condition_monitor_ for up to status_monitor_timeout_;
//   - on timeout, continues only if data_reader_ reports data available to
//     read, and otherwise returns;
//   - returns if no sink is configured;
//   - reads a batch via data_reader_->readBatch(batch_size_) and wraps it in a
//     new FileUploadTask whose completion callback is bound to this object.
// When a task is already pending, it is retried as-is.
//
// The pending task is then offered to the sink with tryEnqueue (timeout
// kTimeout). If accepted, stored_task_ is cleared; if not, it is kept for the
// next call. Finally data_reader_->deleteStaleData() is called. The early
// returns above skip both the enqueue and deleteStaleData().
152  inline void work() override {
153  if (!stored_task_) {
154  AWS_LOG_DEBUG(__func__,
155  "Waiting for files and work.");
156  auto wait_result = status_condition_monitor_.waitForWork(status_monitor_timeout_);
157 
158  // is there data available?
159 
160  if (wait_result == std::cv_status::timeout) {
161 
162  if (!data_reader_->isDataAvailableToRead()) {
163  AWS_LOG_DEBUG(__func__, "Timed out when waiting for work, no data available to read");
164  return;
165  }
166  AWS_LOG_DEBUG(__func__, "Timed out when waiting for work, but data available to read: attempting to publish");
167  // otherwise attempt to publish as only the network is down but we have data to send
168  }
169 
170  if (!OutputStage<TaskPtr<T>>::getSink()) {
171  AWS_LOG_WARN(__func__,
172  "No Sink Configured");
173  return;
174  }
175  AWS_LOG_DEBUG(__func__,
176  "Found work, batching");
177  FileObject<T> file_object = data_reader_->readBatch(batch_size_);
178  total_logs_uploaded += file_object.batch_size; // todo this is attempted, not truly uploaded
// NOTE(review): the first std::bind argument (the bound callable) is not
// visible here. Given the (_1, _2) placeholders it is presumably
// &FileUploadStreamer::onComplete — confirm.
179  stored_task_ = std::make_shared<FileUploadTask<T>>(
180  std::move(file_object),
181  std::bind(
183  this,
184  std::placeholders::_1,
185  std::placeholders::_2));
186  } else {
187  AWS_LOG_DEBUG(__func__,
188  "Previous task found, retrying upload.");
189  }
// NOTE(review): on the retry path this dereferences getSink() without the
// null check done above. It also waits with the fixed kTimeout rather than
// status_monitor_timeout_. Confirm both are intended.
190  auto is_accepted = OutputStage<TaskPtr<T>>::getSink()->tryEnqueue(stored_task_, kTimeout);
191  if (is_accepted) {
192  AWS_LOG_DEBUG(__func__,
193  "Enqueue_accepted");
194  stored_task_ = nullptr;
195  } else {
196  AWS_LOG_DEBUG(__func__,
197  "Enqueue failed");
198  }
199  data_reader_->deleteStaleData();
200  }
201 
202 private:
207 
// Task built by work() that the sink has not yet accepted. work() retries it
// on later calls and resets it to nullptr once tryEnqueue succeeds.
211  std::shared_ptr<FileUploadTask<T>> stored_task_;
212 
// Running total of file_object.batch_size over every batch read in work().
// It is incremented at read time, before the enqueue succeeds, so it counts
// attempted uploads rather than confirmed ones.
// NOTE(review): lacks the trailing underscore used by the other members.
216  size_t total_logs_uploaded = 0;
217 
// Value passed to data_reader_->readBatch(). The constructor sets it from
// the options' batch_size.
221  size_t batch_size_;
222 
// Source of batches. Started in start(), shut down in shutdown(), and sent
// upload results from onComplete().
226  std::shared_ptr<DataReader<T>> data_reader_;
227 
// Status monitor registered with the condition monitor in the constructor.
// Its status is set by onPublisherStateChange().
231  std::shared_ptr<StatusMonitor> network_monitor_;
232 
// How long work() waits on the status monitors. Initialised to kTimeout and
// adjustable via setStatusMonitorTimeout().
236  std::chrono::milliseconds status_monitor_timeout_;
237 };
238 
239 } // namespace FileManagement
240 } // namespace Aws
MultiStatusConditionMonitor status_condition_monitor_
void setStatusMonitorTimeout(std::chrono::milliseconds new_timeout)
std::shared_ptr< FileUploadTask< T > > stored_task_
void onPublisherStateChange(const Aws::DataFlow::Status &status)
std::shared_ptr< StatusMonitor > network_monitor_
std::shared_ptr< DataReader< T > > data_reader_
static constexpr std::chrono::milliseconds kTimeout
bool shutdown() override
void addStatusMonitor(std::shared_ptr< StatusMonitor > &status_monitor)
std::chrono::milliseconds status_monitor_timeout_
FileUploadStreamer(std::shared_ptr< DataReader< T >> data_reader, FileUploadStreamerOptions options)
std::shared_ptr< Aws::DataFlow::Task< T >> TaskPtr
void onComplete(const Aws::DataFlow::UploadStatus &upload_status, const FileObject< T > &message)
bool start() override


file_management
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:23