metrics_collector.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #include <aws/core/Aws.h>
17 #include <aws/core/client/ClientConfiguration.h>
18 #include <aws/core/utils/StringUtils.h>
19 #include <aws/core/utils/logging/AWSLogging.h>
20 #include <aws/core/utils/logging/LogMacros.h>
21 #include <aws/monitoring/CloudWatchClient.h>
22 #include <aws/monitoring/model/PutMetricDataRequest.h>
26 #include <ros/ros.h>
27 #include <ros_monitoring_msgs/MetricData.h>
28 #include <ros_monitoring_msgs/MetricDimension.h>
29 #include <ros_monitoring_msgs/MetricList.h>
30 #include <std_msgs/String.h>
31 
35 #include <string>
36 #include <utility>
37 #include <vector>
38 #include <map>
39 #include <std_srvs/Trigger.h>
40 #include <std_srvs/Empty.h>
42 
43 
44 namespace Aws {
45 namespace CloudWatchMetrics {
46 namespace Utils {
47 
48 void MetricsCollector::Initialize(std::string metric_namespace,
49  std::map<std::string, std::string> & default_dimensions,
50  int storage_resolution,
51  const ros::NodeHandle& node_handle,
52  const Aws::Client::ClientConfiguration & config,
53  const Aws::SDKOptions & sdk_options,
54  const Aws::CloudWatchMetrics::CloudWatchOptions & cloudwatch_options,
55  const std::shared_ptr<MetricServiceFactory>& metric_service_factory) {
56 
57  this->metric_namespace_ = std::move(metric_namespace);
58  this->default_dimensions_ = default_dimensions;
59  this->storage_resolution_.store(storage_resolution);
60  this->node_handle_ = node_handle;
61  this->metric_service_ = metric_service_factory->createMetricService(this->metric_namespace_,
62  config,
63  sdk_options,
64  cloudwatch_options);
65 }
66 
68 {
70  for (auto & topic : topics_) {
71  ros::Subscriber sub = node_handle_.subscribe<ros_monitoring_msgs::MetricList>(
72  topic, kNodeSubQueueSize,
73  [this](const ros_monitoring_msgs::MetricList::ConstPtr & metric_list_msg) -> void {
74  this->RecordMetrics(metric_list_msg);
75  });
76  subscriptions_.push_back(sub);
77  }
78 }
79 
81  const ros_monitoring_msgs::MetricList::ConstPtr & metric_list_msg)
82 {
83  int batched_count = 0;
84  AWS_LOGSTREAM_DEBUG(__func__, "Received " << metric_list_msg->metrics.size() << " metrics");
85 
86  for (auto metric_msg = metric_list_msg->metrics.begin();
87  metric_msg != metric_list_msg->metrics.end(); ++metric_msg) {
88 
89  std::map<std::string, std::string> dimensions;
90 
91  for (auto & default_dimension : default_dimensions_) {
92  dimensions.emplace(default_dimension.first, default_dimension.second); // ignore the return, if we get a duplicate we're
93  // going to stick with the first one
94  }
95  for (const auto & dimension : metric_msg->dimensions) {
96  dimensions.emplace(dimension.name, dimension.value); // ignore the return, if we get a duplicate
97  // we're going to stick with the first one
98  }
99  AWS_LOGSTREAM_DEBUG(__func__, "Recording metric with name=[" << metric_msg->metric_name << "]");
100 
101  // create a MetricObject with message parameters to batch
102  Aws::CloudWatchMetrics::Utils::MetricObject metric_object {metric_msg->metric_name,
103  metric_msg->value,
104  metric_msg->unit,
105  GetMetricDataEpochMillis(*metric_msg),
106  dimensions,
107  this->storage_resolution_.load()};
108  bool batched = metric_service_->batchData(metric_object);
109 
110  if (!batched) {
111  AWS_LOGSTREAM_ERROR(__func__, "Failed to record metric");
112  }
113 
114  batched_count++;
115  }
116  return batched_count;
117 }
118 
119 int64_t MetricsCollector::GetMetricDataEpochMillis(const ros_monitoring_msgs::MetricData & metric_msg)
120 {
121  return metric_msg.time_stamp.toNSec() / 1000000;
122 }
123 
125 {
126  AWS_LOG_DEBUG(__func__, "Flushing metrics");
127  this->metric_service_->publishBatchedData();
128 }
129 
131  bool is_started = true;
132  this->SubscribeAllTopics();
133 
134  if (this->metric_service_) {
135  is_started &= this->metric_service_->start();
136  }
137  is_started &= Service::start();
138  return is_started;
139 }
140 
142  bool is_shutdown = Service::shutdown();
143  if (this->metric_service_) {
144  is_shutdown &= this->metric_service_->shutdown();
145  }
146  return is_shutdown;
147 }
148 
149 bool MetricsCollector::checkIfOnline(std_srvs::Trigger::Request& request, std_srvs::Trigger::Response& response) {
150 
151  AWS_LOGSTREAM_DEBUG(__func__, "received request " << request);
152 
153  if (!this->metric_service_) {
154  response.success = false;
155  response.message = "The MetricsCollector is not initialized";
156  return true;
157  }
158 
159  response.success = this->metric_service_->isConnected();
160  response.message = response.success ? "The MetricsCollector is connected" : "The MetricsCollector is not connected";
161 
162  return true;
163 }
164 
165 } // namespace Utils
166 } // namespace CloudWatchMetrics
167 } // namespace Aws
168 
169 
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
int RecordMetrics(const ros_monitoring_msgs::MetricList::ConstPtr &metric_list_msg)
void Initialize(std::string metric_namespace, std::map< std::string, std::string > &default_dimensions, int storage_resolution, const ros::NodeHandle &node_handle, const Aws::Client::ClientConfiguration &config, const Aws::SDKOptions &sdk_options, const Aws::CloudWatchMetrics::CloudWatchOptions &cloudwatch_options, const std::shared_ptr< MetricServiceFactory > &metric_service_factory=std::make_shared< MetricServiceFactory >())
std::map< std::string, std::string > default_dimensions_
void ReadTopics(std::vector< std::string > &topics)
virtual bool shutdown()
static int64_t GetMetricDataEpochMillis(const ros_monitoring_msgs::MetricData &metric_msg)
bool checkIfOnline(std_srvs::Trigger::Request &request, std_srvs::Trigger::Response &response)
virtual bool start()
std::shared_ptr< MetricService > metric_service_


cloudwatch_metrics_collector
Author(s): AWS RoboMaker
autogenerated on Fri Mar 5 2021 03:38:40