Program Listing for File types.hpp
↰ Return to documentation for file (/tmp/ws/src/rmw_gurumdds/rmw_gurumdds_shared_cpp/include/rmw_gurumdds_shared_cpp/types.hpp)
// Copyright 2019 GurumNetworks, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_GURUMDDS_SHARED_CPP__TYPES_HPP_
#define RMW_GURUMDDS_SHARED_CPP__TYPES_HPP_
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <exception>
#include <iostream>
#include <limits>
#include <list>
#include <map>
#include <mutex>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>

#include "rmw/rmw.h"
#include "rmw/ret_types.h"

#include "rmw_gurumdds_shared_cpp/dds_include.hpp"
#include "rmw_gurumdds_shared_cpp/guid.hpp"
#include "rmw_gurumdds_shared_cpp/qos.hpp"
#include "rmw_gurumdds_shared_cpp/rmw_common.hpp"
#include "rmw_gurumdds_shared_cpp/topic_cache.hpp"
#include "rmw_gurumdds_shared_cpp/visibility_control.h"
/// Distinguishes the two endpoint kinds handled by the discovery listeners.
/// Kept as an unscoped enum so existing unqualified uses of `Publisher` and
/// `Subscriber` continue to compile.
enum EntityType
{
  Publisher = 0,
  Subscriber = 1
};
/// State handed to the built-in reader callbacks in this header.
///
/// The callbacks obtain a pointer to this struct through
/// dds_DataReader_get_listener_context(). None of the pointers are owned by
/// the context. Every member defaults to nullptr so that a context which has
/// not been fully wired up holds well-defined values instead of garbage.
typedef struct _ListenerContext
{
  /// Mutex the callbacks lock for their whole duration.
  std::mutex * mutex_ = nullptr;
  /// Cache updated by the publication/subscription callbacks.
  TopicCache<GuidPrefix_t> * topic_cache = nullptr;
  /// Second cache updated by all three callbacks.
  /// NOTE(review): nothing in this header assigns it; it is presumably set by
  /// the code that creates the listeners - confirm.
  TopicCache<GuidPrefix_t> * graph_cache = nullptr;
  /// Guard condition triggered when the callbacks change the caches.
  rmw_guard_condition_t * graph_guard_condition = nullptr;
  /// Identifier forwarded to shared__rmw_trigger_guard_condition().
  const char * implementation_identifier = nullptr;
} ListenerContext;
static void pdp_on_data_available(const dds_DataReader * a_reader)
{
dds_DataReader * reader = const_cast<dds_DataReader *>(a_reader);
ListenerContext * context =
reinterpret_cast<ListenerContext *>(dds_DataReader_get_listener_context(reader));
if (context == nullptr) {
return;
}
std::lock_guard<std::mutex> lock(*context->mutex_);
dds_DataSeq * samples = dds_DataSeq_create(8);
if (samples == nullptr) {
fprintf(stderr, "failed to create data sample sequence\n");
return;
}
dds_SampleInfoSeq * infos = dds_SampleInfoSeq_create(8);
if (infos == nullptr) {
dds_DataSeq_delete(samples);
fprintf(stderr, "failed to create sample info sequence\n");
return;
}
dds_ReturnCode_t ret = dds_DataReader_take(
reader, samples, infos, 8,
dds_ANY_SAMPLE_STATE, dds_ANY_VIEW_STATE, dds_ANY_INSTANCE_STATE);
if (ret == dds_RETCODE_NO_DATA) {
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
return;
}
if (ret != dds_RETCODE_OK) {
fprintf(stderr, "failed to access data from the built-in participant reader\n");
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
return;
}
for (dds_UnsignedLong i = 0; i < dds_DataSeq_length(samples); ++i) {
GuidPrefix_t participant_guid;
dds_SampleInfo * info = dds_SampleInfoSeq_get(infos, i);
if (reinterpret_cast<void *>(info->instance_handle) == NULL) {
continue;
}
memcpy(participant_guid.value, reinterpret_cast<void *>(info->instance_handle), 16);
if (info->valid_data && info->instance_state == dds_ALIVE_INSTANCE_STATE) {
continue;
} else {
if (context->graph_cache->remove_topic_by_puid(participant_guid) <= 0) {
continue;
}
rmw_ret_t rmw_ret = shared__rmw_trigger_guard_condition(
context->implementation_identifier, context->graph_guard_condition);
if (rmw_ret != RMW_RET_OK) {
fprintf(
stderr,
"failed to trigger graph guard condition: %s\n",
rmw_get_error_string().str);
}
}
}
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
dds_DataReader_set_listener_context(reader, context);
}
static void pub_on_data_available(const dds_DataReader * a_reader)
{
dds_DataReader * reader = const_cast<dds_DataReader *>(a_reader);
ListenerContext * context =
reinterpret_cast<ListenerContext *>(dds_DataReader_get_listener_context(reader));
if (context == nullptr) {
return;
}
std::lock_guard<std::mutex> lock(*context->mutex_);
dds_DataSeq * samples = dds_DataSeq_create(8);
if (samples == nullptr) {
fprintf(stderr, "failed to create data sample sequence\n");
return;
}
dds_SampleInfoSeq * infos = dds_SampleInfoSeq_create(8);
if (infos == nullptr) {
dds_DataSeq_delete(samples);
fprintf(stderr, "failed to create sample info sequence\n");
return;
}
dds_ReturnCode_t ret = dds_DataReader_take(
reader, samples, infos, dds_LENGTH_UNLIMITED,
dds_ANY_SAMPLE_STATE, dds_ANY_VIEW_STATE, dds_ANY_INSTANCE_STATE);
if (ret == dds_RETCODE_NO_DATA) {
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
return;
}
if (ret != dds_RETCODE_OK) {
fprintf(stderr, "failed to access data from the built-in publication reader\n");
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
return;
}
for (dds_UnsignedLong i = 0; i < dds_DataSeq_length(samples); ++i) {
std::string topic_name, type_name;
GuidPrefix_t guid, participant_guid;
dds_PublicationBuiltinTopicData * pbtd =
reinterpret_cast<dds_PublicationBuiltinTopicData *>(dds_DataSeq_get(samples, i));
dds_SampleInfo * info = dds_SampleInfoSeq_get(infos, i);
if (reinterpret_cast<void *>(info->instance_handle) == NULL) {
continue;
}
memcpy(guid.value, reinterpret_cast<void *>(info->instance_handle), 16);
if (info->valid_data && info->instance_state == dds_ALIVE_INSTANCE_STATE) {
dds_BuiltinTopicKey_to_GUID(&participant_guid, pbtd->participant_key);
topic_name = std::string(pbtd->topic_name);
type_name = std::string(pbtd->type_name);
rmw_qos_profile_t qos = {
RMW_QOS_POLICY_HISTORY_UNKNOWN, // TODO(clemjh): pbtd doesn't contain history qos policy
RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT,
convert_reliability(pbtd->reliability),
convert_durability(pbtd->durability),
convert_deadline(pbtd->deadline),
convert_lifespan(pbtd->lifespan),
convert_liveliness(pbtd->liveliness),
convert_liveliness_lease_duration(pbtd->liveliness),
false,
};
context->topic_cache->add_topic(
participant_guid, guid, std::move(topic_name),
std::move(type_name), qos);
context->graph_cache->add_topic(
participant_guid, guid, std::move(topic_name),
std::move(type_name), qos);
} else {
context->topic_cache->remove_topic(guid);
context->graph_cache->remove_topic(guid);
}
}
if (dds_DataSeq_length(samples) > 0) {
rmw_ret_t rmw_ret = shared__rmw_trigger_guard_condition(
context->implementation_identifier, context->graph_guard_condition);
if (rmw_ret != RMW_RET_OK) {
fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string().str);
}
}
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
dds_DataReader_set_listener_context(reader, context);
}
static void sub_on_data_available(const dds_DataReader * a_reader)
{
dds_DataReader * reader = const_cast<dds_DataReader *>(a_reader);
ListenerContext * context =
reinterpret_cast<ListenerContext *>(dds_DataReader_get_listener_context(reader));
if (context == nullptr) {
return;
}
std::lock_guard<std::mutex> lock(*context->mutex_);
dds_DataSeq * samples = dds_DataSeq_create(8);
if (samples == nullptr) {
fprintf(stderr, "failed to create data sample sequence\n");
return;
}
dds_SampleInfoSeq * infos = dds_SampleInfoSeq_create(8);
if (infos == nullptr) {
dds_DataSeq_delete(samples);
fprintf(stderr, "failed to create sample info sequence\n");
return;
}
dds_ReturnCode_t ret = dds_DataReader_take(
reader, samples, infos, dds_LENGTH_UNLIMITED,
dds_ANY_SAMPLE_STATE, dds_ANY_VIEW_STATE, dds_ANY_INSTANCE_STATE);
if (ret == dds_RETCODE_NO_DATA) {
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
return;
}
if (ret != dds_RETCODE_OK) {
fprintf(stderr, "failed to access data from the built-in subscription reader\n");
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
return;
}
for (dds_UnsignedLong i = 0; i < dds_DataSeq_length(samples); ++i) {
std::string topic_name, type_name;
GuidPrefix_t guid, participant_guid;
dds_SubscriptionBuiltinTopicData * sbtd =
reinterpret_cast<dds_SubscriptionBuiltinTopicData *>(dds_DataSeq_get(samples, i));
dds_SampleInfo * info = dds_SampleInfoSeq_get(infos, i);
if (reinterpret_cast<void *>(info->instance_handle) == NULL) {
continue;
}
memcpy(guid.value, reinterpret_cast<void *>(info->instance_handle), 16);
if (info->valid_data && info->instance_state == dds_ALIVE_INSTANCE_STATE) {
dds_BuiltinTopicKey_to_GUID(&participant_guid, sbtd->participant_key);
topic_name = sbtd->topic_name;
type_name = sbtd->type_name;
rmw_qos_profile_t qos = {
RMW_QOS_POLICY_HISTORY_UNKNOWN, // TODO(clemjh): sbtd doesn't contain history qos policy
RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT,
convert_reliability(sbtd->reliability),
convert_durability(sbtd->durability),
convert_deadline(sbtd->deadline),
RMW_QOS_LIFESPAN_DEFAULT,
convert_liveliness(sbtd->liveliness),
convert_liveliness_lease_duration(sbtd->liveliness),
false,
};
context->topic_cache->add_topic(
participant_guid, guid, std::move(topic_name),
std::move(type_name), qos);
context->graph_cache->add_topic(
participant_guid, guid, std::move(topic_name),
std::move(type_name), qos);
} else {
context->topic_cache->remove_topic(guid);
context->graph_cache->remove_topic(guid);
}
}
if (dds_DataSeq_length(samples) > 0) {
rmw_ret_t rmw_ret = shared__rmw_trigger_guard_condition(
context->implementation_identifier, context->graph_guard_condition);
if (rmw_ret != RMW_RET_OK) {
fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string().str);
}
}
dds_DataReader_return_loan(reader, samples, infos);
dds_DataSeq_delete(samples);
dds_SampleInfoSeq_delete(infos);
dds_DataReader_set_listener_context(reader, context);
}
class GurumddsDataReaderListener
{
public:
explicit GurumddsDataReaderListener(
const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition)
: graph_guard_condition(graph_guard_condition),
implementation_identifier(implementation_identifier)
{}
virtual ~GurumddsDataReaderListener() = default;
RMW_GURUMDDS_SHARED_CPP_PUBLIC
virtual void add_information(
const GuidPrefix_t & participant_guid,
const GuidPrefix_t & topic_guid,
const std::string & topic_name,
const std::string & type_name,
rmw_qos_profile_t & qos,
EntityType entity_type);
RMW_GURUMDDS_SHARED_CPP_PUBLIC
virtual void remove_information(
const GuidPrefix_t & topic_guid,
const EntityType entity_type);
RMW_GURUMDDS_SHARED_CPP_PUBLIC
virtual void trigger_graph_guard_condition(void);
size_t count_topic(const char * topic_name);
void fill_topic_names_and_types(
bool no_demangle,
std::map<std::string, std::set<std::string>> & topic_names_to_types);
void fill_service_names_and_types(
std::map<std::string, std::set<std::string>> & services);
void fill_topic_names_and_types_by_guid(
bool no_demangle,
std::map<std::string, std::set<std::string>> & topic_names_to_types_by_guid,
GuidPrefix_t & participant_guid);
void fill_service_names_and_types_by_guid(
std::map<std::string, std::set<std::string>> & services,
GuidPrefix_t & participant_guid,
const std::string suffix);
dds_DataReaderListener dds_listener;
ListenerContext context;
dds_DataReader * dds_reader;
std::mutex mutex_;
TopicCache<GuidPrefix_t> topic_cache;
rmw_guard_condition_t * graph_guard_condition;
const char * implementation_identifier;
};
class GurumddsParticipantListener : public GurumddsDataReaderListener
{
public:
GurumddsParticipantListener(
const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition)
: GurumddsDataReaderListener(implementation_identifier, graph_guard_condition)
{
context.mutex_ = &(this->mutex_);
context.topic_cache = &(this->topic_cache);
context.graph_guard_condition = this->graph_guard_condition;
context.implementation_identifier = this->implementation_identifier;
dds_listener.on_data_available = pdp_on_data_available;
}
~GurumddsParticipantListener() {}
};
class GurumddsPublisherListener : public GurumddsDataReaderListener
{
public:
GurumddsPublisherListener(
const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition)
: GurumddsDataReaderListener(implementation_identifier, graph_guard_condition)
{
context.mutex_ = &(this->mutex_);
context.topic_cache = &(this->topic_cache);
context.graph_guard_condition = this->graph_guard_condition;
context.implementation_identifier = this->implementation_identifier;
dds_listener.on_data_available = pub_on_data_available;
}
~GurumddsPublisherListener() {}
};
class GurumddsSubscriberListener : public GurumddsDataReaderListener
{
public:
GurumddsSubscriberListener(
const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition)
: GurumddsDataReaderListener(implementation_identifier, graph_guard_condition)
{
context.mutex_ = &(this->mutex_);
context.topic_cache = &(this->topic_cache);
context.graph_guard_condition = this->graph_guard_condition;
context.implementation_identifier = this->implementation_identifier;
dds_listener.on_data_available = sub_on_data_available;
}
~GurumddsSubscriberListener() {}
};
/// Aggregate of the DDS handles and listeners grouped under one node record.
/// NOTE(review): ownership and lifetime of the pointed-to objects are not
/// visible in this header - confirm against the code that fills this struct.
struct _GurumddsNodeInfo
{
  dds_DomainParticipant * participant;
  rmw_guard_condition_t * graph_guard_condition;
  // One listener per built-in reader callback defined in this header.
  GurumddsParticipantListener * part_listener;
  GurumddsPublisherListener * pub_listener;
  GurumddsSubscriberListener * sub_listener;
  // DDS publishers / subscribers tracked for this record.
  std::list<dds_Publisher *> pub_list;
  std::list<dds_Subscriber *> sub_list;
};
using GurumddsNodeInfo = _GurumddsNodeInfo;
/// Groups a DDS wait set with two condition sequences.
/// NOTE(review): judging by the names, one sequence holds the attached
/// conditions and the other the active ones - confirm with the wait code.
struct _GurumddsWaitSetInfo
{
  dds_WaitSet * wait_set;
  dds_ConditionSeq * active_conditions;
  dds_ConditionSeq * attached_conditions;
};
using GurumddsWaitSetInfo = _GurumddsWaitSetInfo;
/// Abstract interface for entities that expose DDS status information.
/// All operations are pure virtual; implementations live outside this header.
struct _GurumddsEventInfo
{
  virtual ~_GurumddsEventInfo() = default;

  /// \param[in] mask status to query.
  /// \param[out] event destination for the status data; the concrete type it
  ///   points to is decided by the implementation.
  /// \return an rmw return code.
  virtual rmw_ret_t get_status(const dds_StatusMask mask, void * event) = 0;

  /// \return the DDS status condition of the entity.
  virtual dds_StatusCondition * get_statuscondition() = 0;

  /// \return a DDS status mask, as the name suggests of changed statuses.
  virtual dds_StatusMask get_status_changes() = 0;
};
using GurumddsEventInfo = _GurumddsEventInfo;
#endif // RMW_GURUMDDS_SHARED_CPP__TYPES_HPP_