Program Listing for File publisher_base.hpp

Return to documentation for file (include/rclcpp/publisher_base.hpp)

// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__PUBLISHER_BASE_HPP_
#define RCLCPP__PUBLISHER_BASE_HPP_

#include <rmw/error_handling.h>
#include <rmw/rmw.h>

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "rcl/publisher.h"

#include "rclcpp/macros.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/event_handler.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcpputils/time.hpp"

namespace rclcpp
{

// Forward declaration is used for friend statement.
namespace node_interfaces
{
class NodeBaseInterface;
class NodeTopicsInterface;
}  // namespace node_interfaces

namespace experimental
{
class IntraProcessManager;
}  // namespace experimental

class PublisherBase : public std::enable_shared_from_this<PublisherBase>
{
  friend ::rclcpp::node_interfaces::NodeTopicsInterface;

public:
  RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase)


  RCLCPP_PUBLIC
  PublisherBase(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    const std::string & topic,
    const rosidl_message_type_support_t & type_support,
    const rcl_publisher_options_t & publisher_options,
    const PublisherEventCallbacks & event_callbacks,
    bool use_default_callbacks);

  RCLCPP_PUBLIC
  virtual ~PublisherBase();

  RCLCPP_PUBLIC
  void
  bind_event_callbacks(const PublisherEventCallbacks & event_callbacks, bool use_default_callbacks);


  RCLCPP_PUBLIC
  const char *
  get_topic_name() const;


  RCLCPP_PUBLIC
  size_t
  get_queue_size() const;


  RCLCPP_PUBLIC
  const rmw_gid_t &
  get_gid() const;


  RCLCPP_PUBLIC
  std::shared_ptr<rcl_publisher_t>
  get_publisher_handle();


  RCLCPP_PUBLIC
  std::shared_ptr<const rcl_publisher_t>
  get_publisher_handle() const;


  RCLCPP_PUBLIC
  const
  std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
  get_event_handlers() const;


  RCLCPP_PUBLIC
  size_t
  get_subscription_count() const;


  RCLCPP_PUBLIC
  size_t
  get_intra_process_subscription_count() const;


  RCLCPP_PUBLIC
  bool
  is_durability_transient_local() const;


  RCLCPP_PUBLIC
  RCUTILS_WARN_UNUSED
  bool
  assert_liveliness() const;


  RCLCPP_PUBLIC
  rclcpp::QoS
  get_actual_qos() const;


  RCLCPP_PUBLIC
  bool
  can_loan_messages() const;


  RCLCPP_PUBLIC
  bool
  operator==(const rmw_gid_t & gid) const;


  RCLCPP_PUBLIC
  bool
  operator==(const rmw_gid_t * gid) const;

  using IntraProcessManagerSharedPtr =
    std::shared_ptr<rclcpp::experimental::IntraProcessManager>;

  RCLCPP_PUBLIC
  void
  setup_intra_process(
    uint64_t intra_process_publisher_id,
    IntraProcessManagerSharedPtr ipm);


  RCLCPP_PUBLIC
  std::vector<rclcpp::NetworkFlowEndpoint>
  get_network_flow_endpoints() const;


  RCLCPP_PUBLIC
  size_t
  lowest_available_ipm_capacity() const;


  template<typename DurationRepT = int64_t, typename DurationT = std::milli>
  bool
  wait_for_all_acked(
    std::chrono::duration<DurationRepT, DurationT> timeout =
    std::chrono::duration<DurationRepT, DurationT>(-1)) const
  {
    rcl_duration_value_t rcl_timeout = rcpputils::convert_to_nanoseconds(timeout).count();

    rcl_ret_t ret = rcl_publisher_wait_for_all_acked(publisher_handle_.get(), rcl_timeout);
    if (ret == RCL_RET_OK) {
      return true;
    } else if (ret == RCL_RET_TIMEOUT) {
      return false;
    } else {
      rclcpp::exceptions::throw_from_rcl_error(ret);
    }
  }


  void
  set_on_new_qos_event_callback(
    std::function<void(size_t)> callback,
    rcl_publisher_event_type_t event_type)
  {
    if (event_handlers_.count(event_type) == 0) {
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling set_on_new_qos_event_callback for non registered publisher event_type");
      return;
    }

    if (!callback) {
      throw std::invalid_argument(
              "The callback passed to set_on_new_qos_event_callback "
              "is not callable.");
    }

    // The on_ready_callback signature has an extra `int` argument used to disambiguate between
    // possible different entities within a generic waitable.
    // We hide that detail to users of this method.
    std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
    event_handlers_[event_type]->set_on_ready_callback(new_callback);
  }

  void
  clear_on_new_qos_event_callback(rcl_publisher_event_type_t event_type)
  {
    if (event_handlers_.count(event_type) == 0) {
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling clear_on_new_qos_event_callback for non registered event_type");
      return;
    }

    event_handlers_[event_type]->clear_on_ready_callback();
  }

protected:
  template<typename EventCallbackT>
  void
  add_event_handler(
    const EventCallbackT & callback,
    const rcl_publisher_event_type_t event_type)
  {
    auto handler = std::make_shared<EventHandler<EventCallbackT,
        std::shared_ptr<rcl_publisher_t>>>(
      callback,
      rcl_publisher_event_init,
      publisher_handle_,
      event_type);
    event_handlers_.insert(std::make_pair(event_type, handler));
  }

  RCLCPP_PUBLIC
  void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & info) const;

  RCLCPP_PUBLIC
  void default_incompatible_type_callback(IncompatibleTypeInfo & info) const;

  std::shared_ptr<rcl_node_t> rcl_node_handle_;

  std::shared_ptr<rcl_publisher_t> publisher_handle_;

  std::unordered_map<rcl_publisher_event_type_t,
    std::shared_ptr<rclcpp::EventHandlerBase>> event_handlers_;

  using IntraProcessManagerWeakPtr =
    std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
  bool intra_process_is_enabled_;
  IntraProcessManagerWeakPtr weak_ipm_;
  uint64_t intra_process_publisher_id_;

  rmw_gid_t rmw_gid_;

  const rosidl_message_type_support_t type_support_;

  const PublisherEventCallbacks event_callbacks_;
};

}  // namespace rclcpp

#endif  // RCLCPP__PUBLISHER_BASE_HPP_