cloudwatch_service.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <aws/core/Aws.h>
19 
20 #include <file_management/file_upload/file_upload_streamer.h>
21 #include <file_management/file_upload/file_manager.h>
22 
26 
27 #include <chrono>
28 #include <list>
29 #include <stdexcept>
30 #include <string>
31 
32 namespace Aws {
33 namespace CloudWatch {
34 
35 using namespace Aws::FileManagement;
36 
37 static const std::chrono::milliseconds kDequeueDuration = std::chrono::milliseconds(100);
38 
47 template<typename D, typename T>
48 class CloudWatchService : public Aws::DataFlow::InputStage<TaskPtr<std::list<T>>>, public RunnableService {
49 public:
56 CloudWatchService(std::shared_ptr<Publisher<std::list<T>>> publisher,
57  std::shared_ptr<DataBatcher<T>> batcher) : RunnableService() {
58 
59  if (nullptr == publisher) {
60  throw std::invalid_argument("Invalid argument: publisher cannot be null");
61  }
62 
63  if (nullptr == batcher) {
64  throw std::invalid_argument("Invalid argument: batcher cannot be null");
65  }
66 
67  this->publisher_ = publisher;
68  this->batcher_ = batcher;
69  this->file_upload_streamer_ = nullptr;
70  this->dequeue_duration_ = kDequeueDuration;
71  this->number_dequeued_.store(0);
72 }
73 
74 ~CloudWatchService() = default;
75 
81 virtual bool start() override {
82  bool started = true;
83 
84  started &= publisher_->start();
85  started &= batcher_->start();
86 
87  if (file_upload_streamer_) {
88  started &= file_upload_streamer_->start();
89  }
90 
91  //start the thread to dequeue
92  started &= RunnableService::start();
93 
94  return started;
95 }
96 
103 virtual inline bool shutdown() {
104 
105  // stop the work thread immediately, don't hand any more tasks to the publisher
106  bool is_shutdown = RunnableService::shutdown();
107 
108  is_shutdown &= publisher_->shutdown();
109  is_shutdown &= batcher_->shutdown();
110 
111  if (file_upload_streamer_) {
112  is_shutdown &= file_upload_streamer_->shutdown();
113  // wait for file_upload_streamer_ (RunnableService) shutdown to complete
114  file_upload_streamer_->join();
115  }
116 
117  // wait for RunnableService shutdown to complete
118  this->join();
119 
120  return is_shutdown;
121 }
122 
129 virtual inline bool batchData(const D &data_to_batch) {
130 
131  T t = convertInputToBatched(data_to_batch);
132  return batcher_->batchData(t);
133 }
134 
142 virtual inline bool batchData(const D &data_to_batch, const std::chrono::milliseconds &milliseconds) {
143 
144  // convert
145  T t = convertInputToBatched(data_to_batch, milliseconds);
146  return batcher_->batchData(t);
147 }
148 
155 virtual inline bool publishBatchedData() {
156  if (batcher_) {
157  return batcher_->publishBatchedData();
158  }
159  return false;
160 }
161 
167 virtual inline std::chrono::milliseconds getDequeueDuration() {
168  return this->dequeue_duration_;
169 }
170 
171 virtual inline bool setDequeueDuration(std::chrono::milliseconds new_value) {
172  bool is_set = false;
173  if (new_value.count() >= 0) {
174  this->dequeue_duration_ = new_value;
175  is_set = true;
176  }
177  return is_set;
178 }
179 
186  return this->number_dequeued_.load();
187 }
188 
194 virtual bool isConnected() {
195  return this->publisher_->getPublisherState() == PublisherState::CONNECTED;
196 }
197 
198 
204 virtual std::chrono::milliseconds getCurrentTimestamp() {
205  return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
206 }
207 
208 protected:
209 
210 virtual T convertInputToBatched(const D &input, const std::chrono::milliseconds &milliseconds) = 0;
211 virtual T convertInputToBatched(const D &input) = 0;
212 
213 
217 void work() {
218 
219  TaskPtr<std::list<T>> task_to_publish;
220  bool is_dequeued = Aws::DataFlow::InputStage<TaskPtr<std::list<T>>>::getSource()
221  ->dequeue(task_to_publish, dequeue_duration_);
222 
223  if (is_dequeued) {
224 
225  if (task_to_publish) {
226  this->number_dequeued_++;
227  AWS_LOGSTREAM_DEBUG(__func__, "Number of tasks dequeued = " << this->number_dequeued_++);
228 
230  // publish mechanism via the TaskFactory
231  task_to_publish->run(publisher_);
232  } else {
233  // unable to publish, fast fail and cancel the current task
234  task_to_publish->cancel();
235  }
236  }
237  }
238 }
239 
240 std::shared_ptr<FileUploadStreamer<std::list<T>>> file_upload_streamer_;
241 std::shared_ptr<Publisher<std::list<T>>> publisher_;
242 std::shared_ptr<DataBatcher<T>> batcher_;
243 
247 std::chrono::milliseconds dequeue_duration_;
248 
249 private:
253 std::atomic<int> number_dequeued_;
254 };
255 
256 } // namespace Cloudwatch
257 } // namespace AWS
258 
virtual std::chrono::milliseconds getCurrentTimestamp()
std::shared_ptr< DataBatcher< T > > batcher_
virtual bool batchData(const D &data_to_batch, const std::chrono::milliseconds &milliseconds)
virtual bool batchData(const D &data_to_batch)
virtual std::chrono::milliseconds getDequeueDuration()
CloudWatchService(std::shared_ptr< Publisher< std::list< T >>> publisher, std::shared_ptr< DataBatcher< T >> batcher)
bool shutdown() override
Definition: service.h:150
std::shared_ptr< FileUploadStreamer< std::list< T > > > file_upload_streamer_
static const std::chrono::milliseconds kDequeueDuration
virtual bool setDequeueDuration(std::chrono::milliseconds new_value)
std::shared_ptr< Publisher< std::list< T > > > publisher_
std::chrono::milliseconds dequeue_duration_
ServiceState getState()
Definition: service.h:101
bool start() override
Definition: service.h:140


dataflow_lite
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:22