16 #include <aws/core/Aws.h> 17 #include <aws/core/client/ClientConfiguration.h> 18 #include <aws/core/utils/StringUtils.h> 19 #include <aws/core/utils/logging/AWSLogging.h> 20 #include <aws/core/utils/logging/LogMacros.h> 21 #include <aws/monitoring/CloudWatchClient.h> 22 #include <aws/monitoring/model/PutMetricDataRequest.h> 27 #include <ros_monitoring_msgs/MetricData.h> 28 #include <ros_monitoring_msgs/MetricDimension.h> 29 #include <ros_monitoring_msgs/MetricList.h> 30 #include <std_msgs/String.h> 39 #include <std_srvs/Trigger.h> 40 #include <std_srvs/Empty.h> 45 namespace CloudWatchMetrics {
49 std::map<std::string, std::string> & default_dimensions,
50 int storage_resolution,
52 const Aws::Client::ClientConfiguration & config,
53 const Aws::SDKOptions & sdk_options,
55 const std::shared_ptr<MetricServiceFactory>& metric_service_factory) {
73 [
this](
const ros_monitoring_msgs::MetricList::ConstPtr & metric_list_msg) ->
void {
81 const ros_monitoring_msgs::MetricList::ConstPtr & metric_list_msg)
83 int batched_count = 0;
84 AWS_LOGSTREAM_DEBUG(__func__,
"Received " << metric_list_msg->metrics.size() <<
" metrics");
86 for (
auto metric_msg = metric_list_msg->metrics.begin();
87 metric_msg != metric_list_msg->metrics.end(); ++metric_msg) {
89 std::map<std::string, std::string> dimensions;
92 dimensions.emplace(default_dimension.first, default_dimension.second);
95 for (
const auto & dimension : metric_msg->dimensions) {
96 dimensions.emplace(dimension.name, dimension.value);
99 AWS_LOGSTREAM_DEBUG(__func__,
"Recording metric with name=[" << metric_msg->metric_name <<
"]");
111 AWS_LOGSTREAM_ERROR(__func__,
"Failed to record metric");
116 return batched_count;
121 return metric_msg.time_stamp.toNSec() / 1000000;
126 AWS_LOG_DEBUG(__func__,
"Flushing metrics");
131 bool is_started =
true;
151 AWS_LOGSTREAM_DEBUG(__func__,
"received request " << request);
154 response.success =
false;
155 response.message =
"The MetricsCollector is not initialized";
160 response.message = response.success ?
"The MetricsCollector is connected" :
"The MetricsCollector is not connected";
std::string metric_namespace_
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
int RecordMetrics(const ros_monitoring_msgs::MetricList::ConstPtr &metric_list_msg)
std::vector< ros::Subscriber > subscriptions_
constexpr int kNodeSubQueueSize
void Initialize(std::string metric_namespace, std::map< std::string, std::string > &default_dimensions, int storage_resolution, const ros::NodeHandle &node_handle, const Aws::Client::ClientConfiguration &config, const Aws::SDKOptions &sdk_options, const Aws::CloudWatchMetrics::CloudWatchOptions &cloudwatch_options, const std::shared_ptr< MetricServiceFactory > &metric_service_factory=std::make_shared< MetricServiceFactory >())
std::map< std::string, std::string > default_dimensions_
std::atomic< int > storage_resolution_
void ReadTopics(std::vector< std::string > &topics)
ros::NodeHandle node_handle_
void SubscribeAllTopics()
static int64_t GetMetricDataEpochMillis(const ros_monitoring_msgs::MetricData &metric_msg)
void TriggerPublish(const ros::TimerEvent &)
bool checkIfOnline(std_srvs::Trigger::Request &request, std_srvs::Trigger::Response &response)
std::vector< std::string > topics_
std::shared_ptr< MetricService > metric_service_