Program Listing for File subscription_base.hpp

Return to documentation for file (include/rclcpp/subscription_base.hpp)

// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_
#define RCLCPP__SUBSCRIPTION_BASE_HPP_

#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
#include <utility>

#include "rcl/event_callback.h"
#include "rcl/subscription.h"

#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"

#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_info.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/subscription_content_filter_options.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

namespace node_interfaces
{
class NodeBaseInterface;
}  // namespace node_interfaces

namespace experimental
{
class IntraProcessManager;
}  // namespace experimental

class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase)


  RCLCPP_PUBLIC
  SubscriptionBase(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const rosidl_message_type_support_t & type_support_handle,
    const std::string & topic_name,
    const rcl_subscription_options_t & subscription_options,
    bool is_serialized = false);

  RCLCPP_PUBLIC
  virtual ~SubscriptionBase();

  RCLCPP_PUBLIC
  const char *
  get_topic_name() const;

  RCLCPP_PUBLIC
  std::shared_ptr<rcl_subscription_t>
  get_subscription_handle();

  RCLCPP_PUBLIC
  std::shared_ptr<const rcl_subscription_t>
  get_subscription_handle() const;


  RCLCPP_PUBLIC
  const
  std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
  get_event_handlers() const;


  RCLCPP_PUBLIC
  rclcpp::QoS
  get_actual_qos() const;


  RCLCPP_PUBLIC
  bool
  take_type_erased(void * message_out, rclcpp::MessageInfo & message_info_out);


  RCLCPP_PUBLIC
  bool
  take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out);


  RCLCPP_PUBLIC
  virtual
  std::shared_ptr<void>
  create_message() = 0;


  RCLCPP_PUBLIC
  virtual
  std::shared_ptr<rclcpp::SerializedMessage>
  create_serialized_message() = 0;


  RCLCPP_PUBLIC
  virtual
  void
  handle_message(std::shared_ptr<void> & message, const rclcpp::MessageInfo & message_info) = 0;

  RCLCPP_PUBLIC
  virtual
  void
  handle_serialized_message(
    const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
    const rclcpp::MessageInfo & message_info) = 0;

  RCLCPP_PUBLIC
  virtual
  void
  handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) = 0;


  RCLCPP_PUBLIC
  virtual
  void
  return_message(std::shared_ptr<void> & message) = 0;


  RCLCPP_PUBLIC
  virtual
  void
  return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) = 0;

  RCLCPP_PUBLIC
  const rosidl_message_type_support_t &
  get_message_type_support_handle() const;


  RCLCPP_PUBLIC
  bool
  is_serialized() const;


  RCLCPP_PUBLIC
  size_t
  get_publisher_count() const;


  RCLCPP_PUBLIC
  bool
  can_loan_messages() const;

  using IntraProcessManagerWeakPtr =
    std::weak_ptr<rclcpp::experimental::IntraProcessManager>;

  RCLCPP_PUBLIC
  void
  setup_intra_process(
    uint64_t intra_process_subscription_id,
    IntraProcessManagerWeakPtr weak_ipm);


  RCLCPP_PUBLIC
  rclcpp::Waitable::SharedPtr
  get_intra_process_waitable() const;


  RCLCPP_PUBLIC
  bool
  exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);


  RCLCPP_PUBLIC
  std::vector<rclcpp::NetworkFlowEndpoint>
  get_network_flow_endpoints() const;


  void
  set_on_new_message_callback(std::function<void(size_t)> callback)
  {
    if (!callback) {
      throw std::invalid_argument(
              "The callback passed to set_on_new_message_callback "
              "is not callable.");
    }

    auto new_callback =
      [callback, this](size_t number_of_messages) {
        try {
          callback(number_of_messages);
        } catch (const std::exception & exception) {
          RCLCPP_ERROR_STREAM(
            node_logger_,
            "rclcpp::SubscriptionBase@" << this <<
              " caught " << rmw::impl::cpp::demangle(exception) <<
              " exception in user-provided callback for the 'on new message' callback: " <<
              exception.what());
        } catch (...) {
          RCLCPP_ERROR_STREAM(
            node_logger_,
            "rclcpp::SubscriptionBase@" << this <<
              " caught unhandled exception in user-provided callback " <<
              "for the 'on new message' callback");
        }
      };

    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);

    // Set it temporarily to the new callback, while we replace the old one.
    // This two-step setting, prevents a gap where the old std::function has
    // been replaced but the middleware hasn't been told about the new one yet.
    set_on_new_message_callback(
      rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
      static_cast<const void *>(&new_callback));

    // Store the std::function to keep it in scope, also overwrites the existing one.
    on_new_message_callback_ = new_callback;

    // Set it again, now using the permanent storage.
    set_on_new_message_callback(
      rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
      static_cast<const void *>(&on_new_message_callback_));
  }

  void
  clear_on_new_message_callback()
  {
    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);

    if (on_new_message_callback_) {
      set_on_new_message_callback(nullptr, nullptr);
      on_new_message_callback_ = nullptr;
    }
  }


  void
  set_on_new_intra_process_message_callback(std::function<void(size_t)> callback)
  {
    if (!use_intra_process_) {
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling set_on_new_intra_process_message_callback for subscription with IPC disabled");
      return;
    }

    if (!callback) {
      throw std::invalid_argument(
              "The callback passed to set_on_new_intra_process_message_callback "
              "is not callable.");
    }

    // The on_ready_callback signature has an extra `int` argument used to disambiguate between
    // possible different entities within a generic waitable.
    // We hide that detail to users of this method.
    std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
    subscription_intra_process_->set_on_ready_callback(new_callback);
  }

  void
  clear_on_new_intra_process_message_callback()
  {
    if (!use_intra_process_) {
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled");
      return;
    }

    subscription_intra_process_->clear_on_ready_callback();
  }


  void
  set_on_new_qos_event_callback(
    std::function<void(size_t)> callback,
    rcl_subscription_event_type_t event_type)
  {
    if (event_handlers_.count(event_type) == 0) {
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling set_on_new_qos_event_callback for non registered subscription event_type");
      return;
    }

    if (!callback) {
      throw std::invalid_argument(
              "The callback passed to set_on_new_qos_event_callback "
              "is not callable.");
    }

    // The on_ready_callback signature has an extra `int` argument used to disambiguate between
    // possible different entities within a generic waitable.
    // We hide that detail to users of this method.
    std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
    event_handlers_[event_type]->set_on_ready_callback(new_callback);
  }

  void
  clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type)
  {
    if (event_handlers_.count(event_type) == 0) {
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling clear_on_new_qos_event_callback for non registered event_type");
      return;
    }

    event_handlers_[event_type]->clear_on_ready_callback();
  }


  RCLCPP_PUBLIC
  bool
  is_cft_enabled() const;


  RCLCPP_PUBLIC
  void
  set_content_filter(
    const std::string & filter_expression,
    const std::vector<std::string> & expression_parameters = {});


  RCLCPP_PUBLIC
  rclcpp::ContentFilterOptions
  get_content_filter() const;

protected:
  template<typename EventCallbackT>
  void
  add_event_handler(
    const EventCallbackT & callback,
    const rcl_subscription_event_type_t event_type)
  {
    auto handler = std::make_shared<QOSEventHandler<EventCallbackT,
        std::shared_ptr<rcl_subscription_t>>>(
      callback,
      rcl_subscription_event_init,
      get_subscription_handle(),
      event_type);
    qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(), false));
    event_handlers_.insert(std::make_pair(event_type, handler));
  }

  RCLCPP_PUBLIC
  void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info) const;

  RCLCPP_PUBLIC
  bool
  matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const;

  RCLCPP_PUBLIC
  void
  set_on_new_message_callback(rcl_event_callback_t callback, const void * user_data);

  rclcpp::node_interfaces::NodeBaseInterface * const node_base_;

  std::shared_ptr<rcl_node_t> node_handle_;
  std::shared_ptr<rcl_subscription_t> subscription_handle_;
  std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
  rclcpp::Logger node_logger_;

  std::unordered_map<rcl_subscription_event_type_t,
    std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;

  bool use_intra_process_;
  IntraProcessManagerWeakPtr weak_ipm_;
  uint64_t intra_process_subscription_id_;
  std::shared_ptr<rclcpp::experimental::SubscriptionIntraProcessBase> subscription_intra_process_;

private:
  RCLCPP_DISABLE_COPY(SubscriptionBase)

  rosidl_message_type_support_t type_support_;
  bool is_serialized_;

  std::atomic<bool> subscription_in_use_by_wait_set_{false};
  std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
  std::unordered_map<rclcpp::QOSEventHandlerBase *,
    std::atomic<bool>> qos_events_in_use_by_wait_set_;

  std::recursive_mutex callback_mutex_;
  std::function<void(size_t)> on_new_message_callback_{nullptr};
};

}  // namespace rclcpp

#endif  // RCLCPP__SUBSCRIPTION_BASE_HPP_