Program Listing for File intra_process_manager.hpp
↰ Return to documentation for file (include/rclcpp/experimental/intra_process_manager.hpp
)
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
#include <rmw/types.h>
#include <shared_mutex>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <unordered_map>
#include <utility>
#include <vector>
#include <typeinfo>
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/logger.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/type_adapter.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace experimental
{
class IntraProcessManager
{
private:
RCLCPP_DISABLE_COPY(IntraProcessManager)
public:
RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager)
RCLCPP_PUBLIC
IntraProcessManager();
RCLCPP_PUBLIC
virtual ~IntraProcessManager();
template<
typename ROSMessageType,
typename Alloc = std::allocator<ROSMessageType>
>
uint64_t
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
uint64_t sub_id = IntraProcessManager::get_next_unique_id();
subscriptions_[sub_id] = subscription;
// adds the subscription id to all the matchable publishers
for (auto & pair : publishers_) {
auto publisher = pair.second.lock();
if (!publisher) {
continue;
}
if (can_communicate(publisher, subscription)) {
uint64_t pub_id = pair.first;
insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
if (publisher->is_durability_transient_local() &&
subscription->is_durability_transient_local())
{
do_transient_local_publish<ROSMessageType, Alloc>(
pub_id, sub_id,
subscription->use_take_shared_method());
}
}
}
return sub_id;
}
RCLCPP_PUBLIC
void
remove_subscription(uint64_t intra_process_subscription_id);
RCLCPP_PUBLIC
uint64_t
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer =
rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr());
RCLCPP_PUBLIC
void
remove_publisher(uint64_t intra_process_publisher_id);
template<
typename MessageT,
typename ROSMessageType,
typename Alloc,
typename Deleter = std::default_delete<MessageT>
>
void
do_intra_process_publish(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
return;
}
const auto & sub_ids = publisher_it->second;
if (sub_ids.take_ownership_subscriptions.empty()) {
// None of the buffers require ownership, so we promote the pointer
std::shared_ptr<MessageT> msg = std::move(message);
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
msg, sub_ids.take_shared_subscriptions);
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
sub_ids.take_shared_subscriptions.size() <= 1)
{
// There is at maximum 1 buffer that does not require ownership.
// So this case is equivalent to all the buffers requiring ownership
// Merge the two vector of ids into a unique one
std::vector<uint64_t> concatenated_vector(
sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end());
concatenated_vector.insert(
concatenated_vector.end(),
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
std::move(message),
concatenated_vector,
allocator);
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
sub_ids.take_shared_subscriptions.size() > 1)
{
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
}
}
template<
typename MessageT,
typename ROSMessageType,
typename Alloc,
typename Deleter = std::default_delete<MessageT>
>
std::shared_ptr<const MessageT>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
return nullptr;
}
const auto & sub_ids = publisher_it->second;
if (sub_ids.take_ownership_subscriptions.empty()) {
// If there are no owning, just convert to shared.
std::shared_ptr<MessageT> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
shared_msg, sub_ids.take_shared_subscriptions);
}
return shared_msg;
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
shared_msg,
sub_ids.take_shared_subscriptions);
}
if (!sub_ids.take_ownership_subscriptions.empty()) {
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
std::move(message),
sub_ids.take_ownership_subscriptions,
allocator);
}
return shared_msg;
}
}
template<
typename MessageT,
typename Alloc,
typename Deleter,
typename ROSMessageType>
void
add_shared_msg_to_buffer(
std::shared_ptr<const MessageT> message,
uint64_t subscription_id)
{
add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(message, {subscription_id});
}
template<
typename MessageT,
typename Alloc,
typename Deleter,
typename ROSMessageType>
void
add_owned_msg_to_buffer(
std::unique_ptr<MessageT, Deleter> message,
uint64_t subscription_id,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
{
add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
std::move(message), {subscription_id}, allocator);
}
RCLCPP_PUBLIC
bool
matches_any_publishers(const rmw_gid_t * id) const;
RCLCPP_PUBLIC
size_t
get_subscription_count(uint64_t intra_process_publisher_id) const;
RCLCPP_PUBLIC
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
get_subscription_intra_process(uint64_t intra_process_subscription_id);
RCLCPP_PUBLIC
size_t
lowest_available_capacity(const uint64_t intra_process_publisher_id) const;
private:
struct SplittedSubscriptions
{
std::vector<uint64_t> take_shared_subscriptions;
std::vector<uint64_t> take_ownership_subscriptions;
};
using SubscriptionMap =
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
using PublisherMap =
std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;
using PublisherBufferMap =
std::unordered_map<uint64_t, rclcpp::experimental::buffers::IntraProcessBufferBase::WeakPtr>;
using PublisherToSubscriptionIdsMap =
std::unordered_map<uint64_t, SplittedSubscriptions>;
RCLCPP_PUBLIC
static
uint64_t
get_next_unique_id();
RCLCPP_PUBLIC
void
insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method);
RCLCPP_PUBLIC
bool
can_communicate(
rclcpp::PublisherBase::SharedPtr pub,
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const;
template<
typename ROSMessageType,
typename Alloc = std::allocator<ROSMessageType>
>
void do_transient_local_publish(
const uint64_t pub_id, const uint64_t sub_id,
const bool use_take_shared_method)
{
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
auto publisher_buffer = publisher_buffers_[pub_id].lock();
if (!publisher_buffer) {
throw std::runtime_error("publisher buffer has unexpectedly gone out of scope");
}
auto buffer = std::dynamic_pointer_cast<
rclcpp::experimental::buffers::IntraProcessBuffer<
ROSMessageType,
ROSMessageTypeAllocator,
ROSMessageTypeDeleter
>
>(publisher_buffer);
if (!buffer) {
throw std::runtime_error(
"failed to dynamic cast publisher's IntraProcessBufferBase to "
"IntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
"ROSMessageTypeDeleter> which can happen when the publisher and "
"subscription use different allocator types, which is not supported");
}
if (use_take_shared_method) {
auto data_vec = buffer->get_all_data_shared();
for (auto shared_data : data_vec) {
this->template add_shared_msg_to_buffer<
ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
shared_data, sub_id);
}
} else {
auto data_vec = buffer->get_all_data_unique();
for (auto & owned_data : data_vec) {
auto allocator = ROSMessageTypeAllocator();
this->template add_owned_msg_to_buffer<
ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
std::move(owned_data), sub_id, allocator);
}
}
}
template<
typename MessageT,
typename Alloc,
typename Deleter,
typename ROSMessageType>
void
add_shared_msg_to_buffers(
std::shared_ptr<const MessageT> message,
std::vector<uint64_t> subscription_ids)
{
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
for (auto id : subscription_ids) {
auto subscription_it = subscriptions_.find(id);
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.lock();
if (subscription_base == nullptr) {
subscriptions_.erase(id);
continue;
}
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
>(subscription_base);
if (subscription != nullptr) {
subscription->provide_intra_process_data(message);
continue;
}
auto ros_message_subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,
ROSMessageTypeAllocator, ROSMessageTypeDeleter>
>(subscription_base);
if (nullptr == ros_message_subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
"SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
"ROSMessageTypeDeleter> which can happen when the publisher and "
"subscription use different allocator types, which is not supported");
}
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, ros_msg);
ros_message_subscription->provide_intra_process_message(
std::make_shared<ROSMessageType>(ros_msg));
} else {
if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
ros_message_subscription->provide_intra_process_message(message);
} else {
if constexpr (std::is_same<typename rclcpp::TypeAdapter<MessageT,
ROSMessageType>::ros_message_type, ROSMessageType>::value)
{
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT, ROSMessageType>::convert_to_ros_message(
*message, ros_msg);
ros_message_subscription->provide_intra_process_message(
std::make_shared<ROSMessageType>(ros_msg));
}
}
}
}
}
template<
typename MessageT,
typename Alloc,
typename Deleter,
typename ROSMessageType>
void
add_owned_msg_to_buffers(
std::unique_ptr<MessageT, Deleter> message,
std::vector<uint64_t> subscription_ids,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
auto subscription_it = subscriptions_.find(*it);
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.lock();
if (subscription_base == nullptr) {
subscriptions_.erase(subscription_it);
continue;
}
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
>(subscription_base);
if (subscription != nullptr) {
if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
subscription->provide_intra_process_data(std::move(message));
// Last message delivered, break from for loop
break;
} else {
// Copy the message since we have additional subscriptions to serve
Deleter deleter = message.get_deleter();
auto ptr = MessageAllocTraits::allocate(allocator, 1);
MessageAllocTraits::construct(allocator, ptr, *message);
subscription->provide_intra_process_data(MessageUniquePtr(ptr, deleter));
}
continue;
}
auto ros_message_subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,
ROSMessageTypeAllocator, ROSMessageTypeDeleter>
>(subscription_base);
if (nullptr == ros_message_subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
"SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
"ROSMessageTypeDeleter> which can happen when the publisher and "
"subscription use different allocator types, which is not supported");
}
if constexpr (rclcpp::TypeAdapter<MessageT, ROSMessageType>::is_specialized::value) {
ROSMessageTypeAllocator ros_message_alloc(allocator);
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr);
ROSMessageTypeDeleter deleter;
allocator::set_allocator_for_deleter(&deleter, &allocator);
rclcpp::TypeAdapter<MessageT, ROSMessageType>::convert_to_ros_message(*message, *ptr);
auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
} else {
if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
ros_message_subscription->provide_intra_process_message(std::move(message));
// Last message delivered, break from for loop
break;
} else {
// Copy the message since we have additional subscriptions to serve
Deleter deleter = message.get_deleter();
allocator::set_allocator_for_deleter(&deleter, &allocator);
auto ptr = MessageAllocTraits::allocate(allocator, 1);
MessageAllocTraits::construct(allocator, ptr, *message);
ros_message_subscription->provide_intra_process_message(
MessageUniquePtr(ptr, deleter));
}
}
}
}
}
PublisherToSubscriptionIdsMap pub_to_subs_;
SubscriptionMap subscriptions_;
PublisherMap publishers_;
PublisherBufferMap publisher_buffers_;
mutable std::shared_timed_mutex mutex_;
};
} // namespace experimental
} // namespace rclcpp
#endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_