Program Listing for File intra_process_manager.hpp

Return to documentation for file (include/rclcpp/experimental/intra_process_manager.hpp)

// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_

#include <rmw/types.h>

#include <shared_mutex>

#include <iterator>
#include <memory>
#include <stdexcept>
#include <unordered_map>
#include <utility>
#include <vector>
#include <typeinfo>

#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/logger.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/type_adapter.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

namespace experimental
{


class IntraProcessManager
{
private:
  RCLCPP_DISABLE_COPY(IntraProcessManager)

public:
  RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager)

  RCLCPP_PUBLIC
  IntraProcessManager();

  RCLCPP_PUBLIC
  virtual ~IntraProcessManager();


  template<
    typename ROSMessageType,
    typename Alloc = std::allocator<ROSMessageType>
  >
  uint64_t
  add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
  {
    std::unique_lock<std::shared_timed_mutex> lock(mutex_);

    uint64_t sub_id = IntraProcessManager::get_next_unique_id();

    subscriptions_[sub_id] = subscription;

    // adds the subscription id to all the matchable publishers
    for (auto & pair : publishers_) {
      auto publisher = pair.second.lock();
      if (!publisher) {
        continue;
      }
      if (can_communicate(publisher, subscription)) {
        uint64_t pub_id = pair.first;
        insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
        if (publisher->is_durability_transient_local() &&
          subscription->is_durability_transient_local())
        {
          do_transient_local_publish<ROSMessageType, Alloc>(
            pub_id, sub_id,
            subscription->use_take_shared_method());
        }
      }
    }

    return sub_id;
  }


  RCLCPP_PUBLIC
  void
  remove_subscription(uint64_t intra_process_subscription_id);


  RCLCPP_PUBLIC
  uint64_t
  add_publisher(
    rclcpp::PublisherBase::SharedPtr publisher,
    rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer =
    rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr());


  RCLCPP_PUBLIC
  void
  remove_publisher(uint64_t intra_process_publisher_id);


  template<
    typename MessageT,
    typename ROSMessageType,
    typename Alloc,
    typename Deleter = std::default_delete<MessageT>
  >
  void
  do_intra_process_publish(
    uint64_t intra_process_publisher_id,
    std::unique_ptr<MessageT, Deleter> message,
    typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
  {
    using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
    using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

    std::shared_lock<std::shared_timed_mutex> lock(mutex_);

    auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
    if (publisher_it == pub_to_subs_.end()) {
      // Publisher is either invalid or no longer exists.
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling do_intra_process_publish for invalid or no longer existing publisher id");
      return;
    }
    const auto & sub_ids = publisher_it->second;

    if (sub_ids.take_ownership_subscriptions.empty()) {
      // None of the buffers require ownership, so we promote the pointer
      std::shared_ptr<MessageT> msg = std::move(message);

      this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
        msg, sub_ids.take_shared_subscriptions);
    } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
      sub_ids.take_shared_subscriptions.size() <= 1)
    {
      // There is at maximum 1 buffer that does not require ownership.
      // So this case is equivalent to all the buffers requiring ownership

      // Merge the two vector of ids into a unique one
      std::vector<uint64_t> concatenated_vector(
        sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end());
      concatenated_vector.insert(
        concatenated_vector.end(),
        sub_ids.take_ownership_subscriptions.begin(),
        sub_ids.take_ownership_subscriptions.end());
      this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
        std::move(message),
        concatenated_vector,
        allocator);
    } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
      sub_ids.take_shared_subscriptions.size() > 1)
    {
      // Construct a new shared pointer from the message
      // for the buffers that do not require ownership
      auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);

      this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
        shared_msg, sub_ids.take_shared_subscriptions);
      this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
        std::move(message), sub_ids.take_ownership_subscriptions, allocator);
    }
  }

  template<
    typename MessageT,
    typename ROSMessageType,
    typename Alloc,
    typename Deleter = std::default_delete<MessageT>
  >
  std::shared_ptr<const MessageT>
  do_intra_process_publish_and_return_shared(
    uint64_t intra_process_publisher_id,
    std::unique_ptr<MessageT, Deleter> message,
    typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
  {
    using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
    using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

    std::shared_lock<std::shared_timed_mutex> lock(mutex_);

    auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
    if (publisher_it == pub_to_subs_.end()) {
      // Publisher is either invalid or no longer exists.
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling do_intra_process_publish for invalid or no longer existing publisher id");
      return nullptr;
    }
    const auto & sub_ids = publisher_it->second;

    if (sub_ids.take_ownership_subscriptions.empty()) {
      // If there are no owning, just convert to shared.
      std::shared_ptr<MessageT> shared_msg = std::move(message);
      if (!sub_ids.take_shared_subscriptions.empty()) {
        this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
          shared_msg, sub_ids.take_shared_subscriptions);
      }
      return shared_msg;
    } else {
      // Construct a new shared pointer from the message for the buffers that
      // do not require ownership and to return.
      auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);

      if (!sub_ids.take_shared_subscriptions.empty()) {
        this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
          shared_msg,
          sub_ids.take_shared_subscriptions);
      }
      if (!sub_ids.take_ownership_subscriptions.empty()) {
        this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
          std::move(message),
          sub_ids.take_ownership_subscriptions,
          allocator);
      }
      return shared_msg;
    }
  }

  template<
    typename MessageT,
    typename Alloc,
    typename Deleter,
    typename ROSMessageType>
  void
  add_shared_msg_to_buffer(
    std::shared_ptr<const MessageT> message,
    uint64_t subscription_id)
  {
    add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(message, {subscription_id});
  }

  template<
    typename MessageT,
    typename Alloc,
    typename Deleter,
    typename ROSMessageType>
  void
  add_owned_msg_to_buffer(
    std::unique_ptr<MessageT, Deleter> message,
    uint64_t subscription_id,
    typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
  {
    add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
      std::move(message), {subscription_id}, allocator);
  }

  RCLCPP_PUBLIC
  bool
  matches_any_publishers(const rmw_gid_t * id) const;

  RCLCPP_PUBLIC
  size_t
  get_subscription_count(uint64_t intra_process_publisher_id) const;

  RCLCPP_PUBLIC
  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
  get_subscription_intra_process(uint64_t intra_process_subscription_id);

  RCLCPP_PUBLIC
  size_t
  lowest_available_capacity(const uint64_t intra_process_publisher_id) const;

private:
  struct SplittedSubscriptions
  {
    std::vector<uint64_t> take_shared_subscriptions;
    std::vector<uint64_t> take_ownership_subscriptions;
  };

  using SubscriptionMap =
    std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;

  using PublisherMap =
    std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;

  using PublisherBufferMap =
    std::unordered_map<uint64_t, rclcpp::experimental::buffers::IntraProcessBufferBase::WeakPtr>;

  using PublisherToSubscriptionIdsMap =
    std::unordered_map<uint64_t, SplittedSubscriptions>;

  RCLCPP_PUBLIC
  static
  uint64_t
  get_next_unique_id();

  RCLCPP_PUBLIC
  void
  insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method);

  RCLCPP_PUBLIC
  bool
  can_communicate(
    rclcpp::PublisherBase::SharedPtr pub,
    rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const;

  template<
    typename ROSMessageType,
    typename Alloc = std::allocator<ROSMessageType>
  >
  void do_transient_local_publish(
    const uint64_t pub_id, const uint64_t sub_id,
    const bool use_take_shared_method)
  {
    using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
    using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
    using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

    auto publisher_buffer = publisher_buffers_[pub_id].lock();
    if (!publisher_buffer) {
      throw std::runtime_error("publisher buffer has unexpectedly gone out of scope");
    }
    auto buffer = std::dynamic_pointer_cast<
      rclcpp::experimental::buffers::IntraProcessBuffer<
        ROSMessageType,
        ROSMessageTypeAllocator,
        ROSMessageTypeDeleter
      >
      >(publisher_buffer);
    if (!buffer) {
      throw std::runtime_error(
              "failed to dynamic cast publisher's IntraProcessBufferBase to "
              "IntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
              "ROSMessageTypeDeleter> which can happen when the publisher and "
              "subscription use different allocator types, which is not supported");
    }
    if (use_take_shared_method) {
      auto data_vec = buffer->get_all_data_shared();
      for (auto shared_data : data_vec) {
        this->template add_shared_msg_to_buffer<
          ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
          shared_data, sub_id);
      }
    } else {
      auto data_vec = buffer->get_all_data_unique();
      for (auto & owned_data : data_vec) {
        auto allocator = ROSMessageTypeAllocator();
        this->template add_owned_msg_to_buffer<
          ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
          std::move(owned_data), sub_id, allocator);
      }
    }
  }

  template<
    typename MessageT,
    typename Alloc,
    typename Deleter,
    typename ROSMessageType>
  void
  add_shared_msg_to_buffers(
    std::shared_ptr<const MessageT> message,
    std::vector<uint64_t> subscription_ids)
  {
    using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
    using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
    using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

    using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
    using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
    using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
    using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;

    for (auto id : subscription_ids) {
      auto subscription_it = subscriptions_.find(id);
      if (subscription_it == subscriptions_.end()) {
        throw std::runtime_error("subscription has unexpectedly gone out of scope");
      }
      auto subscription_base = subscription_it->second.lock();
      if (subscription_base == nullptr) {
        subscriptions_.erase(id);
        continue;
      }

      auto subscription = std::dynamic_pointer_cast<
        rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
        PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
        >(subscription_base);
      if (subscription != nullptr) {
        subscription->provide_intra_process_data(message);
        continue;
      }

      auto ros_message_subscription = std::dynamic_pointer_cast<
        rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,
        ROSMessageTypeAllocator, ROSMessageTypeDeleter>
        >(subscription_base);
      if (nullptr == ros_message_subscription) {
        throw std::runtime_error(
                "failed to dynamic cast SubscriptionIntraProcessBase to "
                "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
                "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
                "ROSMessageTypeDeleter> which can happen when the publisher and "
                "subscription use different allocator types, which is not supported");
      }

      if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
        ROSMessageType ros_msg;
        rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, ros_msg);
        ros_message_subscription->provide_intra_process_message(
          std::make_shared<ROSMessageType>(ros_msg));
      } else {
        if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
          ros_message_subscription->provide_intra_process_message(message);
        } else {
          if constexpr (std::is_same<typename rclcpp::TypeAdapter<MessageT,
            ROSMessageType>::ros_message_type, ROSMessageType>::value)
          {
            ROSMessageType ros_msg;
            rclcpp::TypeAdapter<MessageT, ROSMessageType>::convert_to_ros_message(
              *message, ros_msg);
            ros_message_subscription->provide_intra_process_message(
              std::make_shared<ROSMessageType>(ros_msg));
          }
        }
      }
    }
  }

  template<
    typename MessageT,
    typename Alloc,
    typename Deleter,
    typename ROSMessageType>
  void
  add_owned_msg_to_buffers(
    std::unique_ptr<MessageT, Deleter> message,
    std::vector<uint64_t> subscription_ids,
    typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
  {
    using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
    using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;

    using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
    using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
    using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

    using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
    using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
    using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
    using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;

    for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
      auto subscription_it = subscriptions_.find(*it);
      if (subscription_it == subscriptions_.end()) {
        throw std::runtime_error("subscription has unexpectedly gone out of scope");
      }
      auto subscription_base = subscription_it->second.lock();
      if (subscription_base == nullptr) {
        subscriptions_.erase(subscription_it);
        continue;
      }

      auto subscription = std::dynamic_pointer_cast<
        rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
        PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
        >(subscription_base);
      if (subscription != nullptr) {
        if (std::next(it) == subscription_ids.end()) {
          // If this is the last subscription, give up ownership
          subscription->provide_intra_process_data(std::move(message));
          // Last message delivered, break from for loop
          break;
        } else {
          // Copy the message since we have additional subscriptions to serve
          Deleter deleter = message.get_deleter();
          auto ptr = MessageAllocTraits::allocate(allocator, 1);
          MessageAllocTraits::construct(allocator, ptr, *message);

          subscription->provide_intra_process_data(MessageUniquePtr(ptr, deleter));
        }

        continue;
      }

      auto ros_message_subscription = std::dynamic_pointer_cast<
        rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,
        ROSMessageTypeAllocator, ROSMessageTypeDeleter>
        >(subscription_base);
      if (nullptr == ros_message_subscription) {
        throw std::runtime_error(
                "failed to dynamic cast SubscriptionIntraProcessBase to "
                "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
                "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
                "ROSMessageTypeDeleter> which can happen when the publisher and "
                "subscription use different allocator types, which is not supported");
      }

      if constexpr (rclcpp::TypeAdapter<MessageT, ROSMessageType>::is_specialized::value) {
        ROSMessageTypeAllocator ros_message_alloc(allocator);
        auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1);
        ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr);
        ROSMessageTypeDeleter deleter;
        allocator::set_allocator_for_deleter(&deleter, &allocator);
        rclcpp::TypeAdapter<MessageT, ROSMessageType>::convert_to_ros_message(*message, *ptr);
        auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
        ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
      } else {
        if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
          if (std::next(it) == subscription_ids.end()) {
            // If this is the last subscription, give up ownership
            ros_message_subscription->provide_intra_process_message(std::move(message));
            // Last message delivered, break from for loop
            break;
          } else {
            // Copy the message since we have additional subscriptions to serve
            Deleter deleter = message.get_deleter();
            allocator::set_allocator_for_deleter(&deleter, &allocator);
            auto ptr = MessageAllocTraits::allocate(allocator, 1);
            MessageAllocTraits::construct(allocator, ptr, *message);

            ros_message_subscription->provide_intra_process_message(
              MessageUniquePtr(ptr, deleter));
          }
        }
      }
    }
  }

  PublisherToSubscriptionIdsMap pub_to_subs_;
  SubscriptionMap subscriptions_;
  PublisherMap publishers_;
  PublisherBufferMap publisher_buffers_;

  mutable std::shared_timed_mutex mutex_;
};

}  // namespace experimental
}  // namespace rclcpp

#endif  // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_