Program Listing for File message_producer.hpp
↰ Return to documentation for file (include/msg_utils/message_producer.hpp
)
// Copyright 2022 Apex.AI, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef MSG_UTILS__MESSAGE_PRODUCER_HPP_
#define MSG_UTILS__MESSAGE_PRODUCER_HPP_
#include <memory>
#include <string>
#include <stdexcept>
#include "helpers.hpp"
#include "rclcpp/node.hpp"
#include "rosbag2_performance_benchmarking/publisher_group_config.hpp"
namespace msg_utils
{
class ProducerBase
{
public:
virtual void produce() = 0;
virtual void wait_for_matched() = 0;
};
template<typename T>
class MessageProducer : public ProducerBase
{
public:
MessageProducer(rclcpp::Node & node, std::string topic, const PublisherGroupConfig & config);
void produce() override;
void wait_for_matched() override;
private:
void resize(size_t size);
T message_;
std::shared_ptr<rclcpp::Publisher<T>> publisher_;
};
template<typename T>
MessageProducer<T>::MessageProducer(
rclcpp::Node & node, std::string topic,
const PublisherGroupConfig & config)
: publisher_(node.create_publisher<T>(topic, config.qos))
{
if (config.producer_config.message_size > 0) {
resize(config.producer_config.message_size);
}
}
template<typename T>
void MessageProducer<T>::wait_for_matched()
{
const double max_subscription_wait_time = 5.0;
auto start_time = std::chrono::high_resolution_clock::now();
while (publisher_->get_subscription_count() == 0U) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto current_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = current_time - start_time;
if (elapsed.count() >= max_subscription_wait_time) {
throw std::runtime_error("Waited too long for subscription");
}
}
}
template<typename T>
void MessageProducer<T>::produce()
{
if (!rclcpp::ok()) {
return;
}
publisher_->publish(message_);
}
template<typename T>
void MessageProducer<T>::resize(size_t size)
{
(void)size;
throw std::runtime_error(
"Resize not implemented for type: " +
std::string(rosidl_generator_traits::data_type<T>()));
}
template<>
void MessageProducer<rosbag2_performance_benchmarking_msgs::msg::ByteArray>::resize(size_t size)
{
helpers::generate_data(message_, size);
}
template<>
void MessageProducer<sensor_msgs::msg::Image>::resize(size_t size)
{
helpers::generate_data(message_, size);
}
template<>
void MessageProducer<sensor_msgs::msg::PointCloud2>::resize(size_t size)
{
helpers::generate_data(message_, size);
}
} // namespace msg_utils
#endif // MSG_UTILS__MESSAGE_PRODUCER_HPP_