Program Listing for File rmw_impl.hpp
↰ Return to documentation for file (/tmp/ws/src/rmw_connextdds/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
)
// Copyright 2020 Real-Time Innovations, Inc. (RTI)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_CONNEXTDDS__RMW_IMPL_HPP_
#define RMW_CONNEXTDDS__RMW_IMPL_HPP_
#include <string>
#include <vector>
#include <map>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include "rmw_connextdds/context.hpp"
#include "rmw_connextdds/type_support.hpp"
#include "rmw_connextdds/demangle.hpp"
#include "rmw_connextdds/namespace_prefix.hpp"
#include "rmw_connextdds/rmw_api_impl.hpp"
#include "rcutils/types/uint8_array.h"
#include "rcpputils/thread_safety_annotations.hpp"
/******************************************************************************
* General helpers and utilities.
******************************************************************************/
rcutils_ret_t
rcutils_uint8_array_copy(
rcutils_uint8_array_t * const dst,
const rcutils_uint8_array_t * const src);
rmw_qos_policy_kind_t
dds_qos_policy_to_rmw_qos_policy(const DDS_QosPolicyId_t last_policy_id);
bool ros_event_for_reader(const rmw_event_type_t ros);
DDS_StatusKind
ros_event_to_dds(const rmw_event_type_t ros, bool * const invalid);
const char *
dds_event_to_str(const DDS_StatusKind event);
rmw_ret_t
rmw_connextdds_parse_string_list(
const char * const list,
struct DDS_StringSeq * const parsed_out,
const char delimiter = ',',
const bool trim = true,
const bool allow_empty_elements = true,
const bool append_values = true);
bool rmw_connextdds_find_string_in_list(
const DDS_StringSeq * const values,
const char * const value);
DDS_Duration_t rmw_connextdds_duration_from_ros_time(
const rmw_time_t * const ros_time);
/******************************************************************************
* WaitSet wrapper
******************************************************************************/
class RMW_Connext_Publisher;
class RMW_Connext_Subscriber;
class RMW_Connext_Client;
class RMW_Connext_Service;
#include "rmw_connextdds/rmw_waitset_std.hpp"
/******************************************************************************
* Node support
******************************************************************************/
class RMW_Connext_Node
{
rmw_context_impl_t * ctx;
explicit RMW_Connext_Node(rmw_context_impl_t * const ctx)
: ctx(ctx)
{
}
public:
static RMW_Connext_Node *
create(rmw_context_impl_t * const ctx);
rmw_ret_t
finalize();
rmw_guard_condition_t *
graph_guard_condition()
{
return this->ctx->common.graph_guard_condition;
}
};
/******************************************************************************
* Publication support
******************************************************************************/
class RMW_Connext_Publisher
{
public:
static RMW_Connext_Publisher *
create(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Publisher * const pub,
const rosidl_message_type_support_t * const type_supports,
const char * const topic_name,
const rmw_qos_profile_t * const qos_policies,
const rmw_publisher_options_t * const publisher_options,
const bool internal = false,
const RMW_Connext_MessageType msg_type = RMW_CONNEXT_MESSAGE_USERDATA,
const void * const intro_members = nullptr,
const bool intro_members_cpp = false,
std::string * const type_name = nullptr);
rmw_ret_t
finalize();
DDS_DataWriter *
writer() const
{
return this->dds_writer;
}
RMW_Connext_MessageTypeSupport *
message_type_support() const
{
return this->type_support;
}
DDS_InstanceHandle_t
instance_handle()
{
return DDS_Entity_get_instance_handle(
DDS_DataWriter_as_entity(this->writer()));
}
rmw_ret_t
write(
const void * const ros_message,
const bool serialized,
int64_t * const sn_out = nullptr);
rmw_ret_t
enable() const
{
if (DDS_RETCODE_OK !=
DDS_Entity_enable(
DDS_Topic_as_entity(
DDS_DataWriter_get_topic(this->dds_writer))))
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to enable dds writer's topic: '%s' [%s]",
DDS_TopicDescription_get_name(
DDS_Topic_as_topicdescription(this->dds_topic())),
this->type_support->type_name())
return RMW_RET_ERROR;
}
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_DataWriter_as_entity(this->dds_writer)))
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to enable dds writer: '%s' [%s]",
DDS_TopicDescription_get_name(
DDS_Topic_as_topicdescription(this->dds_topic())),
this->type_support->type_name())
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
RMW_Connext_PublisherStatusCondition *
condition()
{
return &this->status_condition;
}
const rmw_gid_t * gid() const
{
return &this->ros_gid;
}
size_t
subscriptions_count();
rmw_ret_t
assert_liveliness();
rmw_ret_t
wait_for_all_acked(rmw_time_t wait_timeout);
rmw_ret_t
qos(rmw_qos_profile_t * const qos);
rmw_ret_t
requestreply_header_to_dds(
const RMW_Connext_RequestReplyMessage * const rr_msg,
DDS_SampleIdentity_t * const sample_identity,
DDS_SampleIdentity_t * const related_sample_identity);
DDS_Topic * dds_topic() const
{
return DDS_DataWriter_get_topic(this->dds_writer);
}
DDS_Publisher * dds_publisher() const
{
return DDS_DataWriter_get_publisher(this->dds_writer);
}
DDS_DomainParticipant * dds_participant() const
{
DDS_Publisher * const pub = this->dds_publisher();
return DDS_Publisher_get_participant(pub);
}
private:
rmw_context_impl_t * ctx;
DDS_DataWriter * dds_writer;
RMW_Connext_MessageTypeSupport * type_support;
const bool created_topic;
rmw_gid_t ros_gid;
RMW_Connext_PublisherStatusCondition status_condition;
RMW_Connext_Publisher(
rmw_context_impl_t * const ctx,
DDS_DataWriter * const dds_writer,
RMW_Connext_MessageTypeSupport * const type_support,
const bool created_topic);
};
rmw_publisher_t *
rmw_connextdds_create_publisher(
rmw_context_impl_t * const ctx,
const rmw_node_t * const node,
DDS_DomainParticipant * const dp,
DDS_Publisher * const pub,
const rosidl_message_type_support_t * const type_supports,
const char * const topic_name,
const rmw_qos_profile_t * const qos_policies,
const rmw_publisher_options_t * const publisher_options,
const bool internal = false);
rmw_ret_t
rmw_connextdds_destroy_publisher(
rmw_context_impl_t * const ctx,
rmw_publisher_t * const pub);
/******************************************************************************
* Subscription support
******************************************************************************/
class RMW_Connext_Subscriber
{
public:
static RMW_Connext_Subscriber *
create(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Subscriber * const sub,
const rosidl_message_type_support_t * const type_supports,
const char * const topic_name,
const rmw_qos_profile_t * const qos_policies,
const rmw_subscription_options_t * const subscriber_options,
const bool internal = false,
const RMW_Connext_MessageType msg_type = RMW_CONNEXT_MESSAGE_USERDATA,
const void * const intro_members = nullptr,
const bool intro_members_cpp = false,
std::string * const type_name = nullptr,
const char * const cft_name = nullptr,
const char * const cft_filter = nullptr);
rmw_ret_t
finalize();
DDS_DataReader *
reader() const
{
return this->dds_reader;
}
RMW_Connext_UntypedSampleSeq *
data_seq()
{
return &this->loan_data;
}
DDS_SampleInfoSeq *
info_seq()
{
return &this->loan_info;
}
RMW_Connext_MessageTypeSupport *
message_type_support() const
{
return this->type_support;
}
DDS_InstanceHandle_t
instance_handle()
{
return DDS_Entity_get_instance_handle(
DDS_DataReader_as_entity(this->reader()));
}
DDS_InstanceHandle_t
participant_instance_handle()
{
return DDS_Entity_get_instance_handle(
DDS_DomainParticipant_as_entity(
DDS_Subscriber_get_participant(
DDS_DataReader_get_subscriber(this->reader()))));
}
rmw_ret_t
enable() const
{
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_Topic_as_entity(this->dds_topic)))
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to enable dds reader's topic: '%s' [%s]",
DDS_TopicDescription_get_name(
DDS_Topic_as_topicdescription(this->dds_topic)),
this->type_support->type_name())
return RMW_RET_ERROR;
}
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_DataReader_as_entity(this->dds_reader)))
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to enable dds reader: '%s' [%s]",
DDS_TopicDescription_get_name(
DDS_Topic_as_topicdescription(this->dds_topic)),
this->type_support->type_name())
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
RMW_Connext_SubscriberStatusCondition *
condition()
{
return &this->status_condition;
}
const rmw_gid_t * gid() const
{
return &this->ros_gid;
}
size_t
publications_count();
rmw_ret_t
qos(rmw_qos_profile_t * const qos);
rmw_ret_t
loan_messages(const bool update_condition = true);
rmw_ret_t
return_messages();
rmw_ret_t
loan_messages_if_needed(const bool update_condition = true)
{
rmw_ret_t rc = RMW_RET_OK;
// take messages from reader if we don't have an outstanding loan
// or if we have consumed the current loan
if (this->loan_len == 0 || this->loan_next >= this->loan_len) {
/* return any previously loaned messages */
if (this->loan_len > 0) {
rc = this->return_messages();
if (RMW_RET_OK != rc) {
return rc;
}
}
/* loan messages from reader */
rc = this->loan_messages(update_condition);
if (RMW_RET_OK != rc) {
return rc;
}
}
return RMW_RET_OK;
}
void
requestreply_header_from_dds(
RMW_Connext_RequestReplyMessage * const rr_msg,
const DDS_SampleIdentity_t * const sample_identity,
const DDS_SampleIdentity_t * const related_sample_identity);
rmw_ret_t
take_next(
void ** const ros_messages,
rmw_message_info_t * const message_infos,
const size_t max_samples,
size_t * const taken,
const bool serialized,
const DDS_InstanceHandle_t * const request_writer_handle = nullptr);
rmw_ret_t
take_message(
void * const ros_message,
rmw_message_info_t * const message_info,
bool * const taken,
const DDS_InstanceHandle_t * const request_writer_handle = nullptr);
rmw_ret_t
take(
rmw_message_sequence_t * const message_sequence,
rmw_message_info_sequence_t * const message_info_sequence,
const size_t max_samples,
size_t * const taken);
rmw_ret_t
take_serialized(
rmw_serialized_message_t * const serialized_message,
rmw_message_info_t * const message_info,
bool * const taken);
rmw_ret_t
set_content_filter(
const rmw_subscription_content_filter_options_t * const options);
rmw_ret_t
get_content_filter(
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * const options);
bool
has_data()
{
std::lock_guard<std::mutex> lock(this->loan_mutex);
if (RMW_RET_OK !=
this->loan_messages_if_needed(false /* update_condition */))
{
RMW_CONNEXT_LOG_ERROR("failed to check loaned messages")
return false;
}
bool has_data = this->loan_len > 0;
RMW_CONNEXT_ASSERT(!has_data || this->loan_next < this->loan_len)
return has_data;
}
DDS_Subscriber * dds_subscriber() const
{
return DDS_DataReader_get_subscriber(this->dds_reader);
}
DDS_DomainParticipant * dds_participant() const
{
DDS_Subscriber * const sub = this->dds_subscriber();
return DDS_Subscriber_get_participant(sub);
}
DDS_Topic * topic() const
{
return this->dds_topic;
}
static std::string get_atomic_id()
{
static std::atomic_uint64_t id;
return std::to_string(id++);
}
bool is_cft_enabled()
{
return !this->cft_expression.empty();
}
const bool internal;
const bool ignore_local;
private:
rmw_context_impl_t * ctx;
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
std::string cft_expression;
RMW_Connext_MessageTypeSupport * type_support;
rmw_gid_t ros_gid;
const bool created_topic;
RMW_Connext_SubscriberStatusCondition status_condition;
RMW_Connext_UntypedSampleSeq loan_data;
DDS_SampleInfoSeq loan_info;
size_t loan_len;
size_t loan_next;
std::mutex loan_mutex;
RMW_Connext_Subscriber(
rmw_context_impl_t * const ctx,
DDS_DataReader * const dds_reader,
DDS_Topic * const dds_topic,
RMW_Connext_MessageTypeSupport * const type_support,
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);
friend class RMW_Connext_SubscriberStatusCondition;
};
rmw_subscription_t *
rmw_connextdds_create_subscriber(
rmw_context_impl_t * const ctx,
const rmw_node_t * const node,
DDS_DomainParticipant * const dp,
DDS_Subscriber * const sub,
const rosidl_message_type_support_t * const type_supports,
const char * const topic_name,
const rmw_qos_profile_t * const qos_policies,
const rmw_subscription_options_t * const subscriber_options,
const bool internal = false);
rmw_ret_t
rmw_connextdds_destroy_subscriber(
rmw_context_impl_t * const ctx,
rmw_subscription_t * const sub);
void rmw_connextdds_message_info_from_dds(
rmw_message_info_t * const to,
const DDS_SampleInfo * const from);
/******************************************************************************
* Client/Service support
******************************************************************************/
class RMW_Connext_Client
{
RMW_Connext_Publisher * request_pub;
RMW_Connext_Subscriber * reply_sub;
std::atomic_uint next_request_id;
rmw_context_impl_t * ctx;
RMW_Connext_Client()
: request_pub(nullptr),
reply_sub(nullptr),
next_request_id(1)
{
}
public:
static RMW_Connext_Client *
create(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Publisher * const pub,
DDS_Subscriber * const sub,
const rosidl_service_type_support_t * const type_supports,
const char * const svc_name,
const rmw_qos_profile_t * const qos_policies);
rmw_ret_t
finalize();
rmw_ret_t
is_service_available(bool & available);
rmw_ret_t
take_response(
rmw_service_info_t * const request_header,
void * const ros_response,
bool * const taken);
rmw_ret_t
send_request(
const void * const ros_request,
int64_t * const sequence_id);
RMW_Connext_Publisher *
publisher() const
{
return this->request_pub;
}
RMW_Connext_Subscriber *
subscriber() const
{
return this->reply_sub;
}
rmw_ret_t
enable();
rmw_ret_t
request_publisher_qos(rmw_qos_profile_t * const qos);
rmw_ret_t
response_subscription_qos(rmw_qos_profile_t * const qos);
const rmw_gid_t gid() const
{
return *this->request_pub->gid();
}
};
class RMW_Connext_Service
{
RMW_Connext_Publisher * reply_pub;
RMW_Connext_Subscriber * request_sub;
rmw_context_impl_t * ctx;
public:
static RMW_Connext_Service *
create(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Publisher * const pub,
DDS_Subscriber * const sub,
const rosidl_service_type_support_t * const type_supports,
const char * const svc_name,
const rmw_qos_profile_t * const qos_policies);
rmw_ret_t
finalize();
rmw_ret_t
take_request(
rmw_service_info_t * const request_header,
void * const ros_request,
bool * const taken);
rmw_ret_t
send_response(
rmw_request_id_t * const request_id,
const void * const ros_response);
RMW_Connext_Publisher *
publisher() const
{
return this->reply_pub;
}
RMW_Connext_Subscriber *
subscriber() const
{
return this->request_sub;
}
rmw_ret_t
enable();
rmw_ret_t
response_publisher_qos(rmw_qos_profile_t * const qos);
rmw_ret_t
request_subscription_qos(rmw_qos_profile_t * const qos);
};
/******************************************************************************
* Waitsets and Conditions
******************************************************************************/
rmw_guard_condition_t *
rmw_connextdds_create_guard_condition(const bool internal);
rmw_ret_t
rmw_connextdds_destroy_guard_condition(rmw_guard_condition_t * const gc);
rmw_ret_t
rmw_connextdds_trigger_guard_condition(const rmw_guard_condition_t * const gc);
rmw_wait_set_t *
rmw_connextdds_create_waitset(const size_t max_conditions);
rmw_ret_t
rmw_connextdds_destroy_waitset(rmw_wait_set_t * const ws);
rmw_ret_t
rmw_connextdds_waitset_wait(
rmw_wait_set_t * const ws,
rmw_subscriptions_t * subs,
rmw_guard_conditions_t * gcs,
rmw_services_t * srvs,
rmw_clients_t * cls,
rmw_events_t * evs,
const rmw_time_t * wait_timeout);
/******************************************************************************
* GUID helpers
******************************************************************************/
rmw_ret_t
rmw_connextdds_gid_to_guid(const rmw_gid_t & gid, struct DDS_GUID_t & guid);
rmw_ret_t
rmw_connextdds_guid_to_gid(const struct DDS_GUID_t & guid, rmw_gid_t & gid);
void rmw_connextdds_sn_ros_to_dds(
const int64_t sn_ros, struct DDS_SequenceNumber_t & sn_dds);
#define rmw_connextdds_sn_ros_to_dds(sn_ros_, sn_dds_) \
{ \
(sn_dds_).high = ((sn_ros_) & 0xFFFFFFFF00000000LL) >> 32; \
(sn_dds_).low = (sn_ros_) & 0x00000000FFFFFFFFLL; \
}
void rmw_connextdds_sn_dds_to_ros(
const struct DDS_SequenceNumber_t & sn_dds, int64_t & sn_ros);
#define rmw_connextdds_sn_dds_to_ros(sn_dds_, sn_ros_) \
{ \
sn_ros_ = ((int64_t)(sn_dds_).high) << 32 | (sn_dds_).low; \
}
void rmw_connextdds_ih_to_gid(
const DDS_InstanceHandle_t & ih, rmw_gid_t & gid);
void rmw_connextdds_get_entity_gid(
DDS_Entity * const entity, rmw_gid_t & gid);
void rmw_connextdds_get_entity_gid(
DDS_DomainParticipant * const dp, rmw_gid_t & gid);
void rmw_connextdds_get_entity_gid(
DDS_Publisher * const pub, rmw_gid_t & gid);
void rmw_connextdds_get_entity_gid(
DDS_Subscriber * const sub, rmw_gid_t & gid);
void rmw_connextdds_get_entity_gid(
DDS_DataWriter * const writer, rmw_gid_t & gid);
void rmw_connextdds_get_entity_gid(
DDS_DataReader * const reader, rmw_gid_t & gid);
void rmw_connextdds_get_entity_gid(
DDS_Topic * const topic, rmw_gid_t & gid);
/******************************************************************************
* Type helpers
******************************************************************************/
std::string
rmw_connextdds_create_type_name(
const message_type_support_callbacks_t * callbacks,
const bool mangle_names = true);
std::string
rmw_connextdds_create_type_name(
const rosidl_typesupport_introspection_cpp::MessageMembers * const members,
const bool mangle_names = true);
std::string
rmw_connextdds_create_type_name(
const rosidl_typesupport_introspection_c__MessageMembers * const members,
const bool mangle_names = true);
std::string
rmw_connextdds_create_type_name_request(
const service_type_support_callbacks_t * callbacks,
const bool mangle_names = true);
std::string
rmw_connextdds_create_type_name_response(
const service_type_support_callbacks_t * callbacks,
const bool mangle_names = true);
/******************************************************************************
* Topic Helpers
******************************************************************************/
std::string
rmw_connextdds_create_topic_name(
const char * prefix,
const char * topic_name,
const char * suffix,
bool avoid_ros_namespace_conventions);
std::string
rmw_connextdds_create_topic_name(
const char * prefix,
const char * topic_name,
const char * suffix,
const rmw_qos_profile_t * qos_policies);
/******************************************************************************
* Qos Helpers
******************************************************************************/
rmw_ret_t
rmw_connextdds_get_readerwriter_qos(
const bool writer_qos,
RMW_Connext_MessageTypeSupport * const type_support,
DDS_HistoryQosPolicy * const history,
DDS_ReliabilityQosPolicy * const reliability,
DDS_DurabilityQosPolicy * const durability,
DDS_DeadlineQosPolicy * const deadline,
DDS_LivelinessQosPolicy * const liveliness,
DDS_ResourceLimitsQosPolicy * const resource_limits,
DDS_PublishModeQosPolicy * const publish_mode,
DDS_LifespanQosPolicy * const lifespan,
DDS_UserDataQosPolicy * const user_data,
const rmw_qos_profile_t * const qos_policies,
const rmw_publisher_options_t * const pub_options,
const rmw_subscription_options_t * const sub_options);
rmw_ret_t
rmw_connextdds_readerwriter_qos_to_ros(
const DDS_HistoryQosPolicy * const history,
const DDS_ReliabilityQosPolicy * const reliability,
const DDS_DurabilityQosPolicy * const durability,
const DDS_DeadlineQosPolicy * const deadline,
const DDS_LivelinessQosPolicy * const liveliness,
const DDS_LifespanQosPolicy * const lifespan,
rmw_qos_profile_t * const qos_policies);
/******************************************************************************
* Security Helpers
******************************************************************************/
rmw_ret_t
rmw_connextdds_apply_security_logging_configuration(
DDS_PropertyQosPolicy * const properties);
#endif // RMW_CONNEXTDDS__RMW_IMPL_HPP_