publisher.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <atomic>
19 #include <chrono>
20 #include <list>
21 #include <functional>
22 #include <iostream>
23 #include <mutex>
24 
28 
33  UNKNOWN, // constructed
34  CONNECTED, //configured, ready to receive data
35  NOT_CONNECTED, //unable to send data
36 };
37 
45 template <typename T>
46 class Publisher : public Aws::DataFlow::IPublisher<T>, public Service
47 {
48 
49 public:
50 
52  publish_successes_.store(0);
53  publish_attempts_.store(0);
54  last_publish_duration_.store(std::chrono::milliseconds(0));
55  }
56 
57  ~Publisher() override = default;
58 
65  return publisher_state_.getValue();
66  }
67 
76 
77  // don't attempt to publish if not in the started state
78  if(ServiceState::STARTED != this->getState()) {
80  }
81 
83  auto published_status = Aws::DataFlow::UploadStatus::FAIL;
84 
85  // acquire lock to publish
86  std::lock_guard<std::mutex> lck (publish_mutex_);
87  {
88  auto start = std::chrono::high_resolution_clock::now();
89  published_status = publishData(data); // always at least try
90  last_publish_duration_.store(std::chrono::duration_cast<std::chrono::milliseconds>(
91  std::chrono::high_resolution_clock::now() - start));
92  }
93 
94  if (Aws::DataFlow::UploadStatus::SUCCESS == published_status) {
97  } else {
99  }
100 
101  return published_status;
102  }
103 
108  bool shutdown() override {
109  bool b = Service::shutdown(); // set shutdown state to try and fast fail any publish calls
110 
111  std::lock_guard<std::mutex> lck (publish_mutex_);
112  //acquire the lock to ensure attemptPublish has finished
114  return b;
115  }
116 
121  bool canPublish() {
122  auto current_state = publisher_state_.getValue();
123  return (current_state == UNKNOWN || current_state == CONNECTED) && ServiceState::STARTED == this->getState();
124  }
125 
131  return publish_successes_.load();
132  }
133 
139  return publish_attempts_.load();
140  }
141 
147  std::chrono::milliseconds getLastPublishDuration() {
148  return last_publish_duration_.load();
149  }
150 
158  int attempts = publish_attempts_.load();
159  if (attempts == 0) {
160  return 0;
161  }
162  int successes = publish_successes_.load();
163  return static_cast<float>(successes) / static_cast<float>(attempts) * 100.0f;
164  }
165 
172  virtual void addPublisherStateListener(const std::function<void(const PublisherState&)> & listener) {
173  publisher_state_.addListener(listener);
174  }
175 
176 protected:
177 
184  virtual Aws::DataFlow::UploadStatus publishData(T &data) = 0;
185 
186 private:
194  std::atomic<int> publish_successes_{};
198  std::atomic<int> publish_attempts_{};
202  std::atomic<std::chrono::milliseconds> last_publish_duration_{};
206  mutable std::mutex publish_mutex_;
207 
208 };
int getPublishSuccesses()
Definition: publisher.h:130
UploadStatus
Definition: task.h:33
virtual bool addListener(const std::function< void(const T &)> &listener)
ObservableObject< PublisherState > publisher_state_
Definition: publisher.h:190
std::atomic< int > publish_attempts_
Definition: publisher.h:198
bool shutdown() override
Definition: publisher.h:108
Aws::DataFlow::UploadStatus attemptPublish(T &data) override
Definition: publisher.h:75
bool canPublish()
Definition: publisher.h:121
virtual void setValue(const T &v)
std::atomic< int > publish_successes_
Definition: publisher.h:194
std::mutex publish_mutex_
Definition: publisher.h:206
Publisher()
Definition: publisher.h:51
virtual void addPublisherStateListener(const std::function< void(const PublisherState &)> &listener)
Definition: publisher.h:172
std::chrono::milliseconds getLastPublishDuration()
Definition: publisher.h:147
virtual bool shutdown()
Definition: service.h:83
std::atomic< std::chrono::milliseconds > last_publish_duration_
Definition: publisher.h:202
float getPublishSuccessPercentage()
Definition: publisher.h:157
PublisherState
Definition: publisher.h:32
virtual T getValue()
virtual Aws::DataFlow::UploadStatus publishData(T &data)=0
~Publisher() override=default
PublisherState getPublisherState()
Definition: publisher.h:64
ServiceState getState()
Definition: service.h:101
virtual bool start()
Definition: service.h:64
int getPublishAttempts()
Definition: publisher.h:138


dataflow_lite
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:22