Program Listing for File subscription.hpp

Return to documentation for file (include/rclcpp/subscription.hpp)

// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__SUBSCRIPTION_HPP_
#define RCLCPP__SUBSCRIPTION_HPP_

#include <rmw/error_handling.h>
#include <rmw/rmw.h>

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <utility>

#include "rcl/error_handling.h"
#include "rcl/subscription.h"

#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/experimental/subscription_intra_process.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_info.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "tracetools/tracetools.h"

namespace rclcpp
{

namespace node_interfaces
{
class NodeTopicsInterface;
}  // namespace node_interfaces

template<
  typename MessageT,
  typename AllocatorT = std::allocator<void>,
  typename SubscribedT = typename rclcpp::TypeAdapter<MessageT>::custom_type,
  typename ROSMessageT = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
  typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
    ROSMessageT,
    AllocatorT
  >>
class Subscription : public SubscriptionBase
{
  friend class rclcpp::node_interfaces::NodeTopicsInterface;

public:
  // Redeclare these here to use outside of the class.
  using SubscribedType = SubscribedT;
  using ROSMessageType = ROSMessageT;
  using MessageMemoryStrategyType = MessageMemoryStrategyT;

  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, AllocatorT>;
  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;

  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

  using MessageAllocatorTraits [[deprecated("use ROSMessageTypeAllocatorTraits")]] =
    ROSMessageTypeAllocatorTraits;
  using MessageAllocator [[deprecated("use ROSMessageTypeAllocator")]] =
    ROSMessageTypeAllocator;
  using MessageDeleter [[deprecated("use ROSMessageTypeDeleter")]] =
    ROSMessageTypeDeleter;

  using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr<const ROSMessageType>;
  using MessageUniquePtr
  [[deprecated("use std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> instead")]] =
    std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;

private:
  using SubscriptionTopicStatisticsSharedPtr =
    std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>;

public:
  RCLCPP_SMART_PTR_DEFINITIONS(Subscription)


  Subscription(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const rosidl_message_type_support_t & type_support_handle,
    const std::string & topic_name,
    const rclcpp::QoS & qos,
    AnySubscriptionCallback<MessageT, AllocatorT> callback,
    const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
    typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
    SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
  : SubscriptionBase(
      node_base,
      type_support_handle,
      topic_name,
      options.template to_rcl_subscription_options<ROSMessageType>(qos),
      callback.is_serialized_message_callback()),
    any_callback_(callback),
    options_(options),
    message_memory_strategy_(message_memory_strategy)
  {
    if (options_.event_callbacks.deadline_callback) {
      this->add_event_handler(
        options_.event_callbacks.deadline_callback,
        RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
    }
    if (options_.event_callbacks.liveliness_callback) {
      this->add_event_handler(
        options_.event_callbacks.liveliness_callback,
        RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
    }
    if (options_.event_callbacks.incompatible_qos_callback) {
      this->add_event_handler(
        options_.event_callbacks.incompatible_qos_callback,
        RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
    } else if (options_.use_default_callbacks) {
      // Register default callback when not specified
      try {
        this->add_event_handler(
          [this](QOSRequestedIncompatibleQoSInfo & info) {
            this->default_incompatible_qos_callback(info);
          },
          RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
      } catch (UnsupportedEventTypeException & /*exc*/) {
        // pass
      }
    }
    if (options_.event_callbacks.message_lost_callback) {
      this->add_event_handler(
        options_.event_callbacks.message_lost_callback,
        RCL_SUBSCRIPTION_MESSAGE_LOST);
    }

    // Setup intra process publishing if requested.
    if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
      using rclcpp::detail::resolve_intra_process_buffer_type;

      // Check if the QoS is compatible with intra-process.
      auto qos_profile = get_actual_qos();
      if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
        throw std::invalid_argument(
                "intraprocess communication on topic '" + topic_name +
                "' allowed only with keep last history qos policy");
      }
      if (qos_profile.depth() == 0) {
        throw std::invalid_argument(
                "intraprocess communication on topic '" + topic_name +
                "' is not allowed with 0 depth qos policy");
      }
      if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
        throw std::invalid_argument(
                "intraprocess communication allowed only with volatile durability");
      }

      using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
        MessageT,
        SubscribedType,
        SubscribedTypeAllocator,
        SubscribedTypeDeleter,
        ROSMessageT,
        AllocatorT>;

      // First create a SubscriptionIntraProcess which will be given to the intra-process manager.
      auto context = node_base->get_context();
      subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
        callback,
        options_.get_allocator(),
        context,
        this->get_topic_name(),  // important to get like this, as it has the fully-qualified name
        qos_profile,
        resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
      TRACEPOINT(
        rclcpp_subscription_init,
        static_cast<const void *>(get_subscription_handle().get()),
        static_cast<const void *>(subscription_intra_process_.get()));

      // Add it to the intra process manager.
      using rclcpp::experimental::IntraProcessManager;
      auto ipm = context->get_sub_context<IntraProcessManager>();
      uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_);
      this->setup_intra_process(intra_process_subscription_id, ipm);
    }

    if (subscription_topic_statistics != nullptr) {
      this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
    }

    TRACEPOINT(
      rclcpp_subscription_init,
      static_cast<const void *>(get_subscription_handle().get()),
      static_cast<const void *>(this));
    TRACEPOINT(
      rclcpp_subscription_callback_added,
      static_cast<const void *>(this),
      static_cast<const void *>(&any_callback_));
    // The callback object gets copied, so if registration is done too early/before this point
    // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
    // in subsequent tracepoints.
#ifndef TRACETOOLS_DISABLED
    any_callback_.register_callback_for_tracing();
#endif
  }

  void
  post_init_setup(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const rclcpp::QoS & qos,
    const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
  {
    (void)node_base;
    (void)qos;
    (void)options;
  }


  bool
  take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out)
  {
    return this->take_type_erased(static_cast<void *>(&message_out), message_info_out);
  }


  template<typename TakeT>
  std::enable_if_t<
    !rosidl_generator_traits::is_message<TakeT>::value &&
    std::is_same_v<TakeT, SubscribedType>,
    bool
  >
  take(TakeT & message_out, rclcpp::MessageInfo & message_info_out)
  {
    ROSMessageType local_message;
    bool taken = this->take_type_erased(static_cast<void *>(&local_message), message_info_out);
    if (taken) {
      rclcpp::TypeAdapter<MessageT>::convert_to_custom(local_message, message_out);
    }
    return taken;
  }

  std::shared_ptr<void>
  create_message() override
  {
    /* The default message memory strategy provides a dynamically allocated message on each call to
     * create_message, though alternative memory strategies that re-use a preallocated message may be
     * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
     */
    return message_memory_strategy_->borrow_message();
  }

  std::shared_ptr<rclcpp::SerializedMessage>
  create_serialized_message() override
  {
    return message_memory_strategy_->borrow_serialized_message();
  }

  void
  handle_message(
    std::shared_ptr<void> & message,
    const rclcpp::MessageInfo & message_info) override
  {
    if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
      // In this case, the message will be delivered via intra process and
      // we should ignore this copy of the message.
      return;
    }
    auto typed_message = std::static_pointer_cast<ROSMessageType>(message);

    std::chrono::time_point<std::chrono::system_clock> now;
    if (subscription_topic_statistics_) {
      // get current time before executing callback to
      // exclude callback duration from topic statistics result.
      now = std::chrono::system_clock::now();
    }

    any_callback_.dispatch(typed_message, message_info);

    if (subscription_topic_statistics_) {
      const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
      const auto time = rclcpp::Time(nanos.time_since_epoch().count());
      subscription_topic_statistics_->handle_message(*typed_message, time);
    }
  }

  void
  handle_serialized_message(
    const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
    const rclcpp::MessageInfo & message_info) override
  {
    // TODO(wjwwood): enable topic statistics for serialized messages
    any_callback_.dispatch(serialized_message, message_info);
  }

  void
  handle_loaned_message(
    void * loaned_message,
    const rclcpp::MessageInfo & message_info) override
  {
    if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
      // In this case, the message will be delivered via intra process and
      // we should ignore this copy of the message.
      return;
    }

    auto typed_message = static_cast<ROSMessageType *>(loaned_message);
    // message is loaned, so we have to make sure that the deleter does not deallocate the message
    auto sptr = std::shared_ptr<ROSMessageType>(
      typed_message, [](ROSMessageType * msg) {(void) msg;});

    std::chrono::time_point<std::chrono::system_clock> now;
    if (subscription_topic_statistics_) {
      // get current time before executing callback to
      // exclude callback duration from topic statistics result.
      now = std::chrono::system_clock::now();
    }

    any_callback_.dispatch(sptr, message_info);

    if (subscription_topic_statistics_) {
      const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
      const auto time = rclcpp::Time(nanos.time_since_epoch().count());
      subscription_topic_statistics_->handle_message(*typed_message, time);
    }
  }


  void
  return_message(std::shared_ptr<void> & message) override
  {
    auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
    message_memory_strategy_->return_message(typed_message);
  }


  void
  return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override
  {
    message_memory_strategy_->return_serialized_message(message);
  }

  bool
  use_take_shared_method() const
  {
    return any_callback_.use_take_shared_method();
  }

private:
  RCLCPP_DISABLE_COPY(Subscription)

  AnySubscriptionCallback<MessageT, AllocatorT> any_callback_;

  const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> options_;
  typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
    message_memory_strategy_;

  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
};

}  // namespace rclcpp

#endif  // RCLCPP__SUBSCRIPTION_HPP_