.. _program_listing_file__tmp_ws_src_rosbag2_rosbag2_test_common_include_rosbag2_test_common_publication_manager.hpp: Program Listing for File publication_manager.hpp ================================================ |exhale_lsh| :ref:`Return to documentation for file ` (``/tmp/ws/src/rosbag2/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2018, Bosch Software Innovations GmbH. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_ #define ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_ #include #include #include #include #include #include #include #include "rclcpp/rclcpp.hpp" // rclcpp must be included before the Windows specific includes. using namespace std::chrono_literals; // NOLINT namespace rosbag2_test_common { class PublicationManager { public: PublicationManager() : pub_node_(std::make_shared( "publication_manager_" + std::to_string(rclcpp::Clock().now().nanoseconds()), rclcpp::NodeOptions().start_parameter_event_publisher(false).enable_rosout(false))) {} template void setup_publisher( const std::string & topic_name, const std::shared_ptr message, const size_t repetition, const rclcpp::QoS & qos = rclcpp::QoS{rclcpp::KeepAll()}, const std::chrono::milliseconds interval = 50ms) { return setup_publisher(topic_name, *message, repetition, qos, interval); } template void setup_publisher( const std::string & topic_name, const MsgT & message, const size_t repetition, const rclcpp::QoS & qos = rclcpp::QoS{rclcpp::KeepAll()}, const std::chrono::milliseconds interval = 50ms) { auto publisher = pub_node_->create_publisher(topic_name, qos); auto publisher_fcn = [publisher, message, repetition, interval](bool verbose = false) { for (auto i = 0u; i < repetition; ++i) { if (rclcpp::ok()) { publisher->publish(message); if (verbose) { RCLCPP_INFO( rclcpp::get_logger("publication_manager"), "publish on topic %s", publisher->get_topic_name()); } std::this_thread::sleep_for(interval); } } }; publishers_.push_back(std::make_pair(publisher, publisher_fcn)); } void run_publishers(bool verbose = false) const { for (const auto & pub : publishers_) { std::get(pub)(verbose); } } void run_publishers_async(bool verbose = false) const { std::vector> futures; for (const auto & pub : publishers_) { futures.push_back(std::async(std::launch::async, std::get(pub), verbose)); } for (auto & publisher_future : futures) { publisher_future.get(); } } template bool wait_for_matched( const char * topic_name, std::chrono::duration timeout = std::chrono::seconds(10), size_t n_subscribers_to_match = 1) { rclcpp::PublisherBase::SharedPtr publisher = nullptr; for (const auto pub : publishers_) { if (!std::strcmp( std::get(pub)->get_topic_name(), topic_name)) { publisher = std::get(pub); break; } } if (publisher == nullptr) { return false; } if (publisher->get_subscription_count() + publisher->get_intra_process_subscription_count() >= n_subscribers_to_match) { return true; } auto sleep_time = std::chrono::milliseconds(10); if (timeout < std::chrono::seconds(1)) { sleep_time = timeout; } using clock = std::chrono::steady_clock; auto start = clock::now(); do { std::this_thread::sleep_for(sleep_time); if (publisher->get_subscription_count() + publisher->get_intra_process_subscription_count() >= n_subscribers_to_match) { return true; } } while ((clock::now() - start) < timeout); return false; } private: std::shared_ptr pub_node_; using PublicationF = std::function; std::vector> publishers_; }; } // namespace rosbag2_test_common #endif // ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_