.. _program_listing_file_include_rclcpp_experimental_intra_process_manager.hpp: Program Listing for File intra_process_manager.hpp ================================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/experimental/intra_process_manager.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2019 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_ #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_ #include #include #include #include #include #include #include #include #include #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" #include "rclcpp/logger.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" #include "rclcpp/type_adapter.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp { namespace experimental { class IntraProcessManager { private: RCLCPP_DISABLE_COPY(IntraProcessManager) public: RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager) RCLCPP_PUBLIC IntraProcessManager(); RCLCPP_PUBLIC virtual ~IntraProcessManager(); template< typename ROSMessageType, typename Alloc = std::allocator > uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) { std::unique_lock lock(mutex_); uint64_t sub_id = IntraProcessManager::get_next_unique_id(); subscriptions_[sub_id] = subscription; // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { auto publisher = pair.second.lock(); if (!publisher) { continue; } if (can_communicate(publisher, subscription)) { uint64_t pub_id = pair.first; insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); if (publisher->is_durability_transient_local() && subscription->is_durability_transient_local()) { do_transient_local_publish( pub_id, sub_id, subscription->use_take_shared_method()); } } } return sub_id; } RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id); RCLCPP_PUBLIC uint64_t add_publisher( rclcpp::PublisherBase::SharedPtr publisher, rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer = rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr()); RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id); template< typename MessageT, typename ROSMessageType, typename Alloc, typename Deleter = std::default_delete > void do_intra_process_publish( uint64_t intra_process_publisher_id, std::unique_ptr message, typename allocator::AllocRebind::allocator_type & allocator) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; std::shared_lock lock(mutex_); auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); if (publisher_it == pub_to_subs_.end()) { // Publisher is either invalid or no longer exists. RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "Calling do_intra_process_publish for invalid or no longer existing publisher id"); return; } const auto & sub_ids = publisher_it->second; if (sub_ids.take_ownership_subscriptions.empty()) { // None of the buffers require ownership, so we promote the pointer std::shared_ptr msg = std::move(message); this->template add_shared_msg_to_buffers( msg, sub_ids.take_shared_subscriptions); } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT sub_ids.take_shared_subscriptions.size() <= 1) { // There is at maximum 1 buffer that does not require ownership. // So this case is equivalent to all the buffers requiring ownership // Merge the two vector of ids into a unique one std::vector concatenated_vector( sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end()); concatenated_vector.insert( concatenated_vector.end(), sub_ids.take_ownership_subscriptions.begin(), sub_ids.take_ownership_subscriptions.end()); this->template add_owned_msg_to_buffers( std::move(message), concatenated_vector, allocator); } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT sub_ids.take_shared_subscriptions.size() > 1) { // Construct a new shared pointer from the message // for the buffers that do not require ownership auto shared_msg = std::allocate_shared(allocator, *message); this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); } } template< typename MessageT, typename ROSMessageType, typename Alloc, typename Deleter = std::default_delete > std::shared_ptr do_intra_process_publish_and_return_shared( uint64_t intra_process_publisher_id, std::unique_ptr message, typename allocator::AllocRebind::allocator_type & allocator) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; std::shared_lock lock(mutex_); auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); if (publisher_it == pub_to_subs_.end()) { // Publisher is either invalid or no longer exists. RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "Calling do_intra_process_publish for invalid or no longer existing publisher id"); return nullptr; } const auto & sub_ids = publisher_it->second; if (sub_ids.take_ownership_subscriptions.empty()) { // If there are no owning, just convert to shared. std::shared_ptr shared_msg = std::move(message); if (!sub_ids.take_shared_subscriptions.empty()) { this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } return shared_msg; } else { // Construct a new shared pointer from the message for the buffers that // do not require ownership and to return. auto shared_msg = std::allocate_shared(allocator, *message); if (!sub_ids.take_shared_subscriptions.empty()) { this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } if (!sub_ids.take_ownership_subscriptions.empty()) { this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); } return shared_msg; } } template< typename MessageT, typename Alloc, typename Deleter, typename ROSMessageType> void add_shared_msg_to_buffer( std::shared_ptr message, uint64_t subscription_id) { add_shared_msg_to_buffers(message, {subscription_id}); } template< typename MessageT, typename Alloc, typename Deleter, typename ROSMessageType> void add_owned_msg_to_buffer( std::unique_ptr message, uint64_t subscription_id, typename allocator::AllocRebind::allocator_type & allocator) { add_owned_msg_to_buffers( std::move(message), {subscription_id}, allocator); } RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t * id) const; RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const; RCLCPP_PUBLIC rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id); RCLCPP_PUBLIC size_t lowest_available_capacity(const uint64_t intra_process_publisher_id) const; private: struct SplittedSubscriptions { std::vector take_shared_subscriptions; std::vector take_ownership_subscriptions; }; using SubscriptionMap = std::unordered_map; using PublisherMap = std::unordered_map; using PublisherBufferMap = std::unordered_map; using PublisherToSubscriptionIdsMap = std::unordered_map; RCLCPP_PUBLIC static uint64_t get_next_unique_id(); RCLCPP_PUBLIC void insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); RCLCPP_PUBLIC bool can_communicate( rclcpp::PublisherBase::SharedPtr pub, rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const; template< typename ROSMessageType, typename Alloc = std::allocator > void do_transient_local_publish( const uint64_t pub_id, const uint64_t sub_id, const bool use_take_shared_method) { using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; using ROSMessageTypeDeleter = allocator::Deleter; auto publisher_buffer = publisher_buffers_[pub_id].lock(); if (!publisher_buffer) { throw std::runtime_error("publisher buffer has unexpectedly gone out of scope"); } auto buffer = std::dynamic_pointer_cast< rclcpp::experimental::buffers::IntraProcessBuffer< ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter > >(publisher_buffer); if (!buffer) { throw std::runtime_error( "failed to dynamic cast publisher's IntraProcessBufferBase to " "IntraProcessBuffer which can happen when the publisher and " "subscription use different allocator types, which is not supported"); } if (use_take_shared_method) { auto data_vec = buffer->get_all_data_shared(); for (auto shared_data : data_vec) { this->template add_shared_msg_to_buffer< ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( shared_data, sub_id); } } else { auto data_vec = buffer->get_all_data_unique(); for (auto & owned_data : data_vec) { auto allocator = ROSMessageTypeAllocator(); this->template add_owned_msg_to_buffer< ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( std::move(owned_data), sub_id, allocator); } } } template< typename MessageT, typename Alloc, typename Deleter, typename ROSMessageType> void add_shared_msg_to_buffers( std::shared_ptr message, std::vector subscription_ids) { using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; using ROSMessageTypeDeleter = allocator::Deleter; using PublishedType = typename rclcpp::TypeAdapter::custom_type; using PublishedTypeAllocatorTraits = allocator::AllocRebind; using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; using PublishedTypeDeleter = allocator::Deleter; for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); if (subscription_it == subscriptions_.end()) { throw std::runtime_error("subscription has unexpectedly gone out of scope"); } auto subscription_base = subscription_it->second.lock(); if (subscription_base == nullptr) { subscriptions_.erase(id); continue; } auto subscription = std::dynamic_pointer_cast< rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (subscription != nullptr) { subscription->provide_intra_process_data(message); continue; } auto ros_message_subscription = std::dynamic_pointer_cast< rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer >(subscription_base); if (nullptr == ros_message_subscription) { throw std::runtime_error( "failed to dynamic cast SubscriptionIntraProcessBase to " "SubscriptionIntraProcessBuffer, or to " "SubscriptionROSMsgIntraProcessBuffer which can happen when the publisher and " "subscription use different allocator types, which is not supported"); } if constexpr (rclcpp::TypeAdapter::is_specialized::value) { ROSMessageType ros_msg; rclcpp::TypeAdapter::convert_to_ros_message(*message, ros_msg); ros_message_subscription->provide_intra_process_message( std::make_shared(ros_msg)); } else { if constexpr (std::is_same::value) { ros_message_subscription->provide_intra_process_message(message); } else { if constexpr (std::is_same::ros_message_type, ROSMessageType>::value) { ROSMessageType ros_msg; rclcpp::TypeAdapter::convert_to_ros_message( *message, ros_msg); ros_message_subscription->provide_intra_process_message( std::make_shared(ros_msg)); } } } } } template< typename MessageT, typename Alloc, typename Deleter, typename ROSMessageType> void add_owned_msg_to_buffers( std::unique_ptr message, std::vector subscription_ids, typename allocator::AllocRebind::allocator_type & allocator) { using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; using ROSMessageTypeDeleter = allocator::Deleter; using PublishedType = typename rclcpp::TypeAdapter::custom_type; using PublishedTypeAllocatorTraits = allocator::AllocRebind; using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; using PublishedTypeDeleter = allocator::Deleter; for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); if (subscription_it == subscriptions_.end()) { throw std::runtime_error("subscription has unexpectedly gone out of scope"); } auto subscription_base = subscription_it->second.lock(); if (subscription_base == nullptr) { subscriptions_.erase(subscription_it); continue; } auto subscription = std::dynamic_pointer_cast< rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (subscription != nullptr) { if (std::next(it) == subscription_ids.end()) { // If this is the last subscription, give up ownership subscription->provide_intra_process_data(std::move(message)); // Last message delivered, break from for loop break; } else { // Copy the message since we have additional subscriptions to serve Deleter deleter = message.get_deleter(); auto ptr = MessageAllocTraits::allocate(allocator, 1); MessageAllocTraits::construct(allocator, ptr, *message); subscription->provide_intra_process_data(MessageUniquePtr(ptr, deleter)); } continue; } auto ros_message_subscription = std::dynamic_pointer_cast< rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer >(subscription_base); if (nullptr == ros_message_subscription) { throw std::runtime_error( "failed to dynamic cast SubscriptionIntraProcessBase to " "SubscriptionIntraProcessBuffer, or to " "SubscriptionROSMsgIntraProcessBuffer which can happen when the publisher and " "subscription use different allocator types, which is not supported"); } if constexpr (rclcpp::TypeAdapter::is_specialized::value) { ROSMessageTypeAllocator ros_message_alloc(allocator); auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1); ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr); ROSMessageTypeDeleter deleter; allocator::set_allocator_for_deleter(&deleter, &allocator); rclcpp::TypeAdapter::convert_to_ros_message(*message, *ptr); auto ros_msg = std::unique_ptr(ptr, deleter); ros_message_subscription->provide_intra_process_message(std::move(ros_msg)); } else { if constexpr (std::is_same::value) { if (std::next(it) == subscription_ids.end()) { // If this is the last subscription, give up ownership ros_message_subscription->provide_intra_process_message(std::move(message)); // Last message delivered, break from for loop break; } else { // Copy the message since we have additional subscriptions to serve Deleter deleter = message.get_deleter(); allocator::set_allocator_for_deleter(&deleter, &allocator); auto ptr = MessageAllocTraits::allocate(allocator, 1); MessageAllocTraits::construct(allocator, ptr, *message); ros_message_subscription->provide_intra_process_message( MessageUniquePtr(ptr, deleter)); } } } } } PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; PublisherBufferMap publisher_buffers_; mutable std::shared_timed_mutex mutex_; }; } // namespace experimental } // namespace rclcpp #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_