.. _program_listing_file_include_rclcpp_subscription_base.hpp: Program Listing for File subscription_base.hpp ============================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/subscription_base.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2019 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_ #define RCLCPP__SUBSCRIPTION_BASE_HPP_ #include #include #include #include #include #include #include #include "rcl/event_callback.h" #include "rcl/subscription.h" #include "rmw/impl/cpp/demangle.hpp" #include "rmw/rmw.h" #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_info.hpp" #include "rclcpp/network_flow_endpoint.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/qos_event.hpp" #include "rclcpp/serialized_message.hpp" #include "rclcpp/subscription_content_filter_options.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp { namespace node_interfaces { class NodeBaseInterface; } // namespace node_interfaces namespace experimental { class IntraProcessManager; } // namespace experimental class SubscriptionBase : public std::enable_shared_from_this { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase) RCLCPP_PUBLIC SubscriptionBase( rclcpp::node_interfaces::NodeBaseInterface * node_base, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, const rcl_subscription_options_t & subscription_options, bool is_serialized = false); RCLCPP_PUBLIC virtual ~SubscriptionBase(); RCLCPP_PUBLIC const char * get_topic_name() const; RCLCPP_PUBLIC std::shared_ptr get_subscription_handle(); RCLCPP_PUBLIC std::shared_ptr get_subscription_handle() const; RCLCPP_PUBLIC const std::unordered_map> & get_event_handlers() const; RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const; RCLCPP_PUBLIC bool take_type_erased(void * message_out, rclcpp::MessageInfo & message_info_out); RCLCPP_PUBLIC bool take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out); RCLCPP_PUBLIC virtual std::shared_ptr create_message() = 0; RCLCPP_PUBLIC virtual std::shared_ptr create_serialized_message() = 0; RCLCPP_PUBLIC virtual void handle_message(std::shared_ptr & message, const rclcpp::MessageInfo & message_info) = 0; RCLCPP_PUBLIC virtual void handle_serialized_message( const std::shared_ptr & serialized_message, const rclcpp::MessageInfo & message_info) = 0; RCLCPP_PUBLIC virtual void handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) = 0; RCLCPP_PUBLIC virtual void return_message(std::shared_ptr & message) = 0; RCLCPP_PUBLIC virtual void return_serialized_message(std::shared_ptr & message) = 0; RCLCPP_PUBLIC const rosidl_message_type_support_t & get_message_type_support_handle() const; RCLCPP_PUBLIC bool is_serialized() const; RCLCPP_PUBLIC size_t get_publisher_count() const; RCLCPP_PUBLIC bool can_loan_messages() const; using IntraProcessManagerWeakPtr = std::weak_ptr; RCLCPP_PUBLIC void setup_intra_process( uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm); RCLCPP_PUBLIC rclcpp::Waitable::SharedPtr get_intra_process_waitable() const; RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state); RCLCPP_PUBLIC std::vector get_network_flow_endpoints() const; void set_on_new_message_callback(std::function callback) { if (!callback) { throw std::invalid_argument( "The callback passed to set_on_new_message_callback " "is not callable."); } auto new_callback = [callback, this](size_t number_of_messages) { try { callback(number_of_messages); } catch (const std::exception & exception) { RCLCPP_ERROR_STREAM( node_logger_, "rclcpp::SubscriptionBase@" << this << " caught " << rmw::impl::cpp::demangle(exception) << " exception in user-provided callback for the 'on new message' callback: " << exception.what()); } catch (...) { RCLCPP_ERROR_STREAM( node_logger_, "rclcpp::SubscriptionBase@" << this << " caught unhandled exception in user-provided callback " << "for the 'on new message' callback"); } }; std::lock_guard lock(callback_mutex_); // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has // been replaced but the middleware hasn't been told about the new one yet. set_on_new_message_callback( rclcpp::detail::cpp_callback_trampoline, static_cast(&new_callback)); // Store the std::function to keep it in scope, also overwrites the existing one. on_new_message_callback_ = new_callback; // Set it again, now using the permanent storage. set_on_new_message_callback( rclcpp::detail::cpp_callback_trampoline, static_cast(&on_new_message_callback_)); } void clear_on_new_message_callback() { std::lock_guard lock(callback_mutex_); if (on_new_message_callback_) { set_on_new_message_callback(nullptr, nullptr); on_new_message_callback_ = nullptr; } } void set_on_new_intra_process_message_callback(std::function callback) { if (!use_intra_process_) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "Calling set_on_new_intra_process_message_callback for subscription with IPC disabled"); return; } if (!callback) { throw std::invalid_argument( "The callback passed to set_on_new_intra_process_message_callback " "is not callable."); } // The on_ready_callback signature has an extra `int` argument used to disambiguate between // possible different entities within a generic waitable. // We hide that detail to users of this method. std::function new_callback = std::bind(callback, std::placeholders::_1); subscription_intra_process_->set_on_ready_callback(new_callback); } void clear_on_new_intra_process_message_callback() { if (!use_intra_process_) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled"); return; } subscription_intra_process_->clear_on_ready_callback(); } void set_on_new_qos_event_callback( std::function callback, rcl_subscription_event_type_t event_type) { if (event_handlers_.count(event_type) == 0) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "Calling set_on_new_qos_event_callback for non registered subscription event_type"); return; } if (!callback) { throw std::invalid_argument( "The callback passed to set_on_new_qos_event_callback " "is not callable."); } // The on_ready_callback signature has an extra `int` argument used to disambiguate between // possible different entities within a generic waitable. // We hide that detail to users of this method. std::function new_callback = std::bind(callback, std::placeholders::_1); event_handlers_[event_type]->set_on_ready_callback(new_callback); } void clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type) { if (event_handlers_.count(event_type) == 0) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "Calling clear_on_new_qos_event_callback for non registered event_type"); return; } event_handlers_[event_type]->clear_on_ready_callback(); } RCLCPP_PUBLIC bool is_cft_enabled() const; RCLCPP_PUBLIC void set_content_filter( const std::string & filter_expression, const std::vector & expression_parameters = {}); RCLCPP_PUBLIC rclcpp::ContentFilterOptions get_content_filter() const; protected: template void add_event_handler( const EventCallbackT & callback, const rcl_subscription_event_type_t event_type) { auto handler = std::make_shared>>( callback, rcl_subscription_event_init, get_subscription_handle(), event_type); qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(), false)); event_handlers_.insert(std::make_pair(event_type, handler)); } RCLCPP_PUBLIC void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info) const; RCLCPP_PUBLIC bool matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const; RCLCPP_PUBLIC void set_on_new_message_callback(rcl_event_callback_t callback, const void * user_data); rclcpp::node_interfaces::NodeBaseInterface * const node_base_; std::shared_ptr node_handle_; std::shared_ptr subscription_handle_; std::shared_ptr intra_process_subscription_handle_; rclcpp::Logger node_logger_; std::unordered_map> event_handlers_; bool use_intra_process_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_subscription_id_; std::shared_ptr subscription_intra_process_; private: RCLCPP_DISABLE_COPY(SubscriptionBase) rosidl_message_type_support_t type_support_; bool is_serialized_; std::atomic subscription_in_use_by_wait_set_{false}; std::atomic intra_process_subscription_waitable_in_use_by_wait_set_{false}; std::unordered_map> qos_events_in_use_by_wait_set_; std::recursive_mutex callback_mutex_; std::function on_new_message_callback_{nullptr}; }; } // namespace rclcpp #endif // RCLCPP__SUBSCRIPTION_BASE_HPP_