.. _program_listing_file_include_rclcpp_subscription.hpp: Program Listing for File subscription.hpp ========================================= |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/subscription.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2014 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__SUBSCRIPTION_HPP_ #define RCLCPP__SUBSCRIPTION_HPP_ #include #include #include #include #include #include #include #include #include #include "rcl/error_handling.h" #include "rcl/subscription.h" #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_info.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/waitable.hpp" #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "tracetools/tracetools.h" namespace rclcpp { namespace node_interfaces { class NodeTopicsInterface; } // namespace node_interfaces template< typename MessageT, typename AllocatorT = std::allocator, typename SubscribedT = typename rclcpp::TypeAdapter::custom_type, typename ROSMessageT = typename rclcpp::TypeAdapter::ros_message_type, typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< ROSMessageT, AllocatorT >> class Subscription : public SubscriptionBase { friend class rclcpp::node_interfaces::NodeTopicsInterface; public: // Redeclare these here to use outside of the class. using SubscribedType = SubscribedT; using ROSMessageType = ROSMessageT; using MessageMemoryStrategyType = MessageMemoryStrategyT; using SubscribedTypeAllocatorTraits = allocator::AllocRebind; using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; using SubscribedTypeDeleter = allocator::Deleter; using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; using ROSMessageTypeDeleter = allocator::Deleter; using MessageAllocatorTraits [[deprecated("use ROSMessageTypeAllocatorTraits")]] = ROSMessageTypeAllocatorTraits; using MessageAllocator [[deprecated("use ROSMessageTypeAllocator")]] = ROSMessageTypeAllocator; using MessageDeleter [[deprecated("use ROSMessageTypeDeleter")]] = ROSMessageTypeDeleter; using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr; using MessageUniquePtr [[deprecated("use std::unique_ptr instead")]] = std::unique_ptr; private: using SubscriptionTopicStatisticsSharedPtr = std::shared_ptr>; public: RCLCPP_SMART_PTR_DEFINITIONS(Subscription) Subscription( rclcpp::node_interfaces::NodeBaseInterface * node_base, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, const rclcpp::QoS & qos, AnySubscriptionCallback callback, const rclcpp::SubscriptionOptionsWithAllocator & options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) : SubscriptionBase( node_base, type_support_handle, topic_name, options.template to_rcl_subscription_options(qos), callback.is_serialized_message_callback()), any_callback_(callback), options_(options), message_memory_strategy_(message_memory_strategy) { if (options_.event_callbacks.deadline_callback) { this->add_event_handler( options_.event_callbacks.deadline_callback, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED); } if (options_.event_callbacks.liveliness_callback) { this->add_event_handler( options_.event_callbacks.liveliness_callback, RCL_SUBSCRIPTION_LIVELINESS_CHANGED); } if (options_.event_callbacks.incompatible_qos_callback) { this->add_event_handler( options_.event_callbacks.incompatible_qos_callback, RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS); } else if (options_.use_default_callbacks) { // Register default callback when not specified try { this->add_event_handler( [this](QOSRequestedIncompatibleQoSInfo & info) { this->default_incompatible_qos_callback(info); }, RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS); } catch (UnsupportedEventTypeException & /*exc*/) { // pass } } if (options_.event_callbacks.message_lost_callback) { this->add_event_handler( options_.event_callbacks.message_lost_callback, RCL_SUBSCRIPTION_MESSAGE_LOST); } // Setup intra process publishing if requested. if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { using rclcpp::detail::resolve_intra_process_buffer_type; // Check if the QoS is compatible with intra-process. auto qos_profile = get_actual_qos(); if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { throw std::invalid_argument( "intraprocess communication allowed only with keep last history qos policy"); } if (qos_profile.depth() == 0) { throw std::invalid_argument( "intraprocess communication is not allowed with 0 depth qos policy"); } if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { throw std::invalid_argument( "intraprocess communication allowed only with volatile durability"); } using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, SubscribedType, SubscribedTypeAllocator, SubscribedTypeDeleter, ROSMessageT, AllocatorT>; // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto context = node_base->get_context(); subscription_intra_process_ = std::make_shared( callback, options_.get_allocator(), context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), static_cast(subscription_intra_process_.get())); // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; auto ipm = context->get_sub_context(); uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_); this->setup_intra_process(intra_process_subscription_id, ipm); } if (subscription_topic_statistics != nullptr) { this->subscription_topic_statistics_ = std::move(subscription_topic_statistics); } TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), static_cast(this)); TRACEPOINT( rclcpp_subscription_callback_added, static_cast(this), static_cast(&any_callback_)); // The callback object gets copied, so if registration is done too early/before this point // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later // in subsequent tracepoints. #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif } void post_init_setup( rclcpp::node_interfaces::NodeBaseInterface * node_base, const rclcpp::QoS & qos, const rclcpp::SubscriptionOptionsWithAllocator & options) { (void)node_base; (void)qos; (void)options; } bool take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out) { return this->take_type_erased(static_cast(&message_out), message_info_out); } template std::enable_if_t< !rosidl_generator_traits::is_message::value && std::is_same_v, bool > take(TakeT & message_out, rclcpp::MessageInfo & message_info_out) { ROSMessageType local_message; bool taken = this->take_type_erased(static_cast(&local_message), message_info_out); if (taken) { rclcpp::TypeAdapter::convert_to_custom(local_message, message_out); } return taken; } std::shared_ptr create_message() override { /* The default message memory strategy provides a dynamically allocated message on each call to * create_message, though alternative memory strategies that re-use a preallocated message may be * used (see rclcpp/strategies/message_pool_memory_strategy.hpp). */ return message_memory_strategy_->borrow_message(); } std::shared_ptr create_serialized_message() override { return message_memory_strategy_->borrow_serialized_message(); } void handle_message( std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override { if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { // In this case, the message will be delivered via intra process and // we should ignore this copy of the message. return; } auto typed_message = std::static_pointer_cast(message); std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to // exclude callback duration from topic statistics result. now = std::chrono::system_clock::now(); } any_callback_.dispatch(typed_message, message_info); if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(*typed_message, time); } } void handle_serialized_message( const std::shared_ptr & serialized_message, const rclcpp::MessageInfo & message_info) override { // TODO(wjwwood): enable topic statistics for serialized messages any_callback_.dispatch(serialized_message, message_info); } void handle_loaned_message( void * loaned_message, const rclcpp::MessageInfo & message_info) override { if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { // In this case, the message will be delivered via intra process and // we should ignore this copy of the message. return; } auto typed_message = static_cast(loaned_message); // message is loaned, so we have to make sure that the deleter does not deallocate the message auto sptr = std::shared_ptr( typed_message, [](ROSMessageType * msg) {(void) msg;}); std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to // exclude callback duration from topic statistics result. now = std::chrono::system_clock::now(); } any_callback_.dispatch(sptr, message_info); if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(*typed_message, time); } } void return_message(std::shared_ptr & message) override { auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); } void return_serialized_message(std::shared_ptr & message) override { message_memory_strategy_->return_serialized_message(message); } bool use_take_shared_method() const { return any_callback_.use_take_shared_method(); } private: RCLCPP_DISABLE_COPY(Subscription) AnySubscriptionCallback any_callback_; const rclcpp::SubscriptionOptionsWithAllocator options_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr}; }; } // namespace rclcpp #endif // RCLCPP__SUBSCRIPTION_HPP_