.. _program_listing_file_include_rclcpp_experimental_subscription_intra_process_base.hpp: Program Listing for File subscription_intra_process_base.hpp ============================================================ |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/experimental/subscription_intra_process_base.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2019 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ #include #include #include #include #include "rcl/wait.h" #include "rmw/impl/cpp/demangle.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/waitable.hpp" namespace rclcpp { namespace experimental { class SubscriptionIntraProcessBase : public rclcpp::Waitable { public: RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase) enum class EntityType : std::size_t { Subscription, }; RCLCPP_PUBLIC SubscriptionIntraProcessBase( rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile) : gc_(context), topic_name_(topic_name), qos_profile_(qos_profile) {} RCLCPP_PUBLIC virtual ~SubscriptionIntraProcessBase(); RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override {return 1;} RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t * wait_set) override; bool is_ready(rcl_wait_set_t * wait_set) override = 0; std::shared_ptr take_data() override = 0; std::shared_ptr take_data_by_entity_id(size_t id) override { (void)id; return take_data(); } void execute(std::shared_ptr & data) override = 0; virtual bool use_take_shared_method() const = 0; RCLCPP_PUBLIC const char * get_topic_name() const; RCLCPP_PUBLIC QoS get_actual_qos() const; void set_on_ready_callback(std::function callback) override { if (!callback) { throw std::invalid_argument( "The callback passed to set_on_ready_callback " "is not callable."); } // Note: we bind the int identifier argument to this waitable's entity types auto new_callback = [callback, this](size_t number_of_events) { try { callback(number_of_events, static_cast(EntityType::Subscription)); } catch (const std::exception & exception) { RCLCPP_ERROR_STREAM( // TODO(wjwwood): get this class access to the node logger it is associated with rclcpp::get_logger("rclcpp"), "rclcpp::SubscriptionIntraProcessBase@" << this << " caught " << rmw::impl::cpp::demangle(exception) << " exception in user-provided callback for the 'on ready' callback: " << exception.what()); } catch (...) { RCLCPP_ERROR_STREAM( rclcpp::get_logger("rclcpp"), "rclcpp::SubscriptionIntraProcessBase@" << this << " caught unhandled exception in user-provided callback " << "for the 'on ready' callback"); } }; std::lock_guard lock(callback_mutex_); on_new_message_callback_ = new_callback; if (unread_count_ > 0) { if (qos_profile_.history() == HistoryPolicy::KeepAll) { on_new_message_callback_(unread_count_); } else { // Use qos profile depth as upper bound for unread_count_ on_new_message_callback_(std::min(unread_count_, qos_profile_.depth())); } unread_count_ = 0; } } void clear_on_ready_callback() override { std::lock_guard lock(callback_mutex_); on_new_message_callback_ = nullptr; } protected: std::recursive_mutex callback_mutex_; std::function on_new_message_callback_ {nullptr}; size_t unread_count_{0}; rclcpp::GuardCondition gc_; virtual void trigger_guard_condition() = 0; void invoke_on_new_message() { std::lock_guard lock(this->callback_mutex_); if (this->on_new_message_callback_) { this->on_new_message_callback_(1); } else { this->unread_count_++; } } private: std::string topic_name_; QoS qos_profile_; }; } // namespace experimental } // namespace rclcpp #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_