Program Listing for File custom_participant_info.hpp
↰ Return to documentation for file (include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp
)
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "fastdds/dds/domain/DomainParticipant.hpp"
#include "fastdds/dds/domain/DomainParticipantListener.hpp"
#include "fastdds/dds/publisher/Publisher.hpp"
#include "fastdds/dds/subscriber/Subscriber.hpp"
#include "fastdds/rtps/participant/ParticipantDiscoveryInfo.h"
#include "fastdds/rtps/reader/ReaderDiscoveryInfo.h"
#include "fastdds/rtps/writer/WriterDiscoveryInfo.h"
#include "rcpputils/thread_safety_annotations.hpp"
#include "rcutils/logging_macros.h"
#include "rmw/impl/cpp/key_value.hpp"
#include "rmw/qos_profiles.h"
#include "rmw/rmw.h"
#include "rmw_dds_common/context.hpp"
#include "rmw_dds_common/qos.hpp"
#include "rmw_fastrtps_shared_cpp/create_rmw_gid.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
#include "rmw_fastrtps_shared_cpp/qos.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
using rmw_dds_common::operator<<;
class ParticipantListener;
enum class publishing_mode_t
{
ASYNCHRONOUS, // Asynchronous publishing mode
SYNCHRONOUS, // Synchronous publishing mode
AUTO // Use publishing mode set in XML file or Fast DDS default
};
class CustomTopicListener final : public eprosima::fastdds::dds::TopicListener
{
public:
RMW_FASTRTPS_SHARED_CPP_PUBLIC
explicit CustomTopicListener(EventListenerInterface * event_listener);
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_inconsistent_topic(
eprosima::fastdds::dds::Topic * topic,
eprosima::fastdds::dds::InconsistentTopicStatus status) override;
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void add_event_listener(EventListenerInterface * event_listener);
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void remove_event_listener(EventListenerInterface * event_listener);
private:
std::mutex event_listeners_mutex_;
std::set<EventListenerInterface *> event_listeners_
RCPPUTILS_TSA_GUARDED_BY(event_listeners_mutex_);
};
typedef struct UseCountTopic
{
eprosima::fastdds::dds::Topic * topic{nullptr};
CustomTopicListener * topic_listener{nullptr};
size_t use_count{0};
} UseCountTopic;
typedef struct CustomParticipantInfo
{
eprosima::fastdds::dds::DomainParticipant * participant_{nullptr};
ParticipantListener * listener_{nullptr};
std::mutex topic_name_to_topic_mutex_;
// As of 2023-02-07, Fast-DDS only allows one create_topic() with the same
// topic name per DomainParticipant. Thus, we need to check if the topic
// was already created. If it did, then we just increase the use_count
// that we are tracking, and return the existing topic. If it
// didn't, then we create a new one and start tracking it. Once all
// users of the topic are removed, we will delete the topic.
std::map<std::string, std::unique_ptr<UseCountTopic>> topic_name_to_topic_;
eprosima::fastdds::dds::Publisher * publisher_{nullptr};
eprosima::fastdds::dds::Subscriber * subscriber_{nullptr};
// Protects creation and destruction of topics, readers and writers
mutable std::mutex entity_creation_mutex_;
// Flag to establish if the QoS of the DomainParticipant,
// its DataWriters, and its DataReaders are going
// to be configured only from an XML file or if
// their settings are going to be overwritten by code
// with the default configuration.
bool leave_middleware_default_qos;
publishing_mode_t publishing_mode;
RMW_FASTRTPS_SHARED_CPP_PUBLIC
eprosima::fastdds::dds::Topic * find_or_create_topic(
const std::string & topic_name,
const std::string & type_name,
const eprosima::fastdds::dds::TopicQos & topic_qos,
EventListenerInterface * event_listener);
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void delete_topic(
const eprosima::fastdds::dds::Topic * topic,
EventListenerInterface * event_listener);
} CustomParticipantInfo;
class ParticipantListener : public eprosima::fastdds::dds::DomainParticipantListener
{
public:
explicit ParticipantListener(
const char * identifier,
rmw_dds_common::Context * context)
: context(context),
identifier_(identifier)
{}
void on_participant_discovery(
eprosima::fastdds::dds::DomainParticipant *,
eprosima::fastrtps::rtps::ParticipantDiscoveryInfo && info,
bool & should_be_ignored) override
{
should_be_ignored = false;
switch (info.status) {
case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT:
{
auto map = rmw::impl::cpp::parse_key_value(info.info.m_userData);
auto name_found = map.find("enclave");
if (name_found == map.end()) {
return;
}
auto enclave =
std::string(name_found->second.begin(), name_found->second.end());
context->graph_cache.add_participant(
rmw_fastrtps_shared_cpp::create_rmw_gid(
identifier_, info.info.m_guid),
enclave);
break;
}
case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT:
// fall through
case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT:
context->graph_cache.remove_participant(
rmw_fastrtps_shared_cpp::create_rmw_gid(
identifier_, info.info.m_guid));
break;
default:
return;
}
}
void on_subscriber_discovery(
eprosima::fastdds::dds::DomainParticipant *,
eprosima::fastrtps::rtps::ReaderDiscoveryInfo && info) override
{
if (eprosima::fastrtps::rtps::ReaderDiscoveryInfo::CHANGED_QOS_READER != info.status) {
bool is_alive =
eprosima::fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER == info.status;
process_discovery_info(info.info, is_alive, true);
}
}
void on_publisher_discovery(
eprosima::fastdds::dds::DomainParticipant *,
eprosima::fastrtps::rtps::WriterDiscoveryInfo && info) override
{
if (eprosima::fastrtps::rtps::WriterDiscoveryInfo::CHANGED_QOS_WRITER != info.status) {
bool is_alive =
eprosima::fastrtps::rtps::WriterDiscoveryInfo::DISCOVERED_WRITER == info.status;
process_discovery_info(info.info, is_alive, false);
}
}
private:
template<class T>
void
process_discovery_info(T & proxyData, bool is_alive, bool is_reader)
{
{
if (is_alive) {
rmw_qos_profile_t qos_profile = rmw_qos_profile_unknown;
rtps_qos_to_rmw_qos(proxyData.m_qos, &qos_profile);
const auto & userDataValue = proxyData.m_qos.m_userData.getValue();
rosidl_type_hash_t type_hash;
if (RMW_RET_OK != rmw_dds_common::parse_type_hash_from_user_data(
userDataValue.data(), userDataValue.size(), type_hash))
{
// Avoid deadlock trying to acquire rclcpp's global logging mutex
// by using eProsima's logging mechanism.
// TODO(sloretz) revisit when this is fixed: https://github.com/ros2/rclcpp/issues/2147
EPROSIMA_LOG_WARNING(
"rmw_fastrtps_shared_cpp", "Failed to parse a type hash for a topic");
/*
RCUTILS_LOG_WARN_NAMED(
"rmw_fastrtps_shared_cpp",
"Failed to parse type hash for topic '%s' with type '%s' from USER_DATA '%*s'.",
proxyData.topicName().c_str(),
proxyData.typeName().c_str(),
static_cast<int>(userDataValue.size()),
reinterpret_cast<const char *>(userDataValue.data()));
*/
type_hash = rosidl_get_zero_initialized_type_hash();
}
context->graph_cache.add_entity(
rmw_fastrtps_shared_cpp::create_rmw_gid(
identifier_,
proxyData.guid()),
proxyData.topicName().to_string(),
proxyData.typeName().to_string(),
type_hash,
rmw_fastrtps_shared_cpp::create_rmw_gid(
identifier_,
iHandle2GUID(proxyData.RTPSParticipantKey())),
qos_profile,
is_reader);
} else {
context->graph_cache.remove_entity(
rmw_fastrtps_shared_cpp::create_rmw_gid(
identifier_,
proxyData.guid()),
is_reader);
}
}
}
rmw_dds_common::Context * context;
const char * const identifier_;
};
#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_