.. _program_listing_file_include_rclcpp_experimental_subscription_intra_process_buffer.hpp: Program Listing for File subscription_intra_process_buffer.hpp ============================================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/experimental/subscription_intra_process_buffer.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2021 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #include #include #include #include #include "rcl/error_handling.h" #include "rcl/guard_condition.h" #include "rcl/wait.h" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" namespace rclcpp { namespace experimental { template< typename SubscribedType, typename Alloc = std::allocator, typename Deleter = std::default_delete, typename ROSMessageType = SubscribedType > class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuffer::allocator_type, allocator::Deleter::allocator_type, ROSMessageType>> { public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer) using SubscribedTypeAllocatorTraits = allocator::AllocRebind; using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; using SubscribedTypeDeleter = allocator::Deleter; using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; using ROSMessageTypeDeleter = allocator::Deleter; using ConstMessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; using ConstDataSharedPtr = std::shared_ptr; using SubscribedTypeUniquePtr = std::unique_ptr; using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< SubscribedType, Alloc, SubscribedTypeDeleter >::UniquePtr; SubscriptionIntraProcessBuffer( std::shared_ptr allocator, rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type) : SubscriptionROSMsgIntraProcessBuffer( context, topic_name, qos_profile), subscribed_type_allocator_(*allocator) { allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_); // Create the intra-process buffer. buffer_ = rclcpp::experimental::create_intra_process_buffer( buffer_type, qos_profile, std::make_shared(subscribed_type_allocator_)); } bool is_ready(rcl_wait_set_t * wait_set) override { (void) wait_set; return buffer_->has_data(); } SubscribedTypeUniquePtr convert_ros_message_to_subscribed_type_unique_ptr(const ROSMessageType & msg) { if constexpr (!std::is_same::value) { auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr); rclcpp::TypeAdapter::convert_to_custom(msg, *ptr); return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_); } else { throw std::runtime_error( "convert_ros_message_to_subscribed_type_unique_ptr " "unexpectedly called without TypeAdapter"); } } void provide_intra_process_message(ConstMessageSharedPtr message) override { if constexpr (std::is_same::value) { buffer_->add_shared(std::move(message)); trigger_guard_condition(); } else { buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message)); trigger_guard_condition(); } this->invoke_on_new_message(); } void provide_intra_process_message(MessageUniquePtr message) override { if constexpr (std::is_same::value) { buffer_->add_unique(std::move(message)); trigger_guard_condition(); } else { buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message)); trigger_guard_condition(); } this->invoke_on_new_message(); } void provide_intra_process_data(ConstDataSharedPtr message) { buffer_->add_shared(std::move(message)); trigger_guard_condition(); this->invoke_on_new_message(); } void provide_intra_process_data(SubscribedTypeUniquePtr message) { buffer_->add_unique(std::move(message)); trigger_guard_condition(); this->invoke_on_new_message(); } bool use_take_shared_method() const override { return buffer_->use_take_shared_method(); } protected: void trigger_guard_condition() override { this->gc_.trigger(); } BufferUniquePtr buffer_; SubscribedTypeAllocator subscribed_type_allocator_; SubscribedTypeDeleter subscribed_type_deleter_; }; } // namespace experimental } // namespace rclcpp #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_