.. _program_listing_file_include_rclcpp_subscription.hpp: Program Listing for File subscription.hpp ========================================= |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/subscription.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2014 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__SUBSCRIPTION_HPP_ #define RCLCPP__SUBSCRIPTION_HPP_ #include #include #include #include #include #include #include #include #include #include "rcl/error_handling.h" #include "rcl/subscription.h" #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_info.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/waitable.hpp" #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "tracetools/tracetools.h" namespace rclcpp { namespace node_interfaces { class NodeTopicsInterface; } // namespace node_interfaces template< typename MessageT, typename AllocatorT = std::allocator, typename SubscribedT = typename rclcpp::TypeAdapter::custom_type, typename ROSMessageT = typename rclcpp::TypeAdapter::ros_message_type, typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< ROSMessageT, AllocatorT >> class Subscription : public SubscriptionBase { friend class rclcpp::node_interfaces::NodeTopicsInterface; public: // Redeclare these here to use outside of the class. using SubscribedType = SubscribedT; using ROSMessageType = ROSMessageT; using MessageMemoryStrategyType = MessageMemoryStrategyT; using SubscribedTypeAllocatorTraits = allocator::AllocRebind; using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; using SubscribedTypeDeleter = allocator::Deleter; using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; using ROSMessageTypeDeleter = allocator::Deleter; using MessageAllocatorTraits [[deprecated("use ROSMessageTypeAllocatorTraits")]] = ROSMessageTypeAllocatorTraits; using MessageAllocator [[deprecated("use ROSMessageTypeAllocator")]] = ROSMessageTypeAllocator; using MessageDeleter [[deprecated("use ROSMessageTypeDeleter")]] = ROSMessageTypeDeleter; using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr; using MessageUniquePtr [[deprecated("use std::unique_ptr instead")]] = std::unique_ptr; private: using SubscriptionTopicStatisticsSharedPtr = std::shared_ptr; public: RCLCPP_SMART_PTR_DEFINITIONS(Subscription) // *INDENT-OFF* Subscription( rclcpp::node_interfaces::NodeBaseInterface * node_base, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, const rclcpp::QoS & qos, AnySubscriptionCallback callback, const rclcpp::SubscriptionOptionsWithAllocator & options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) : SubscriptionBase( node_base, type_support_handle, topic_name, options.to_rcl_subscription_options(qos), // NOTE(methylDragon): Passing these args separately is necessary for event binding options.event_callbacks, options.use_default_callbacks, callback.is_serialized_message_callback() ? DeliveredMessageKind::SERIALIZED_MESSAGE : DeliveredMessageKind::ROS_MESSAGE), // NOLINT any_callback_(callback), options_(options), message_memory_strategy_(message_memory_strategy) // *INDENT-ON* { // Setup intra process publishing if requested. if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { using rclcpp::detail::resolve_intra_process_buffer_type; // Check if the QoS is compatible with intra-process. auto qos_profile = get_actual_qos(); if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { throw std::invalid_argument( "intraprocess communication allowed only with keep last history qos policy"); } if (qos_profile.depth() == 0) { throw std::invalid_argument( "intraprocess communication is not allowed with 0 depth qos policy"); } using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, SubscribedType, SubscribedTypeAllocator, SubscribedTypeDeleter, ROSMessageT, AllocatorT>; // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto context = node_base->get_context(); subscription_intra_process_ = std::make_shared( callback, options_.get_allocator(), context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), static_cast(subscription_intra_process_.get())); // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; auto ipm = context->get_sub_context(); uint64_t intra_process_subscription_id = ipm->template add_subscription< ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_); this->setup_intra_process(intra_process_subscription_id, ipm); } if (subscription_topic_statistics != nullptr) { this->subscription_topic_statistics_ = std::move(subscription_topic_statistics); } TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), static_cast(this)); TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, static_cast(this), static_cast(&any_callback_)); // The callback object gets copied, so if registration is done too early/before this point // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later // in subsequent tracepoints. #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif } void post_init_setup( rclcpp::node_interfaces::NodeBaseInterface * node_base, const rclcpp::QoS & qos, const rclcpp::SubscriptionOptionsWithAllocator & options) { (void)node_base; (void)qos; (void)options; } bool take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out) { return this->take_type_erased(static_cast(&message_out), message_info_out); } template std::enable_if_t< !rosidl_generator_traits::is_message::value && std::is_same_v, bool > take(TakeT & message_out, rclcpp::MessageInfo & message_info_out) { ROSMessageType local_message; bool taken = this->take_type_erased(static_cast(&local_message), message_info_out); if (taken) { rclcpp::TypeAdapter::convert_to_custom(local_message, message_out); } return taken; } std::shared_ptr create_message() override { /* The default message memory strategy provides a dynamically allocated message on each call to * create_message, though alternative memory strategies that re-use a preallocated message may be * used (see rclcpp/strategies/message_pool_memory_strategy.hpp). */ return message_memory_strategy_->borrow_message(); } std::shared_ptr create_serialized_message() override { return message_memory_strategy_->borrow_serialized_message(); } void handle_message( std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override { if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { // In this case, the message will be delivered via intra process and // we should ignore this copy of the message. return; } auto typed_message = std::static_pointer_cast(message); std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to // exclude callback duration from topic statistics result. now = std::chrono::system_clock::now(); } any_callback_.dispatch(typed_message, message_info); if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); } } void handle_serialized_message( const std::shared_ptr & serialized_message, const rclcpp::MessageInfo & message_info) override { std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to // exclude callback duration from topic statistics result. now = std::chrono::system_clock::now(); } any_callback_.dispatch(serialized_message, message_info); if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); } } void handle_loaned_message( void * loaned_message, const rclcpp::MessageInfo & message_info) override { if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { // In this case, the message will be delivered via intra process and // we should ignore this copy of the message. return; } auto typed_message = static_cast(loaned_message); // message is loaned, so we have to make sure that the deleter does not deallocate the message auto sptr = std::shared_ptr( typed_message, [](ROSMessageType * msg) {(void) msg;}); std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to // exclude callback duration from topic statistics result. now = std::chrono::system_clock::now(); } any_callback_.dispatch(sptr, message_info); if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); } } void return_message(std::shared_ptr & message) override { auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); } void return_serialized_message(std::shared_ptr & message) override { message_memory_strategy_->return_serialized_message(message); } bool use_take_shared_method() const { return any_callback_.use_take_shared_method(); } // DYNAMIC TYPE ================================================================================== // TODO(methylDragon): Reorder later // TODO(methylDragon): Implement later... rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr get_shared_dynamic_message_type() override { throw rclcpp::exceptions::UnimplementedError( "get_shared_dynamic_message_type is not implemented for Subscription"); } rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr get_shared_dynamic_message() override { throw rclcpp::exceptions::UnimplementedError( "get_shared_dynamic_message is not implemented for Subscription"); } rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr get_shared_dynamic_serialization_support() override { throw rclcpp::exceptions::UnimplementedError( "get_shared_dynamic_serialization_support is not implemented for Subscription"); } rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override { throw rclcpp::exceptions::UnimplementedError( "create_dynamic_message is not implemented for Subscription"); } void return_dynamic_message( rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) override { (void) message; throw rclcpp::exceptions::UnimplementedError( "return_dynamic_message is not implemented for Subscription"); } void handle_dynamic_message( const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message, const rclcpp::MessageInfo & message_info) override { (void) message; (void) message_info; throw rclcpp::exceptions::UnimplementedError( "handle_dynamic_message is not implemented for Subscription"); } private: RCLCPP_DISABLE_COPY(Subscription) AnySubscriptionCallback any_callback_; const rclcpp::SubscriptionOptionsWithAllocator options_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr}; }; } // namespace rclcpp #endif // RCLCPP__SUBSCRIPTION_HPP_