Program Listing for File publication_manager.hpp
↰ Return to documentation for file (/tmp/ws/src/rosbag2/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp
)
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_
#define ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_
#include <cstring>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "rclcpp/rclcpp.hpp" // rclcpp must be included before the Windows specific includes.
using namespace std::chrono_literals; // NOLINT
namespace rosbag2_test_common
{
class PublicationManager
{
public:
PublicationManager()
: pub_node_(std::make_shared<rclcpp::Node>(
"publication_manager_" + std::to_string(rclcpp::Clock().now().nanoseconds()),
rclcpp::NodeOptions().start_parameter_event_publisher(false).enable_rosout(false)))
{}
template<class MsgT>
void setup_publisher(
const std::string & topic_name,
const std::shared_ptr<MsgT> message,
const size_t repetition,
const rclcpp::QoS & qos = rclcpp::QoS{rclcpp::KeepAll()},
const std::chrono::milliseconds interval = 50ms)
{
return setup_publisher(topic_name, *message, repetition, qos, interval);
}
template<class MsgT>
void setup_publisher(
const std::string & topic_name,
const MsgT & message,
const size_t repetition,
const rclcpp::QoS & qos = rclcpp::QoS{rclcpp::KeepAll()},
const std::chrono::milliseconds interval = 50ms)
{
auto publisher = pub_node_->create_publisher<MsgT>(topic_name, qos);
auto publisher_fcn = [publisher, message, repetition, interval](bool verbose = false) {
for (auto i = 0u; i < repetition; ++i) {
if (rclcpp::ok()) {
publisher->publish(message);
if (verbose) {
RCLCPP_INFO(
rclcpp::get_logger("publication_manager"),
"publish on topic %s", publisher->get_topic_name());
}
std::this_thread::sleep_for(interval);
}
}
};
publishers_.push_back(std::make_pair(publisher, publisher_fcn));
}
void run_publishers(bool verbose = false) const
{
for (const auto & pub : publishers_) {
std::get<PublicationF>(pub)(verbose);
}
}
void run_publishers_async(bool verbose = false) const
{
std::vector<std::future<void>> futures;
for (const auto & pub : publishers_) {
futures.push_back(std::async(std::launch::async, std::get<PublicationF>(pub), verbose));
}
for (auto & publisher_future : futures) {
publisher_future.get();
}
}
template<typename DurationRepT = int64_t, typename DurationT = std::milli>
bool wait_for_matched(
const char * topic_name,
std::chrono::duration<DurationRepT, DurationT> timeout = std::chrono::seconds(10),
size_t n_subscribers_to_match = 1)
{
rclcpp::PublisherBase::SharedPtr publisher = nullptr;
for (const auto pub : publishers_) {
if (!std::strcmp(
std::get<rclcpp::PublisherBase::SharedPtr>(pub)->get_topic_name(),
topic_name))
{
publisher = std::get<rclcpp::PublisherBase::SharedPtr>(pub);
break;
}
}
if (publisher == nullptr) {
return false;
}
if (publisher->get_subscription_count() +
publisher->get_intra_process_subscription_count() >= n_subscribers_to_match)
{
return true;
}
auto sleep_time = std::chrono::milliseconds(10);
if (timeout < std::chrono::seconds(1)) {
sleep_time = timeout;
}
using clock = std::chrono::steady_clock;
auto start = clock::now();
do {
std::this_thread::sleep_for(sleep_time);
if (publisher->get_subscription_count() +
publisher->get_intra_process_subscription_count() >= n_subscribers_to_match)
{
return true;
}
} while ((clock::now() - start) < timeout);
return false;
}
private:
std::shared_ptr<rclcpp::Node> pub_node_;
using PublicationF = std::function<void (bool)>;
std::vector<std::pair<rclcpp::PublisherBase::SharedPtr, PublicationF>> publishers_;
};
} // namespace rosbag2_test_common
#endif // ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_