.. _program_listing_file__tmp_ws_src_rosbag2_rosbag2_test_common_include_rosbag2_test_common_subscription_manager.hpp: Program Listing for File subscription_manager.hpp ================================================= |exhale_lsh| :ref:`Return to documentation for file ` (``/tmp/ws/src/rosbag2/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2018, Bosch Software Innovations GmbH. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef ROSBAG2_TEST_COMMON__SUBSCRIPTION_MANAGER_HPP_ #define ROSBAG2_TEST_COMMON__SUBSCRIPTION_MANAGER_HPP_ #include #include #include #include #include #include "rclcpp/rclcpp.hpp" // rclcpp must be included before the Windows specific includes. #include "rclcpp/serialization.hpp" namespace rosbag2_test_common { class SubscriptionManager { public: SubscriptionManager() { subscriber_node_ = std::make_shared( "subscription_manager_" + std::to_string(rclcpp::Clock().now().nanoseconds()), rclcpp::NodeOptions().start_parameter_event_publisher(false).enable_rosout(false) ); } template void add_subscription( const std::string & topic_name, size_t expected_number_of_messages, const rclcpp::QoS & qos = rclcpp::QoS{rmw_qos_profile_default.depth}) { expected_topics_with_size_[topic_name] = expected_number_of_messages; auto options = rclcpp::SubscriptionOptions(); options.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; subscriptions_.push_back( subscriber_node_->create_subscription( topic_name, qos, [this, topic_name](std::shared_ptr msg) { subscribed_messages_[topic_name].push_back(msg); }, options)); } template std::vector> get_received_messages(const std::string & topic_name) { static rclcpp::Serialization serializer; auto messages = subscribed_messages_[topic_name]; auto received_messages_on_topic = std::vector>(); for (const auto & serialized_msg : messages) { auto msg = std::make_shared(); serializer.deserialize_message(serialized_msg.get(), msg.get()); received_messages_on_topic.push_back(msg); } return received_messages_on_topic; } template bool spin_and_wait_for_matched( const std::vector & publishers, Timeout timeout, size_t n_subscribers_to_match = 1) { // Sanity check that we have valid input for (const auto publisher_ptr : publishers) { if (publisher_ptr == nullptr) { throw std::invalid_argument("Null pointer in publisher list"); } std::string topic_name{publisher_ptr->get_topic_name()}; if (expected_topics_with_size_.find(topic_name) == expected_topics_with_size_.end()) { throw std::invalid_argument( "Publisher's topic name = `" + topic_name + "` not found in expected topics list"); } } using clock = std::chrono::steady_clock; auto start = clock::now(); rclcpp::executors::SingleThreadedExecutor exec; bool matched = false; while (!matched && ((clock::now() - start) < timeout)) { exec.spin_node_some(subscriber_node_); matched = true; for (const auto publisher_ptr : publishers) { if (publisher_ptr->get_subscription_count() + publisher_ptr->get_intra_process_subscription_count() < n_subscribers_to_match) { matched = false; break; } } } return matched; } std::future spin_subscriptions() { return async( std::launch::async, [this]() { rclcpp::executors::SingleThreadedExecutor exec; auto start = std::chrono::high_resolution_clock::now(); while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) { exec.spin_node_some(subscriber_node_); } }); } void spin_subscriptions_sync() { rclcpp::executors::SingleThreadedExecutor exec; auto start = std::chrono::high_resolution_clock::now(); while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) { exec.spin_node_some(subscriber_node_); } } private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, std::chrono::time_point start_time) { auto current = std::chrono::high_resolution_clock::now(); if (current - start_time > std::chrono::seconds(10)) { return false; } for (const auto & topic_expected : expected_topics_with_sizes) { if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) { return true; } } return false; } std::vector subscriptions_; std::unordered_map>> subscribed_messages_; std::unordered_map expected_topics_with_size_; rclcpp::Node::SharedPtr subscriber_node_; }; } // namespace rosbag2_test_common #endif // ROSBAG2_TEST_COMMON__SUBSCRIPTION_MANAGER_HPP_