Program Listing for File publisher.hpp

Return to documentation for file (include/rclcpp/publisher.hpp)

// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__PUBLISHER_HPP_
#define RCLCPP__PUBLISHER_HPP_

#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>

#include "rcl/error_handling.h"
#include "rcl/publisher.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rosidl_runtime_cpp/traits.hpp"

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/get_message_type_support_handle.hpp"
#include "rclcpp/is_ros_compatible_type.hpp"
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/type_adapter.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

#include "tracetools/tracetools.h"

namespace rclcpp
{

template<typename MessageT, typename AllocatorT>
class LoanedMessage;


template<typename MessageT, typename AllocatorT = std::allocator<void>>
class Publisher : public PublisherBase
{
public:
  static_assert(
    rclcpp::is_ros_compatible_type<MessageT>::value,
    "given message type is not compatible with ROS and cannot be used with a Publisher");

  using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
  using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;

  using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>;
  using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
  using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;

  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

  using BufferSharedPtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
    ROSMessageType,
    ROSMessageTypeAllocator,
    ROSMessageTypeDeleter
    >::SharedPtr;

  RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)


  Publisher(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const std::string & topic,
    const rclcpp::QoS & qos,
    const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
  : PublisherBase(
      node_base,
      topic,
      rclcpp::get_message_type_support_handle<MessageT>(),
      options.template to_rcl_publisher_options<MessageT>(qos),
      // NOTE(methylDragon): Passing these args separately is necessary for event binding
      options.event_callbacks,
      options.use_default_callbacks),
    options_(options),
    published_type_allocator_(*options.get_allocator()),
    ros_message_type_allocator_(*options.get_allocator())
  {
    allocator::set_allocator_for_deleter(&published_type_deleter_, &published_type_allocator_);
    allocator::set_allocator_for_deleter(&ros_message_type_deleter_, &ros_message_type_allocator_);
    // Setup continues in the post construction method, post_init_setup().
  }

  virtual
  void
  post_init_setup(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const std::string & topic,
    const rclcpp::QoS & qos,
    const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
  {
    (void)qos;
    (void)options;

    // If needed, setup intra process communication.
    if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
      auto context = node_base->get_context();
      // Get the intra process manager instance for this context.
      auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
      // Check if the QoS is compatible with intra-process.
      auto qos_profile = get_actual_qos();
      if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
        throw std::invalid_argument(
                "intraprocess communication on topic '" + topic +
                "' allowed only with keep last history qos policy");
      }
      if (qos_profile.depth() == 0) {
        throw std::invalid_argument(
                "intraprocess communication on topic '" + topic +
                "' is not allowed with a zero qos history depth value");
      }
      if (qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
        buffer_ = rclcpp::experimental::create_intra_process_buffer<
          ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>(
          rclcpp::detail::resolve_intra_process_buffer_type(options_.intra_process_buffer_type),
          qos_profile,
          std::make_shared<ROSMessageTypeAllocator>(ros_message_type_allocator_));
      }
      // Register the publisher with the intra process manager.
      uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), buffer_);
      this->setup_intra_process(
        intra_process_publisher_id,
        ipm);
    }
  }

  virtual ~Publisher()
  {}


  rclcpp::LoanedMessage<ROSMessageType, AllocatorT>
  borrow_loaned_message()
  {
    return rclcpp::LoanedMessage<ROSMessageType, AllocatorT>(
      *this,
      this->get_ros_message_type_allocator());
  }


  template<typename T>
  typename std::enable_if_t<
    rosidl_generator_traits::is_message<T>::value &&
    std::is_same<T, ROSMessageType>::value
  >
  publish(std::unique_ptr<T, ROSMessageTypeDeleter> msg)
  {
    if (!intra_process_is_enabled_) {
      this->do_inter_process_publish(*msg);
      return;
    }
    // If an interprocess subscription exist, then the unique_ptr is promoted
    // to a shared_ptr and published.
    // This allows doing the intraprocess publish first and then doing the
    // interprocess publish, resulting in lower publish-to-subscribe latency.
    // It's not possible to do that with an unique_ptr,
    // as do_intra_process_publish takes the ownership of the message.

    // When durability is set to TransientLocal (i.e. there is a buffer),
    // inter process publish should always take place to ensure
    // late joiners receive past data.
    bool inter_process_publish_needed =
      get_subscription_count() > get_intra_process_subscription_count() || buffer_;

    if (inter_process_publish_needed) {
      auto shared_msg =
        this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg));
      if (buffer_) {
        buffer_->add_shared(shared_msg);
      }
      this->do_inter_process_publish(*shared_msg);
    } else {
      if (buffer_) {
        auto shared_msg =
          this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg));
        buffer_->add_shared(shared_msg);
      } else {
        this->do_intra_process_ros_message_publish(std::move(msg));
      }
    }
  }


  template<typename T>
  typename std::enable_if_t<
    rosidl_generator_traits::is_message<T>::value &&
    std::is_same<T, ROSMessageType>::value
  >
  publish(const T & msg)
  {
    // Avoid allocating when not using intra process.
    if (!intra_process_is_enabled_) {
      this->do_inter_process_publish(msg);
      return;
    }
    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
    // As the message is not const, a copy should be made.
    // A shared_ptr<const MessageT> could also be constructed here.
    auto unique_msg = this->duplicate_ros_message_as_unique_ptr(msg);
    this->publish(std::move(unique_msg));
  }


  template<typename T>
  typename std::enable_if_t<
    rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
    std::is_same<T, PublishedType>::value
  >
  publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
  {
    if (!intra_process_is_enabled_) {
      // In this case we're not using intra process.
      auto ros_msg_ptr = std::make_unique<ROSMessageType>();
      rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
      this->do_inter_process_publish(*ros_msg_ptr);
      return;
    }

    // When durability is set to TransientLocal (i.e. there is a buffer),
    // inter process publish should always take place to ensure
    // late joiners receive past data.
    bool inter_process_publish_needed =
      get_subscription_count() > get_intra_process_subscription_count() || buffer_;

    if (inter_process_publish_needed) {
      auto ros_msg_ptr = std::make_shared<ROSMessageType>();
      rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
      this->do_intra_process_publish(std::move(msg));
      this->do_inter_process_publish(*ros_msg_ptr);
      if (buffer_) {
        buffer_->add_shared(ros_msg_ptr);
      }
    } else {
      if (buffer_) {
        auto ros_msg_ptr = std::make_shared<ROSMessageType>();
        rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
        buffer_->add_shared(ros_msg_ptr);
      }
      this->do_intra_process_publish(std::move(msg));
    }
  }


  template<typename T>
  typename std::enable_if_t<
    rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
    std::is_same<T, PublishedType>::value
  >
  publish(const T & msg)
  {
    if (!intra_process_is_enabled_) {
      // Convert to the ROS message equivalent and publish it.
      auto ros_msg_ptr = std::make_unique<ROSMessageType>();
      rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ros_msg_ptr);
      this->do_inter_process_publish(*ros_msg_ptr);
      return;
    }

    // Otherwise we have to allocate memory in a unique_ptr and pass it along.
    // As the message is not const, a copy should be made.
    // A shared_ptr<const MessageT> could also be constructed here.
    auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg);
    this->publish(std::move(unique_msg));
  }

  void
  publish(const rcl_serialized_message_t & serialized_msg)
  {
    return this->do_serialized_publish(&serialized_msg);
  }

  void
  publish(const SerializedMessage & serialized_msg)
  {
    return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message());
  }


  void
  publish(rclcpp::LoanedMessage<ROSMessageType, AllocatorT> && loaned_msg)
  {
    if (!loaned_msg.is_valid()) {
      throw std::runtime_error("loaned message is not valid");
    }

    // verify that publisher supports loaned messages
    // TODO(Karsten1987): This case separation has to be done in rclcpp
    // otherwise we have to ensure that every middleware implements
    // `rmw_publish_loaned_message` explicitly the same way as `rmw_publish`
    // by taking a copy of the ros message.
    if (this->can_loan_messages()) {
      // we release the ownership from the rclpp::LoanedMessage instance
      // and let the middleware clean up the memory.
      this->do_loaned_message_publish(loaned_msg.release());
    } else {
      // we don't release the ownership, let the middleware copy the ros message
      // and thus the destructor of rclcpp::LoanedMessage cleans up the memory.
      this->publish(loaned_msg.get());
    }
  }

  PublishedTypeAllocator
  get_published_type_allocator() const
  {
    return published_type_allocator_;
  }

  ROSMessageTypeAllocator
  get_ros_message_type_allocator() const
  {
    return ros_message_type_allocator_;
  }

protected:
  void
  do_inter_process_publish(const ROSMessageType & msg)
  {
    TRACETOOLS_TRACEPOINT(rclcpp_publish, nullptr, static_cast<const void *>(&msg));
    auto status = rcl_publish(publisher_handle_.get(), &msg, nullptr);

    if (RCL_RET_PUBLISHER_INVALID == status) {
      rcl_reset_error();  // next call will reset error message if not context
      if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
        rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
        if (nullptr != context && !rcl_context_is_valid(context)) {
          // publisher is invalid due to context being shutdown
          return;
        }
      }
    }
    if (RCL_RET_OK != status) {
      rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
    }
  }

  void
  do_serialized_publish(const rcl_serialized_message_t * serialized_msg)
  {
    if (intra_process_is_enabled_) {
      // TODO(Karsten1987): support serialized message passed by intraprocess
      throw std::runtime_error("storing serialized messages in intra process is not supported yet");
    }
    auto status = rcl_publish_serialized_message(publisher_handle_.get(), serialized_msg, nullptr);
    if (RCL_RET_OK != status) {
      rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
    }
  }

  void
  do_loaned_message_publish(
    std::unique_ptr<ROSMessageType, std::function<void(ROSMessageType *)>> msg)
  {
    TRACETOOLS_TRACEPOINT(rclcpp_publish, nullptr, static_cast<const void *>(msg.get()));
    auto status = rcl_publish_loaned_message(publisher_handle_.get(), msg.get(), nullptr);

    if (RCL_RET_PUBLISHER_INVALID == status) {
      rcl_reset_error();  // next call will reset error message if not context
      if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
        rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
        if (nullptr != context && !rcl_context_is_valid(context)) {
          // publisher is invalid due to context being shutdown
          return;
        }
      }
    }
    if (RCL_RET_OK != status) {
      rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
    }
  }

  void
  do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
  {
    auto ipm = weak_ipm_.lock();
    if (!ipm) {
      throw std::runtime_error(
              "intra process publish called after destruction of intra process manager");
    }
    if (!msg) {
      throw std::runtime_error("cannot publish msg which is a null pointer");
    }
    TRACETOOLS_TRACEPOINT(
      rclcpp_intra_publish,
      static_cast<const void *>(publisher_handle_.get()),
      msg.get());

    ipm->template do_intra_process_publish<PublishedType, ROSMessageType, AllocatorT>(
      intra_process_publisher_id_,
      std::move(msg),
      published_type_allocator_);
  }

  void
  do_intra_process_ros_message_publish(std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
  {
    auto ipm = weak_ipm_.lock();
    if (!ipm) {
      throw std::runtime_error(
              "intra process publish called after destruction of intra process manager");
    }
    if (!msg) {
      throw std::runtime_error("cannot publish msg which is a null pointer");
    }
    TRACETOOLS_TRACEPOINT(
      rclcpp_intra_publish,
      static_cast<const void *>(publisher_handle_.get()),
      msg.get());

    ipm->template do_intra_process_publish<ROSMessageType, ROSMessageType, AllocatorT>(
      intra_process_publisher_id_,
      std::move(msg),
      ros_message_type_allocator_);
  }

  std::shared_ptr<const ROSMessageType>
  do_intra_process_ros_message_publish_and_return_shared(
    std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
  {
    auto ipm = weak_ipm_.lock();
    if (!ipm) {
      throw std::runtime_error(
              "intra process publish called after destruction of intra process manager");
    }
    if (!msg) {
      throw std::runtime_error("cannot publish msg which is a null pointer");
    }
    TRACETOOLS_TRACEPOINT(
      rclcpp_intra_publish,
      static_cast<const void *>(publisher_handle_.get()),
      msg.get());

    return ipm->template do_intra_process_publish_and_return_shared<ROSMessageType, ROSMessageType,
             AllocatorT>(
      intra_process_publisher_id_,
      std::move(msg),
      ros_message_type_allocator_);
  }


  std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
  create_ros_message_unique_ptr()
  {
    auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
    ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr);
    return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
  }

  std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
  duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg)
  {
    auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
    ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr, msg);
    return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
  }

  std::unique_ptr<PublishedType, PublishedTypeDeleter>
  duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg)
  {
    auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1);
    PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg);
    return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_);
  }


  const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;

  PublishedTypeAllocator published_type_allocator_;
  PublishedTypeDeleter published_type_deleter_;
  ROSMessageTypeAllocator ros_message_type_allocator_;
  ROSMessageTypeDeleter ros_message_type_deleter_;

  BufferSharedPtr buffer_{nullptr};
};

}  // namespace rclcpp

#endif  // RCLCPP__PUBLISHER_HPP_