log_publisher.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #include <functional>
17 #include <aws/core/Aws.h>
18 #include <aws/core/utils/logging/LogMacros.h>
19 #include <aws/logs/CloudWatchLogsClient.h>
20 #include <aws/logs/model/PutLogEventsRequest.h>
23 
26 
29 
30 #include <list>
31 #include <memory>
32 #include <fstream>
33 #include <utility>
34 
35 
36 namespace Aws {
37 namespace CloudWatchLogs {
38 
39 constexpr int kMaxRetries = 1; // todo this should probably be configurable, maybe part of the generic publisher interface
40 
42  const std::string & log_group,
43  const std::string & log_stream,
44  const Aws::Client::ClientConfiguration & client_config)
47 {
48  this->client_config_ = client_config;
49  this->log_group_ = log_group;
50  this->log_stream_ = log_stream;
51  this->cloudwatch_facade_ = nullptr;
52 }
53 
55  const std::string & log_group,
56  const std::string & log_stream,
57  std::shared_ptr<Aws::CloudWatchLogs::Utils::CloudWatchLogsFacade> cloudwatch_facade)
59 {
60  this->cloudwatch_facade_ = std::move(cloudwatch_facade);
61  this->log_group_ = log_group;
62  this->log_stream_ = log_stream;
63 }
64 
65 LogPublisher::~LogPublisher() = default;
66 
68  if (CW_LOGS_NOT_CONNECTED == error) {
69  return false;
70  }
71  return true;
72 }
73 
78 {
79  auto status =
80  this->cloudwatch_facade_->CheckLogGroupExists(this->log_group_);
81 
82  if (!checkIfConnected(status)) {
83  return false;
84  }
85 
86  AWS_LOGSTREAM_DEBUG(__func__, "CheckLogGroupExists code:" << status);
87 
88  if (CW_LOGS_SUCCEEDED == status) {
89 
91  AWS_LOGSTREAM_DEBUG(__func__, "Found existing log group: " << log_group_);
92  return true;
93  }
94 
95  status = this->cloudwatch_facade_->CreateLogGroup(this->log_group_);
96 
97  if (!checkIfConnected(status)) {
98  return false;
99  }
100 
101  if (CW_LOGS_SUCCEEDED == status) {
102 
104  AWS_LOGSTREAM_DEBUG(__func__, "Successfully created log group.");
105  return true;
106 
107  } else if (CW_LOGS_LOG_GROUP_ALREADY_EXISTS == status) {
108 
110  AWS_LOGSTREAM_INFO(__func__, "Log group already exists.");
111  return true;
112 
113  } else {
114 
115  AWS_LOGSTREAM_ERROR(__func__, "Failed to create log group, status: "
116  << status);
117  return false;
118  }
119 }
120 
125 {
127  this->cloudwatch_facade_->CheckLogStreamExists(this->log_group_, this->log_stream_, nullptr);
128  if (!checkIfConnected(status)) {
129  return false;
130  }
131 
132  if (CW_LOGS_SUCCEEDED == status) {
134  AWS_LOGSTREAM_DEBUG(__func__, "Found existing log stream: " << this->log_stream_);
135  return true;
136  }
137 
138  status = this->cloudwatch_facade_->CreateLogStream(this->log_group_, this->log_stream_);
139  if (!checkIfConnected(status)) {
140  return false;
141  }
142 
143  if (CW_LOGS_SUCCEEDED == status) {
145  AWS_LOG_DEBUG(__func__, "Successfully created log stream.");
146  return true;
147  } else if (CW_LOGS_LOG_STREAM_ALREADY_EXISTS == status) {
149  AWS_LOG_DEBUG(__func__, "Log stream already exists");
150  return true;
151 
152  } else {
153  AWS_LOGSTREAM_ERROR(__func__, "Failed to create log stream, status: "
154  << status);
155  return false;
156  }
157 }
158 
163 {
165  this->cloudwatch_facade_->GetLogStreamToken(this->log_group_, this->log_stream_,
166  next_token);
167  if (!checkIfConnected(status)) {
168  // don't reset token, could still be valid
169  return false;
170  }
171 
172  if (CW_LOGS_SUCCEEDED == status) {
173  AWS_LOG_DEBUG(__func__, "Get Token succeeded");
174  return true;
175  } else {
176 
177  AWS_LOGSTREAM_ERROR(__func__, "Unable to obtain the sequence token to use, status: "
178  << status);
179  resetInitToken(); // reset token given error
180  return false;
181  }
182 }
183 
185 
186  AWS_LOG_DEBUG(__func__,
187  "Attempting to use logs of size %i", data.size());
188 
190  if (!data.empty()) {
191  int tries = kMaxRetries;
192  while (CW_LOGS_SUCCEEDED != send_logs_status && tries > 0) {
193 
194  AWS_LOG_INFO(__func__, "Sending logs to CW");
195 
196  if (!std::ifstream("/tmp/internet").good()) {
197  send_logs_status = this->cloudwatch_facade_->SendLogsToCloudWatch(
198  next_token, this->log_group_, this->log_stream_, data);
199  AWS_LOG_DEBUG(__func__, "SendLogs status=%d", send_logs_status);
200  }
201 
202  if (CW_LOGS_SUCCEEDED != send_logs_status) {
203  AWS_LOG_WARN(__func__, "Unable to send logs to CloudWatch, retrying ...");
205  this->cloudwatch_facade_->GetLogStreamToken(this->log_group_, this->log_stream_,
206  next_token);
207  if (CW_LOGS_SUCCEEDED != get_token_status) {
208  AWS_LOG_WARN(__func__,
209  "Unable to obtain the sequence token to use");
210  break;
211  }
212  }
213  tries--;
214  }
215  if (CW_LOGS_SUCCEEDED != send_logs_status) {
216  AWS_LOG_WARN(
217  __func__,
218  "Unable to send logs to CloudWatch");
219  }
220  } else {
221  AWS_LOG_DEBUG(__func__,
222  "Unable to obtain the sequence token to use");
223  }
224 
225  checkIfConnected(send_logs_status); // mark offline if needed
226  return send_logs_status;
227 }
228 
230 {
232 }
234  return this->run_state_.getValue();
235 }
236 
238 
240  // attempt to create group
241  if (!CreateGroup()) {
242  AWS_LOG_WARN(__func__, "CreateGroup FAILED");
243  return false;
244  }
245  AWS_LOG_DEBUG(__func__, "CreateGroup succeeded");
246  }
247 
249  // attempt to create stream
250  if (!CreateStream()) {
251  AWS_LOG_WARN(__func__, "CreateStream FAILED");
252  return false;
253  }
254  AWS_LOG_DEBUG(__func__, "CreateGroup succeeded");
255  }
256 
258 
259  // init and check if we have a valid token
260  bool token_success = InitToken(next_token);
261 
262  if(!token_success || next_token == Aws::CloudWatchLogs::UNINITIALIZED_TOKEN) {
263  AWS_LOG_WARN(__func__, "INIT TOKEN FAILED");
264  return false;
265  }
266  AWS_LOG_DEBUG(__func__, "INIT TOKEN succeeded");
267  }
268 
269  return true;
270 }
271 
273 {
274 
275  // if no data don't attempt to configure or publish
276  if (data.empty()) {
277  AWS_LOG_DEBUG(__func__, "no data to publish");
279  }
280 
281  // attempt to configure (if needed, based on run_state_)
282  if (!configure()) {
283  return Aws::DataFlow::FAIL;
284  }
285 
286  AWS_LOG_DEBUG(__func__, "attempting to SendLogFiles");
287 
288  // all config succeeded: attempt to publish
290  auto status = SendLogs(next_token, data);
291 
292  // if failed attempt to get the token next time
293  // otherwise everything succeeded to attempt to send logs again
295  AWS_LOG_DEBUG(__func__, "finished SendLogs");
296 
297  switch(status) {
298 
299  case CW_LOGS_SUCCEEDED:
300  return Aws::DataFlow::SUCCESS;
303  default:
304  AWS_LOG_WARN(__func__, "error finishing SendLogs %d", status);
305  return Aws::DataFlow::FAIL;
306  }
307 }
308 
310 
311  if (!this->cloudwatch_facade_) {
312  this->cloudwatch_facade_ = std::make_shared<Aws::CloudWatchLogs::Utils::CloudWatchLogsFacade>(this->client_config_);
313  }
314 
315  return Service::start();
316 }
317 
319  bool is_shutdown = Publisher::shutdown();
320  resetInitToken();
321  Aws::ShutdownAPI(this->options_);
322  return is_shutdown;
323 }
324 
325 } // namespace CloudWatchLogs
326 } // namespace Aws
LogPublisherRunState getRunState()
bool checkIfConnected(Aws::CloudWatchLogs::ROSCloudWatchLogsErrors error)
Aws::DataFlow::UploadStatus publishData(std::list< Aws::CloudWatchLogs::Model::InputLogEvent > &data) override
bool shutdown() override
Aws::CloudWatchLogs::ROSCloudWatchLogsErrors SendLogs(Aws::String &next_token, std::list< Aws::CloudWatchLogs::Model::InputLogEvent > &data)
ObservableObject< LogPublisherRunState > run_state_
std::list< LogType > LogCollection
Definition: definitions.h:29
constexpr int kMaxRetries
virtual void setValue(const T &v)
bool InitToken(Aws::String &next_token)
Aws::Client::ClientConfiguration client_config_
Contains Error handling functionality for ROS AWS CloudWatch Logs libraries.
static const Aws::String UNINITIALIZED_TOKEN
Definition: log_publisher.h:49
std::shared_ptr< Aws::CloudWatchLogs::Utils::CloudWatchLogsFacade > cloudwatch_facade_
LogPublisher(const std::string &log_group, const std::string &log_stream, const Aws::Client::ClientConfiguration &client_config)
Creates a LogPublisher object that uses the provided client and SDK configuration Constructs a LogPub...
virtual T getValue()
LogPublisherRunState
Defines the different runtime states for the Publisher This enum is used by the LogPublisher to track...
Definition: log_publisher.h:42
~LogPublisher() override
Tears down the LogPublisher object.
virtual bool start()
ROSCloudWatchLogsErrors
Defines error return codes for functions This enum defines standard error codes that will be returned...


cloudwatch_logs_common
Author(s): AWS RoboMaker
autogenerated on Fri May 7 2021 02:18:24