Program Listing for File publisher.hpp
↰ Return to documentation for file (include/rclcpp/publisher.hpp
)
// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__PUBLISHER_HPP_
#define RCLCPP__PUBLISHER_HPP_
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include "rcl/error_handling.h"
#include "rcl/publisher.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rosidl_runtime_cpp/traits.hpp"
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/get_message_type_support_handle.hpp"
#include "rclcpp/is_ros_compatible_type.hpp"
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/type_adapter.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
#include "tracetools/tracetools.h"
namespace rclcpp
{
template<typename MessageT, typename AllocatorT>
class LoanedMessage;
template<typename MessageT, typename AllocatorT = std::allocator<void>>
class Publisher : public PublisherBase
{
public:
static_assert(
rclcpp::is_ros_compatible_type<MessageT>::value,
"given message type is not compatible with ROS and cannot be used with a Publisher");
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>;
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
using BufferSharedPtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
ROSMessageType,
ROSMessageTypeAllocator,
ROSMessageTypeDeleter
>::SharedPtr;
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)
Publisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
: PublisherBase(
node_base,
topic,
rclcpp::get_message_type_support_handle<MessageT>(),
options.template to_rcl_publisher_options<MessageT>(qos),
// NOTE(methylDragon): Passing these args separately is necessary for event binding
options.event_callbacks,
options.use_default_callbacks),
options_(options),
published_type_allocator_(*options.get_allocator()),
ros_message_type_allocator_(*options.get_allocator())
{
allocator::set_allocator_for_deleter(&published_type_deleter_, &published_type_allocator_);
allocator::set_allocator_for_deleter(&ros_message_type_deleter_, &ros_message_type_allocator_);
// Setup continues in the post construction method, post_init_setup().
}
virtual
void
post_init_setup(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
{
(void)qos;
(void)options;
// If needed, setup intra process communication.
if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
auto context = node_base->get_context();
// Get the intra process manager instance for this context.
auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
// Check if the QoS is compatible with intra-process.
auto qos_profile = get_actual_qos();
if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication on topic '" + topic +
"' allowed only with keep last history qos policy");
}
if (qos_profile.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication on topic '" + topic +
"' is not allowed with a zero qos history depth value");
}
if (qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
buffer_ = rclcpp::experimental::create_intra_process_buffer<
ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>(
rclcpp::detail::resolve_intra_process_buffer_type(options_.intra_process_buffer_type),
qos_profile,
std::make_shared<ROSMessageTypeAllocator>(ros_message_type_allocator_));
}
// Register the publisher with the intra process manager.
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), buffer_);
this->setup_intra_process(
intra_process_publisher_id,
ipm);
}
}
virtual ~Publisher()
{}
rclcpp::LoanedMessage<ROSMessageType, AllocatorT>
borrow_loaned_message()
{
return rclcpp::LoanedMessage<ROSMessageType, AllocatorT>(
*this,
this->get_ros_message_type_allocator());
}
template<typename T>
typename std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value
>
publish(std::unique_ptr<T, ROSMessageTypeDeleter> msg)
{
if (!intra_process_is_enabled_) {
this->do_inter_process_publish(*msg);
return;
}
// If an interprocess subscription exist, then the unique_ptr is promoted
// to a shared_ptr and published.
// This allows doing the intraprocess publish first and then doing the
// interprocess publish, resulting in lower publish-to-subscribe latency.
// It's not possible to do that with an unique_ptr,
// as do_intra_process_publish takes the ownership of the message.
// When durability is set to TransientLocal (i.e. there is a buffer),
// inter process publish should always take place to ensure
// late joiners receive past data.
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count() || buffer_;
if (inter_process_publish_needed) {
auto shared_msg =
this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg));
if (buffer_) {
buffer_->add_shared(shared_msg);
}
this->do_inter_process_publish(*shared_msg);
} else {
if (buffer_) {
auto shared_msg =
this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg));
buffer_->add_shared(shared_msg);
} else {
this->do_intra_process_ros_message_publish(std::move(msg));
}
}
}
template<typename T>
typename std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value
>
publish(const T & msg)
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
this->do_inter_process_publish(msg);
return;
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto unique_msg = this->duplicate_ros_message_as_unique_ptr(msg);
this->publish(std::move(unique_msg));
}
template<typename T>
typename std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value
>
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
{
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
auto ros_msg_ptr = std::make_unique<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
this->do_inter_process_publish(*ros_msg_ptr);
return;
}
// When durability is set to TransientLocal (i.e. there is a buffer),
// inter process publish should always take place to ensure
// late joiners receive past data.
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count() || buffer_;
if (inter_process_publish_needed) {
auto ros_msg_ptr = std::make_shared<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
this->do_intra_process_publish(std::move(msg));
this->do_inter_process_publish(*ros_msg_ptr);
if (buffer_) {
buffer_->add_shared(ros_msg_ptr);
}
} else {
if (buffer_) {
auto ros_msg_ptr = std::make_shared<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
buffer_->add_shared(ros_msg_ptr);
}
this->do_intra_process_publish(std::move(msg));
}
}
template<typename T>
typename std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value
>
publish(const T & msg)
{
if (!intra_process_is_enabled_) {
// Convert to the ROS message equivalent and publish it.
auto ros_msg_ptr = std::make_unique<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ros_msg_ptr);
this->do_inter_process_publish(*ros_msg_ptr);
return;
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg);
this->publish(std::move(unique_msg));
}
void
publish(const rcl_serialized_message_t & serialized_msg)
{
return this->do_serialized_publish(&serialized_msg);
}
void
publish(const SerializedMessage & serialized_msg)
{
return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message());
}
void
publish(rclcpp::LoanedMessage<ROSMessageType, AllocatorT> && loaned_msg)
{
if (!loaned_msg.is_valid()) {
throw std::runtime_error("loaned message is not valid");
}
// verify that publisher supports loaned messages
// TODO(Karsten1987): This case separation has to be done in rclcpp
// otherwise we have to ensure that every middleware implements
// `rmw_publish_loaned_message` explicitly the same way as `rmw_publish`
// by taking a copy of the ros message.
if (this->can_loan_messages()) {
// we release the ownership from the rclpp::LoanedMessage instance
// and let the middleware clean up the memory.
this->do_loaned_message_publish(loaned_msg.release());
} else {
// we don't release the ownership, let the middleware copy the ros message
// and thus the destructor of rclcpp::LoanedMessage cleans up the memory.
this->publish(loaned_msg.get());
}
}
PublishedTypeAllocator
get_published_type_allocator() const
{
return published_type_allocator_;
}
ROSMessageTypeAllocator
get_ros_message_type_allocator() const
{
return ros_message_type_allocator_;
}
protected:
void
do_inter_process_publish(const ROSMessageType & msg)
{
TRACETOOLS_TRACEPOINT(rclcpp_publish, nullptr, static_cast<const void *>(&msg));
auto status = rcl_publish(publisher_handle_.get(), &msg, nullptr);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
}
}
void
do_serialized_publish(const rcl_serialized_message_t * serialized_msg)
{
if (intra_process_is_enabled_) {
// TODO(Karsten1987): support serialized message passed by intraprocess
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
}
auto status = rcl_publish_serialized_message(publisher_handle_.get(), serialized_msg, nullptr);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
}
}
void
do_loaned_message_publish(
std::unique_ptr<ROSMessageType, std::function<void(ROSMessageType *)>> msg)
{
TRACETOOLS_TRACEPOINT(rclcpp_publish, nullptr, static_cast<const void *>(msg.get()));
auto status = rcl_publish_loaned_message(publisher_handle_.get(), msg.get(), nullptr);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
}
}
void
do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}
TRACETOOLS_TRACEPOINT(
rclcpp_intra_publish,
static_cast<const void *>(publisher_handle_.get()),
msg.get());
ipm->template do_intra_process_publish<PublishedType, ROSMessageType, AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);
}
void
do_intra_process_ros_message_publish(std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}
TRACETOOLS_TRACEPOINT(
rclcpp_intra_publish,
static_cast<const void *>(publisher_handle_.get()),
msg.get());
ipm->template do_intra_process_publish<ROSMessageType, ROSMessageType, AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
ros_message_type_allocator_);
}
std::shared_ptr<const ROSMessageType>
do_intra_process_ros_message_publish_and_return_shared(
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}
TRACETOOLS_TRACEPOINT(
rclcpp_intra_publish,
static_cast<const void *>(publisher_handle_.get()),
msg.get());
return ipm->template do_intra_process_publish_and_return_shared<ROSMessageType, ROSMessageType,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
ros_message_type_allocator_);
}
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
create_ros_message_unique_ptr()
{
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr);
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg)
{
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr, msg);
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}
std::unique_ptr<PublishedType, PublishedTypeDeleter>
duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg)
{
auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1);
PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg);
return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_);
}
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;
PublishedTypeAllocator published_type_allocator_;
PublishedTypeDeleter published_type_deleter_;
ROSMessageTypeAllocator ros_message_type_allocator_;
ROSMessageTypeDeleter ros_message_type_deleter_;
BufferSharedPtr buffer_{nullptr};
};
} // namespace rclcpp
#endif // RCLCPP__PUBLISHER_HPP_