Program Listing for File subscription_intra_process_buffer.hpp

Return to documentation for file (include/rclcpp/experimental/subscription_intra_process_buffer.hpp)

// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_

#include <memory>
#include <string>
#include <stdexcept>
#include <utility>

#include "rcl/error_handling.h"
#include "rcl/guard_condition.h"
#include "rcl/wait.h"

#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"

namespace rclcpp
{
namespace experimental
{

template<
  typename SubscribedType,
  typename Alloc = std::allocator<SubscribedType>,
  typename Deleter = std::default_delete<SubscribedType>,
  typename ROSMessageType = SubscribedType
>
class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,
    typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
    allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
    ROSMessageType>>
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer)

  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, Alloc>;
  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;

  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

  using ConstMessageSharedPtr = std::shared_ptr<const ROSMessageType>;
  using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;

  using ConstDataSharedPtr = std::shared_ptr<const SubscribedType>;
  using SubscribedTypeUniquePtr = std::unique_ptr<SubscribedType, SubscribedTypeDeleter>;

  using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
    SubscribedType,
    Alloc,
    SubscribedTypeDeleter
    >::UniquePtr;

  SubscriptionIntraProcessBuffer(
    std::shared_ptr<Alloc> allocator,
    rclcpp::Context::SharedPtr context,
    const std::string & topic_name,
    const rclcpp::QoS & qos_profile,
    rclcpp::IntraProcessBufferType buffer_type)
  : SubscriptionROSMsgIntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator,
      ROSMessageTypeDeleter>(
      context, topic_name, qos_profile),
    subscribed_type_allocator_(*allocator)
  {
    allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);

    // Create the intra-process buffer.
    buffer_ = rclcpp::experimental::create_intra_process_buffer<SubscribedType, Alloc,
        SubscribedTypeDeleter>(
      buffer_type,
      qos_profile,
      std::make_shared<Alloc>(subscribed_type_allocator_));
  }

  bool
  is_ready(rcl_wait_set_t * wait_set) override
  {
    (void) wait_set;
    return buffer_->has_data();
  }

  SubscribedTypeUniquePtr
  convert_ros_message_to_subscribed_type_unique_ptr(const ROSMessageType & msg)
  {
    if constexpr (!std::is_same<SubscribedType, ROSMessageType>::value) {
      auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1);
      SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr);
      rclcpp::TypeAdapter<SubscribedType, ROSMessageType>::convert_to_custom(msg, *ptr);
      return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_);
    } else {
      throw std::runtime_error(
              "convert_ros_message_to_subscribed_type_unique_ptr "
              "unexpectedly called without TypeAdapter");
    }
  }

  void
  provide_intra_process_message(ConstMessageSharedPtr message) override
  {
    if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
      buffer_->add_shared(std::move(message));
      trigger_guard_condition();
    } else {
      buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message));
      trigger_guard_condition();
    }
    this->invoke_on_new_message();
  }

  void
  provide_intra_process_message(MessageUniquePtr message) override
  {
    if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
      buffer_->add_unique(std::move(message));
      trigger_guard_condition();
    } else {
      buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message));
      trigger_guard_condition();
    }
    this->invoke_on_new_message();
  }

  void
  provide_intra_process_data(ConstDataSharedPtr message)
  {
    buffer_->add_shared(std::move(message));
    trigger_guard_condition();
    this->invoke_on_new_message();
  }

  void
  provide_intra_process_data(SubscribedTypeUniquePtr message)
  {
    buffer_->add_unique(std::move(message));
    trigger_guard_condition();
    this->invoke_on_new_message();
  }

  bool
  use_take_shared_method() const override
  {
    return buffer_->use_take_shared_method();
  }

protected:
  void
  trigger_guard_condition() override
  {
    this->gc_.trigger();
  }

  BufferUniquePtr buffer_;
  SubscribedTypeAllocator subscribed_type_allocator_;
  SubscribedTypeDeleter subscribed_type_deleter_;
};

}  // namespace experimental
}  // namespace rclcpp

#endif  // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_