.. _program_listing_file_include_rclcpp_topic_statistics_subscription_topic_statistics.hpp: Program Listing for File subscription_topic_statistics.hpp ========================================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/topic_statistics/subscription_topic_statistics.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_ #define RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_ #include #include #include #include #include "libstatistics_collector/collector/generate_statistics_message.hpp" #include "libstatistics_collector/moving_average_statistics/types.hpp" #include "libstatistics_collector/topic_statistics_collector/constants.hpp" #include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp" #include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp" #include "rcl/time.h" #include "rclcpp/time.hpp" #include "rclcpp/publisher.hpp" #include "rclcpp/timer.hpp" #include "statistics_msgs/msg/metrics_message.hpp" namespace rclcpp { namespace topic_statistics { constexpr const char kDefaultPublishTopicName[]{"/statistics"}; constexpr const std::chrono::milliseconds kDefaultPublishingPeriod{std::chrono::seconds(1)}; using libstatistics_collector::collector::GenerateStatisticMessage; using statistics_msgs::msg::MetricsMessage; using libstatistics_collector::moving_average_statistics::StatisticData; template class SubscriptionTopicStatistics { using TopicStatsCollector = libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector< CallbackMessageT>; using ReceivedMessageAge = libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector< CallbackMessageT>; using ReceivedMessagePeriod = libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector< CallbackMessageT>; public: SubscriptionTopicStatistics( const std::string & node_name, rclcpp::Publisher::SharedPtr publisher) : node_name_(node_name), publisher_(std::move(publisher)) { // TODO(dbbonnie): ros-tooling/aws-roadmap/issues/226, received message age if (nullptr == publisher_) { throw std::invalid_argument("publisher pointer is nullptr"); } bring_up(); } virtual ~SubscriptionTopicStatistics() { tear_down(); } virtual void handle_message( const CallbackMessageT & received_message, const rclcpp::Time now_nanoseconds) const { std::lock_guard lock(mutex_); for (const auto & collector : subscriber_statistics_collectors_) { collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds()); } } void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer) { publisher_timer_ = publisher_timer; } virtual void publish_message_and_reset_measurements() { std::vector msgs; rclcpp::Time window_end{get_current_nanoseconds_since_epoch()}; { std::lock_guard lock(mutex_); for (auto & collector : subscriber_statistics_collectors_) { const auto collected_stats = collector->GetStatisticsResults(); collector->ClearCurrentMeasurements(); auto message = libstatistics_collector::collector::GenerateStatisticMessage( node_name_, collector->GetMetricName(), collector->GetMetricUnit(), window_start_, window_end, collected_stats); msgs.push_back(message); } } for (auto & msg : msgs) { publisher_->publish(msg); } window_start_ = window_end; } protected: std::vector get_current_collector_data() const { std::vector data; std::lock_guard lock(mutex_); for (const auto & collector : subscriber_statistics_collectors_) { data.push_back(collector->GetStatisticsResults()); } return data; } private: void bring_up() { auto received_message_age = std::make_unique(); received_message_age->Start(); subscriber_statistics_collectors_.emplace_back(std::move(received_message_age)); auto received_message_period = std::make_unique(); received_message_period->Start(); { std::lock_guard lock(mutex_); subscriber_statistics_collectors_.emplace_back(std::move(received_message_period)); } window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch()); } void tear_down() { { std::lock_guard lock(mutex_); for (auto & collector : subscriber_statistics_collectors_) { collector->Stop(); } subscriber_statistics_collectors_.clear(); } if (publisher_timer_) { publisher_timer_->cancel(); publisher_timer_.reset(); } publisher_.reset(); } int64_t get_current_nanoseconds_since_epoch() const { const auto now = std::chrono::system_clock::now(); return std::chrono::duration_cast(now.time_since_epoch()).count(); } mutable std::mutex mutex_; std::vector> subscriber_statistics_collectors_{}; const std::string node_name_; rclcpp::Publisher::SharedPtr publisher_; rclcpp::TimerBase::SharedPtr publisher_timer_; rclcpp::Time window_start_; }; } // namespace topic_statistics } // namespace rclcpp #endif // RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_