.. _program_listing_file__tmp_ws_src_rmw_gurumdds_rmw_gurumdds_shared_cpp_include_rmw_gurumdds_shared_cpp_types.hpp: Program Listing for File types.hpp ================================== |exhale_lsh| :ref:`Return to documentation for file ` (``/tmp/ws/src/rmw_gurumdds/rmw_gurumdds_shared_cpp/include/rmw_gurumdds_shared_cpp/types.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2019 GurumNetworks, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RMW_GURUMDDS_SHARED_CPP__TYPES_HPP_ #define RMW_GURUMDDS_SHARED_CPP__TYPES_HPP_ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "rmw/rmw.h" #include "rmw/ret_types.h" #include "rmw_gurumdds_shared_cpp/dds_include.hpp" #include "rmw_gurumdds_shared_cpp/guid.hpp" #include "rmw_gurumdds_shared_cpp/qos.hpp" #include "rmw_gurumdds_shared_cpp/rmw_common.hpp" #include "rmw_gurumdds_shared_cpp/topic_cache.hpp" #include "rmw_gurumdds_shared_cpp/visibility_control.h" enum EntityType {Publisher, Subscriber}; typedef struct _ListenerContext { std::mutex * mutex_; TopicCache * topic_cache; TopicCache * graph_cache; rmw_guard_condition_t * graph_guard_condition; const char * implementation_identifier; } ListenerContext; static void pdp_on_data_available(const dds_DataReader * a_reader) { dds_DataReader * reader = const_cast(a_reader); ListenerContext * context = reinterpret_cast(dds_DataReader_get_listener_context(reader)); if (context == nullptr) { return; } std::lock_guard lock(*context->mutex_); dds_DataSeq * samples = dds_DataSeq_create(8); if (samples == nullptr) { fprintf(stderr, "failed to create data sample sequence\n"); return; } dds_SampleInfoSeq * infos = dds_SampleInfoSeq_create(8); if (infos == nullptr) { dds_DataSeq_delete(samples); fprintf(stderr, "failed to create sample info sequence\n"); return; } dds_ReturnCode_t ret = dds_DataReader_take( reader, samples, infos, 8, dds_ANY_SAMPLE_STATE, dds_ANY_VIEW_STATE, dds_ANY_INSTANCE_STATE); if (ret == dds_RETCODE_NO_DATA) { dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); return; } if (ret != dds_RETCODE_OK) { fprintf(stderr, "failed to access data from the built-in participant reader\n"); dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); return; } for (dds_UnsignedLong i = 0; i < dds_DataSeq_length(samples); ++i) { GuidPrefix_t participant_guid; dds_SampleInfo * info = dds_SampleInfoSeq_get(infos, i); if (reinterpret_cast(info->instance_handle) == NULL) { continue; } memcpy(participant_guid.value, reinterpret_cast(info->instance_handle), 16); if (info->valid_data && info->instance_state == dds_ALIVE_INSTANCE_STATE) { continue; } else { if (context->graph_cache->remove_topic_by_puid(participant_guid) <= 0) { continue; } rmw_ret_t rmw_ret = shared__rmw_trigger_guard_condition( context->implementation_identifier, context->graph_guard_condition); if (rmw_ret != RMW_RET_OK) { fprintf( stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string().str); } } } dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); dds_DataReader_set_listener_context(reader, context); } static void pub_on_data_available(const dds_DataReader * a_reader) { dds_DataReader * reader = const_cast(a_reader); ListenerContext * context = reinterpret_cast(dds_DataReader_get_listener_context(reader)); if (context == nullptr) { return; } std::lock_guard lock(*context->mutex_); dds_DataSeq * samples = dds_DataSeq_create(8); if (samples == nullptr) { fprintf(stderr, "failed to create data sample sequence\n"); return; } dds_SampleInfoSeq * infos = dds_SampleInfoSeq_create(8); if (infos == nullptr) { dds_DataSeq_delete(samples); fprintf(stderr, "failed to create sample info sequence\n"); return; } dds_ReturnCode_t ret = dds_DataReader_take( reader, samples, infos, dds_LENGTH_UNLIMITED, dds_ANY_SAMPLE_STATE, dds_ANY_VIEW_STATE, dds_ANY_INSTANCE_STATE); if (ret == dds_RETCODE_NO_DATA) { dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); return; } if (ret != dds_RETCODE_OK) { fprintf(stderr, "failed to access data from the built-in publication reader\n"); dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); return; } for (dds_UnsignedLong i = 0; i < dds_DataSeq_length(samples); ++i) { std::string topic_name, type_name; GuidPrefix_t guid, participant_guid; dds_PublicationBuiltinTopicData * pbtd = reinterpret_cast(dds_DataSeq_get(samples, i)); dds_SampleInfo * info = dds_SampleInfoSeq_get(infos, i); if (reinterpret_cast(info->instance_handle) == NULL) { continue; } memcpy(guid.value, reinterpret_cast(info->instance_handle), 16); if (info->valid_data && info->instance_state == dds_ALIVE_INSTANCE_STATE) { dds_BuiltinTopicKey_to_GUID(&participant_guid, pbtd->participant_key); topic_name = std::string(pbtd->topic_name); type_name = std::string(pbtd->type_name); rmw_qos_profile_t qos = { RMW_QOS_POLICY_HISTORY_UNKNOWN, // TODO(clemjh): pbtd doesn't contain history qos policy RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT, convert_reliability(pbtd->reliability), convert_durability(pbtd->durability), convert_deadline(pbtd->deadline), convert_lifespan(pbtd->lifespan), convert_liveliness(pbtd->liveliness), convert_liveliness_lease_duration(pbtd->liveliness), false, }; context->topic_cache->add_topic( participant_guid, guid, std::move(topic_name), std::move(type_name), qos); context->graph_cache->add_topic( participant_guid, guid, std::move(topic_name), std::move(type_name), qos); } else { context->topic_cache->remove_topic(guid); context->graph_cache->remove_topic(guid); } } if (dds_DataSeq_length(samples) > 0) { rmw_ret_t rmw_ret = shared__rmw_trigger_guard_condition( context->implementation_identifier, context->graph_guard_condition); if (rmw_ret != RMW_RET_OK) { fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string().str); } } dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); dds_DataReader_set_listener_context(reader, context); } static void sub_on_data_available(const dds_DataReader * a_reader) { dds_DataReader * reader = const_cast(a_reader); ListenerContext * context = reinterpret_cast(dds_DataReader_get_listener_context(reader)); if (context == nullptr) { return; } std::lock_guard lock(*context->mutex_); dds_DataSeq * samples = dds_DataSeq_create(8); if (samples == nullptr) { fprintf(stderr, "failed to create data sample sequence\n"); return; } dds_SampleInfoSeq * infos = dds_SampleInfoSeq_create(8); if (infos == nullptr) { dds_DataSeq_delete(samples); fprintf(stderr, "failed to create sample info sequence\n"); return; } dds_ReturnCode_t ret = dds_DataReader_take( reader, samples, infos, dds_LENGTH_UNLIMITED, dds_ANY_SAMPLE_STATE, dds_ANY_VIEW_STATE, dds_ANY_INSTANCE_STATE); if (ret == dds_RETCODE_NO_DATA) { dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); return; } if (ret != dds_RETCODE_OK) { fprintf(stderr, "failed to access data from the built-in subscription reader\n"); dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); return; } for (dds_UnsignedLong i = 0; i < dds_DataSeq_length(samples); ++i) { std::string topic_name, type_name; GuidPrefix_t guid, participant_guid; dds_SubscriptionBuiltinTopicData * sbtd = reinterpret_cast(dds_DataSeq_get(samples, i)); dds_SampleInfo * info = dds_SampleInfoSeq_get(infos, i); if (reinterpret_cast(info->instance_handle) == NULL) { continue; } memcpy(guid.value, reinterpret_cast(info->instance_handle), 16); if (info->valid_data && info->instance_state == dds_ALIVE_INSTANCE_STATE) { dds_BuiltinTopicKey_to_GUID(&participant_guid, sbtd->participant_key); topic_name = sbtd->topic_name; type_name = sbtd->type_name; rmw_qos_profile_t qos = { RMW_QOS_POLICY_HISTORY_UNKNOWN, // TODO(clemjh): sbtd doesn't contain history qos policy RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT, convert_reliability(sbtd->reliability), convert_durability(sbtd->durability), convert_deadline(sbtd->deadline), RMW_QOS_LIFESPAN_DEFAULT, convert_liveliness(sbtd->liveliness), convert_liveliness_lease_duration(sbtd->liveliness), false, }; context->topic_cache->add_topic( participant_guid, guid, std::move(topic_name), std::move(type_name), qos); context->graph_cache->add_topic( participant_guid, guid, std::move(topic_name), std::move(type_name), qos); } else { context->topic_cache->remove_topic(guid); context->graph_cache->remove_topic(guid); } } if (dds_DataSeq_length(samples) > 0) { rmw_ret_t rmw_ret = shared__rmw_trigger_guard_condition( context->implementation_identifier, context->graph_guard_condition); if (rmw_ret != RMW_RET_OK) { fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string().str); } } dds_DataReader_return_loan(reader, samples, infos); dds_DataSeq_delete(samples); dds_SampleInfoSeq_delete(infos); dds_DataReader_set_listener_context(reader, context); } class GurumddsDataReaderListener { public: explicit GurumddsDataReaderListener( const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition) : graph_guard_condition(graph_guard_condition), implementation_identifier(implementation_identifier) {} virtual ~GurumddsDataReaderListener() = default; RMW_GURUMDDS_SHARED_CPP_PUBLIC virtual void add_information( const GuidPrefix_t & participant_guid, const GuidPrefix_t & topic_guid, const std::string & topic_name, const std::string & type_name, rmw_qos_profile_t & qos, EntityType entity_type); RMW_GURUMDDS_SHARED_CPP_PUBLIC virtual void remove_information( const GuidPrefix_t & topic_guid, const EntityType entity_type); RMW_GURUMDDS_SHARED_CPP_PUBLIC virtual void trigger_graph_guard_condition(void); size_t count_topic(const char * topic_name); void fill_topic_names_and_types( bool no_demangle, std::map> & topic_names_to_types); void fill_service_names_and_types( std::map> & services); void fill_topic_names_and_types_by_guid( bool no_demangle, std::map> & topic_names_to_types_by_guid, GuidPrefix_t & participant_guid); void fill_service_names_and_types_by_guid( std::map> & services, GuidPrefix_t & participant_guid, const std::string suffix); dds_DataReaderListener dds_listener; ListenerContext context; dds_DataReader * dds_reader; std::mutex mutex_; TopicCache topic_cache; rmw_guard_condition_t * graph_guard_condition; const char * implementation_identifier; }; class GurumddsParticipantListener : public GurumddsDataReaderListener { public: GurumddsParticipantListener( const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition) : GurumddsDataReaderListener(implementation_identifier, graph_guard_condition) { context.mutex_ = &(this->mutex_); context.topic_cache = &(this->topic_cache); context.graph_guard_condition = this->graph_guard_condition; context.implementation_identifier = this->implementation_identifier; dds_listener.on_data_available = pdp_on_data_available; } ~GurumddsParticipantListener() {} }; class GurumddsPublisherListener : public GurumddsDataReaderListener { public: GurumddsPublisherListener( const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition) : GurumddsDataReaderListener(implementation_identifier, graph_guard_condition) { context.mutex_ = &(this->mutex_); context.topic_cache = &(this->topic_cache); context.graph_guard_condition = this->graph_guard_condition; context.implementation_identifier = this->implementation_identifier; dds_listener.on_data_available = pub_on_data_available; } ~GurumddsPublisherListener() {} }; class GurumddsSubscriberListener : public GurumddsDataReaderListener { public: GurumddsSubscriberListener( const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition) : GurumddsDataReaderListener(implementation_identifier, graph_guard_condition) { context.mutex_ = &(this->mutex_); context.topic_cache = &(this->topic_cache); context.graph_guard_condition = this->graph_guard_condition; context.implementation_identifier = this->implementation_identifier; dds_listener.on_data_available = sub_on_data_available; } ~GurumddsSubscriberListener() {} }; typedef struct _GurumddsNodeInfo { dds_DomainParticipant * participant; rmw_guard_condition_t * graph_guard_condition; GurumddsParticipantListener * part_listener; GurumddsPublisherListener * pub_listener; GurumddsSubscriberListener * sub_listener; std::list pub_list; std::list sub_list; } GurumddsNodeInfo; typedef struct _GurumddsWaitSetInfo { dds_WaitSet * wait_set; dds_ConditionSeq * active_conditions; dds_ConditionSeq * attached_conditions; } GurumddsWaitSetInfo; typedef struct _GurumddsEventInfo { virtual ~_GurumddsEventInfo() = default; virtual rmw_ret_t get_status(const dds_StatusMask mask, void * event) = 0; virtual dds_StatusCondition * get_statuscondition() = 0; virtual dds_StatusMask get_status_changes() = 0; } GurumddsEventInfo; #endif // RMW_GURUMDDS_SHARED_CPP__TYPES_HPP_