.. _program_listing_file_include_rclcpp_experimental_subscription_intra_process.hpp: Program Listing for File subscription_intra_process.hpp ======================================================= |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/experimental/subscription_intra_process.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2019 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ #include #include #include #include #include #include #include "rcl/types.h" #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/context.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" #include "tracetools/tracetools.h" namespace rclcpp { namespace experimental { template< typename MessageT, typename SubscribedType, typename SubscribedTypeAlloc = std::allocator, typename SubscribedTypeDeleter = std::default_delete, typename ROSMessageType = SubscribedType, typename Alloc = std::allocator > class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< SubscribedType, SubscribedTypeAlloc, SubscribedTypeDeleter, ROSMessageType > { using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer< SubscribedType, SubscribedTypeAlloc, SubscribedTypeDeleter, ROSMessageType >; public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess) using MessageAllocTraits = typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocatorTraits; using MessageAlloc = typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocator; using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr; using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; SubscriptionIntraProcess( AnySubscriptionCallback callback, std::shared_ptr allocator, rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), context, topic_name, qos_profile, buffer_type), any_callback_(callback) { TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, static_cast(this), static_cast(&any_callback_)); // The callback object gets copied, so if registration is done too early/before this point // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later // in subsequent tracepoints. #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif } virtual ~SubscriptionIntraProcess() = default; void add_to_wait_set(rcl_wait_set_t & wait_set) override { // This block is necessary when the guard condition wakes the wait set, but // the intra process waitable was not handled before the wait set is waited // on again. // Basically we're keeping the guard condition triggered so long as there is // data in the buffer. if (this->buffer_->has_data()) { // If there is data still to be processed, indicate to the // executor or waitset by triggering the guard condition. this->trigger_guard_condition(); } // Let the parent classes handle the rest of the work: return SubscriptionIntraProcessBufferT::add_to_wait_set(wait_set); } std::shared_ptr take_data() override { ConstMessageSharedPtr shared_msg; MessageUniquePtr unique_msg; if (any_callback_.use_take_shared_method()) { shared_msg = this->buffer_->consume_shared(); if (!shared_msg) { return nullptr; } } else { unique_msg = this->buffer_->consume_unique(); if (!unique_msg) { return nullptr; } } if (this->buffer_->has_data()) { // If there is data still to be processed, indicate to the // executor or waitset by triggering the guard condition. this->trigger_guard_condition(); } return std::static_pointer_cast( std::make_shared>( std::pair( shared_msg, std::move(unique_msg))) ); } void execute(const std::shared_ptr & data) override { execute_impl(data); } protected: template typename std::enable_if::value, void>::type execute_impl(const std::shared_ptr &) { throw std::runtime_error("Subscription intra-process can't handle serialized messages"); } template typename std::enable_if::value, void>::type execute_impl(const std::shared_ptr & data) { if (nullptr == data) { return; } rmw_message_info_t msg_info; msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; auto shared_ptr = std::static_pointer_cast>( data); if (any_callback_.use_take_shared_method()) { ConstMessageSharedPtr shared_msg = shared_ptr->first; any_callback_.dispatch_intra_process(shared_msg, msg_info); } else { MessageUniquePtr unique_msg = std::move(shared_ptr->second); any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info); } shared_ptr.reset(); } AnySubscriptionCallback any_callback_; }; } // namespace experimental } // namespace rclcpp #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_