.. _program_listing_file__tmp_ws_src_rmw_connextdds_rmw_connextdds_common_include_rmw_connextdds_rmw_waitset_std.hpp: Program Listing for File rmw_waitset_std.hpp ============================================ |exhale_lsh| :ref:`Return to documentation for file ` (``/tmp/ws/src/rmw_connextdds/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2020 Real-Time Innovations, Inc. (RTI) // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_ #define RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_ #include "rmw_connextdds/context.hpp" /****************************************************************************** * Alternative implementation of WaitSets and Conditions using C++ std ******************************************************************************/ class RMW_Connext_WaitSet { public: RMW_Connext_WaitSet() {} rmw_ret_t wait( rmw_subscriptions_t * const subs, rmw_guard_conditions_t * const gcs, rmw_services_t * const srvs, rmw_clients_t * const cls, rmw_events_t * const evs, const rmw_time_t * const wait_timeout); protected: void attach( rmw_subscriptions_t * const subs, rmw_guard_conditions_t * const gcs, rmw_services_t * const srvs, rmw_clients_t * const cls, rmw_events_t * const evs, bool & already_active); void detach( rmw_subscriptions_t * const subs, rmw_guard_conditions_t * const gcs, rmw_services_t * const srvs, rmw_clients_t * const cls, rmw_events_t * const evs, size_t & active_conditions); bool on_condition_active( rmw_subscriptions_t * const subs, rmw_guard_conditions_t * const gcs, rmw_services_t * const srvs, rmw_clients_t * const cls, rmw_events_t * const evs); bool waiting{false}; std::mutex mutex_internal; std::condition_variable condition; }; class RMW_Connext_Condition { public: RMW_Connext_Condition() : mutex_internal(), waitset_mutex(nullptr), waitset_condition(nullptr) {} template void attach( std::mutex * const waitset_mutex, std::condition_variable * const waitset_condition, bool & already_active, FunctorT && check_trigger) { std::lock_guard lock(this->mutex_internal); already_active = check_trigger(); if (!already_active) { this->waitset_mutex = waitset_mutex; this->waitset_condition = waitset_condition; } } template void detach(FunctorT && on_detached) { std::lock_guard lock(this->mutex_internal); this->waitset_mutex = nullptr; this->waitset_condition = nullptr; on_detached(); } virtual bool owns(DDS_Condition * const cond) = 0; template void update_state(FunctorT && update_condition, const bool notify) { std::lock_guard internal_lock(this->mutex_internal); if (nullptr != this->waitset_mutex) { std::lock_guard lock(*this->waitset_mutex); update_condition(); } else { update_condition(); } if (notify && nullptr != this->waitset_condition) { this->waitset_condition->notify_one(); } } protected: std::mutex mutex_internal; std::mutex * waitset_mutex; std::condition_variable * waitset_condition; static rmw_ret_t _attach( DDS_WaitSet * const waitset, DDS_Condition * const dds_condition) { if (DDS_RETCODE_OK != DDS_WaitSet_attach_condition(waitset, dds_condition)) { RMW_CONNEXT_LOG_ERROR_SET("failed to attach condition to waitset") return RMW_RET_ERROR; } return RMW_RET_OK; } static rmw_ret_t _detach( DDS_WaitSet * const waitset, DDS_Condition * const dds_condition) { // detach_condition() returns BAD_PARAMETER if the condition is not attached DDS_ReturnCode_t rc = DDS_WaitSet_detach_condition(waitset, dds_condition); if (DDS_RETCODE_OK != rc && DDS_RETCODE_BAD_PARAMETER != rc && DDS_RETCODE_PRECONDITION_NOT_MET != rc) { RMW_CONNEXT_LOG_ERROR_A_SET( "failed to detach condition from waitset: %d", rc) return RMW_RET_ERROR; } return RMW_RET_OK; } friend class RMW_Connext_WaitSet; friend class RMW_Connext_Event; }; class RMW_Connext_GuardCondition : public RMW_Connext_Condition { public: explicit RMW_Connext_GuardCondition(const bool internal) : trigger_value(false), internal(internal), gcond(nullptr) { if (this->internal) { this->gcond = DDS_GuardCondition_new(); if (nullptr == this->gcond) { RMW_CONNEXT_LOG_ERROR_SET("failed to allocate dds guard condition") } } } virtual ~RMW_Connext_GuardCondition() { if (nullptr != this->gcond) { DDS_GuardCondition_delete(this->gcond); } } DDS_GuardCondition * guard_condition() const { return this->gcond; } rmw_ret_t trigger() { if (internal) { if (DDS_RETCODE_OK != DDS_GuardCondition_set_trigger_value( this->gcond, DDS_BOOLEAN_TRUE)) { RMW_CONNEXT_LOG_ERROR_SET("failed to trigger internal guard condition") return RMW_RET_ERROR; } return RMW_RET_OK; } update_state( [this]() { this->trigger_value = true; }, true /* notify */); return RMW_RET_OK; } virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) { return RMW_Connext_Condition::_attach( waitset, DDS_GuardCondition_as_condition(this->gcond)); } virtual rmw_ret_t _detach(DDS_WaitSet * const waitset) { return RMW_Connext_Condition::_detach( waitset, DDS_GuardCondition_as_condition(this->gcond)); } virtual bool owns(DDS_Condition * const cond) { return cond == DDS_GuardCondition_as_condition(this->gcond); } protected: bool trigger_value; bool internal; DDS_GuardCondition * gcond; friend class RMW_Connext_WaitSet; }; class RMW_Connext_StatusCondition : public RMW_Connext_Condition { public: explicit RMW_Connext_StatusCondition( DDS_Entity * const entity) : scond(DDS_Entity_get_statuscondition(entity)), status_inconsistent_topic(DDS_InconsistentTopicStatus_INITIALIZER) { this->scond = DDS_Entity_get_statuscondition(entity); if (nullptr == this->scond) { RMW_CONNEXT_LOG_ERROR_SET("failed to get DDS entity's condition") throw new std::runtime_error("failed to get DDS entity's condition"); } } rmw_ret_t reset_statuses() { if (DDS_RETCODE_OK != DDS_StatusCondition_set_enabled_statuses( this->scond, DDS_STATUS_MASK_NONE)) { RMW_CONNEXT_LOG_ERROR_SET("failed to reset status condition's statuses") return RMW_RET_ERROR; } return RMW_RET_OK; } rmw_ret_t enable_statuses(const DDS_StatusMask statuses) { DDS_StatusMask current_statuses = DDS_StatusCondition_get_enabled_statuses(this->scond); current_statuses |= statuses; if (DDS_RETCODE_OK != DDS_StatusCondition_set_enabled_statuses(this->scond, current_statuses)) { RMW_CONNEXT_LOG_ERROR_SET("failed to enable status condition's statuses") return RMW_RET_ERROR; } return RMW_RET_OK; } rmw_ret_t disable_statuses(const DDS_StatusMask statuses) { DDS_StatusMask current_statuses = DDS_StatusCondition_get_enabled_statuses(this->scond); current_statuses &= ~statuses; if (DDS_RETCODE_OK != DDS_StatusCondition_set_enabled_statuses(this->scond, current_statuses)) { RMW_CONNEXT_LOG_ERROR_SET("failed to disable status condition's statuses") return RMW_RET_ERROR; } return RMW_RET_OK; } virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) { return RMW_Connext_Condition::_attach( waitset, DDS_StatusCondition_as_condition(this->scond)); } virtual rmw_ret_t _detach(DDS_WaitSet * const waitset) { return RMW_Connext_Condition::_detach( waitset, DDS_StatusCondition_as_condition(this->scond)); } virtual bool owns(DDS_Condition * const cond) { return cond == DDS_StatusCondition_as_condition(this->scond); } DDS_Condition * dds_condition() const { return DDS_StatusCondition_as_condition(this->scond); } virtual rmw_ret_t get_status(const rmw_event_type_t event_type, void * const event_info) = 0; // No-op to match API in DDS-based implementation. inline void invalidate() {} inline rmw_ret_t attach_data() { return RMW_RET_OK; } virtual bool has_status(const rmw_event_type_t event_type) = 0; void on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status); void update_status_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status); inline rmw_ret_t get_incompatible_type_status( rmw_incompatible_type_status_t * const status) { update_state( [this, status]() { status->total_count = this->status_inconsistent_topic.total_count; status->total_count_change = this->status_inconsistent_topic.total_count_change; this->triggered_inconsistent_topic = false; this->status_inconsistent_topic.total_count_change = 0; }, false /* notify */); return RMW_RET_OK; } protected: DDS_StatusCondition * scond; bool triggered_inconsistent_topic{false}; struct DDS_InconsistentTopicStatus status_inconsistent_topic; }; void RMW_Connext_DataWriterListener_offered_deadline_missed( void * listener_data, DDS_DataWriter * writer, const struct DDS_OfferedDeadlineMissedStatus * status); void RMW_Connext_DataWriterListener_offered_incompatible_qos( void * listener_data, DDS_DataWriter * writer, const struct DDS_OfferedIncompatibleQosStatus * status); void RMW_Connext_DataWriterListener_liveliness_lost( void * listener_data, DDS_DataWriter * writer, const struct DDS_LivelinessLostStatus * status); void RMW_Connext_DataWriterListener_matched( void * listener_data, DDS_DataWriter * writer, const struct DDS_PublicationMatchedStatus * status); class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition { public: explicit RMW_Connext_PublisherStatusCondition(DDS_DataWriter * const writer); virtual bool has_status(const rmw_event_type_t event_type); virtual rmw_ret_t get_status(const rmw_event_type_t event_type, void * const event_info); friend void RMW_Connext_DataWriterListener_offered_deadline_missed( void * listener_data, DDS_DataWriter * writer, const struct DDS_OfferedDeadlineMissedStatus * status); friend void RMW_Connext_DataWriterListener_offered_incompatible_qos( void * listener_data, DDS_DataWriter * writer, const struct DDS_OfferedIncompatibleQosStatus * status); friend void RMW_Connext_DataWriterListener_liveliness_lost( void * listener_data, DDS_DataWriter * writer, const struct DDS_LivelinessLostStatus * status); rmw_ret_t install(RMW_Connext_Publisher * const pub); void on_offered_deadline_missed( const DDS_OfferedDeadlineMissedStatus * const status); void on_offered_incompatible_qos( const DDS_OfferedIncompatibleQosStatus * const status); void on_liveliness_lost( const DDS_LivelinessLostStatus * const status); void on_matched( const DDS_PublicationMatchedStatus * const status); // Helper functions to retrieve status information inline rmw_ret_t get_liveliness_lost_status(rmw_liveliness_lost_status_t * const status) { update_state( [this, status]() { this->triggered_liveliness = false; status->total_count = this->status_liveliness.total_count; status->total_count_change = this->status_liveliness.total_count_change; this->status_liveliness.total_count_change = 0; this->status_liveliness_last = this->status_liveliness; }, false /* notify */); return RMW_RET_OK; } inline rmw_ret_t get_offered_deadline_missed_status( rmw_offered_deadline_missed_status_t * const status) { update_state( [this, status]() { this->triggered_deadline = false; status->total_count = this->status_deadline.total_count; status->total_count_change = this->status_deadline.total_count_change; this->status_deadline.total_count_change = 0; this->status_deadline_last = this->status_deadline; }, false /* notify */); return RMW_RET_OK; } inline rmw_ret_t get_offered_qos_incompatible_status( rmw_offered_qos_incompatible_event_status_t * const status) { update_state( [this, status]() { this->triggered_qos = false; status->total_count = this->status_qos.total_count; status->total_count_change = this->status_qos.total_count_change; status->last_policy_kind = dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); this->status_qos.total_count_change = 0; this->status_qos_last = this->status_qos; }, false /* notify */); return RMW_RET_OK; } inline rmw_ret_t get_matched_status( rmw_matched_status_t * const status) { update_state( [this, status]() { this->triggered_matched = false; status->total_count = this->status_matched.total_count; status->total_count_change = this->status_matched.total_count_change; status->current_count = this->status_matched.current_count; status->current_count_change = this->status_matched.current_count_change; this->status_matched.total_count_change = 0; this->status_matched.current_count_change = 0; this->status_matched_last = this->status_matched; }, false /* notify */); return RMW_RET_OK; } protected: void update_status_deadline( const DDS_OfferedDeadlineMissedStatus * const status); void update_status_liveliness( const DDS_LivelinessLostStatus * const status); void update_status_qos( const DDS_OfferedIncompatibleQosStatus * const status); void update_status_matched( const DDS_PublicationMatchedStatus * const status); bool triggered_deadline{false}; bool triggered_liveliness{false}; bool triggered_qos{false}; bool triggered_matched{false}; DDS_OfferedDeadlineMissedStatus status_deadline; DDS_OfferedIncompatibleQosStatus status_qos; DDS_LivelinessLostStatus status_liveliness; DDS_PublicationMatchedStatus status_matched; DDS_OfferedDeadlineMissedStatus status_deadline_last; DDS_OfferedIncompatibleQosStatus status_qos_last; DDS_LivelinessLostStatus status_liveliness_last; DDS_PublicationMatchedStatus status_matched_last; RMW_Connext_Publisher * pub; }; void RMW_Connext_DataReaderListener_requested_deadline_missed( void * listener_data, DDS_DataReader * reader, const struct DDS_RequestedDeadlineMissedStatus * status); void RMW_Connext_DataReaderListener_requested_incompatible_qos( void * listener_data, DDS_DataReader * reader, const struct DDS_RequestedIncompatibleQosStatus * status); void RMW_Connext_DataReaderListener_liveliness_changed( void * listener_data, DDS_DataReader * reader, const struct DDS_LivelinessChangedStatus * status); void RMW_Connext_DataReaderListener_sample_lost( void * listener_data, DDS_DataReader * reader, const struct DDS_SampleLostStatus * status); void RMW_Connext_DataReaderListener_on_data_available( void * listener_data, DDS_DataReader * reader); class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition { public: RMW_Connext_SubscriberStatusCondition( DDS_DataReader * const reader, const bool ignore_local, const bool internal); virtual ~RMW_Connext_SubscriberStatusCondition(); virtual bool has_status(const rmw_event_type_t event_type); virtual rmw_ret_t get_status(const rmw_event_type_t event_type, void * const event_info); friend void RMW_Connext_DataReaderListener_requested_deadline_missed( void * listener_data, DDS_DataReader * reader, const struct DDS_RequestedDeadlineMissedStatus * status); friend void RMW_Connext_DataReaderListener_requested_incompatible_qos( void * listener_data, DDS_DataReader * reader, const struct DDS_RequestedIncompatibleQosStatus * status); friend void RMW_Connext_DataReaderListener_liveliness_changed( void * listener_data, DDS_DataReader * reader, const struct DDS_LivelinessChangedStatus * status); friend void RMW_Connext_DataReaderListener_sample_lost( void * listener_data, DDS_DataReader * reader, const struct DDS_SampleLostStatus * status); friend void RMW_Connext_DataReaderListener_on_data_available( void * listener_data, DDS_DataReader * reader); rmw_ret_t install(RMW_Connext_Subscriber * const sub); void on_requested_deadline_missed( const DDS_RequestedDeadlineMissedStatus * const status); void on_requested_incompatible_qos( const DDS_RequestedIncompatibleQosStatus * const status); void on_liveliness_changed(const DDS_LivelinessChangedStatus * const status); void on_sample_lost(const DDS_SampleLostStatus * const status); void on_matched(const DDS_SubscriptionMatchedStatus * const status); const bool ignore_local; const DDS_InstanceHandle_t participant_handle; // Methods to match the DDS-based implementation rmw_ret_t set_data_available(const bool available) { update_state( [this, available]() { this->triggered_data = available; }, true /* notify */); if (nullptr != this->loan_guard_condition) { if (DDS_RETCODE_OK != DDS_GuardCondition_set_trigger_value(this->loan_guard_condition, available)) { RMW_CONNEXT_LOG_ERROR_SET("failed to set internal reader condition's trigger") return RMW_RET_ERROR; } } return RMW_RET_OK; } virtual bool owns(DDS_Condition * const cond) { return RMW_Connext_StatusCondition::owns(cond) || (nullptr != this->loan_guard_condition && cond == DDS_GuardCondition_as_condition(this->loan_guard_condition)); } virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) { rmw_ret_t rc = RMW_Connext_StatusCondition::_attach(waitset); if (RMW_RET_OK != rc) { return rc; } if (nullptr != this->loan_guard_condition) { return RMW_Connext_Condition::_attach( waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition)); } return RMW_RET_OK; } virtual rmw_ret_t _detach(DDS_WaitSet * const waitset) { rmw_ret_t rc = RMW_Connext_StatusCondition::_detach(waitset); if (RMW_RET_OK != rc) { return rc; } if (nullptr != this->loan_guard_condition) { return RMW_Connext_Condition::_detach( waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition)); } return RMW_RET_OK; } // Helper functions to retrieve status information inline rmw_ret_t get_liveliness_changed_status(rmw_liveliness_changed_status_t * const status) { update_state( [this, status]() { this->triggered_liveliness = false; status->alive_count = this->status_liveliness.alive_count; status->alive_count_change = this->status_liveliness.alive_count_change; status->not_alive_count = this->status_liveliness.not_alive_count; status->not_alive_count_change = this->status_liveliness.not_alive_count_change; this->status_liveliness.alive_count_change = 0; this->status_liveliness.not_alive_count_change = 0; this->status_liveliness_last = this->status_liveliness; }, false /* notify */); return RMW_RET_OK; } inline rmw_ret_t get_requested_deadline_missed_status( rmw_requested_deadline_missed_status_t * const status) { update_state( [this, status]() { this->triggered_deadline = false; status->total_count = this->status_deadline.total_count; status->total_count_change = this->status_deadline.total_count_change; this->status_deadline.total_count_change = 0; this->status_deadline_last = this->status_deadline; }, false /* notify */); return RMW_RET_OK; } inline rmw_ret_t get_requested_qos_incompatible_status( rmw_requested_qos_incompatible_event_status_t * const status) { update_state( [this, status]() { this->triggered_qos = false; status->total_count = this->status_qos.total_count; status->total_count_change = this->status_qos.total_count_change; status->last_policy_kind = dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); this->status_qos.total_count_change = 0; this->status_qos_last = this->status_qos; }, false /* notify */); return RMW_RET_OK; } inline rmw_ret_t get_message_lost_status(rmw_message_lost_status_t * const status) { update_state( [this, status]() { this->triggered_sample_lost = false; status->total_count = this->status_sample_lost.total_count; status->total_count_change = this->status_sample_lost.total_count_change; this->status_sample_lost.total_count_change = 0; this->status_sample_lost_last = this->status_sample_lost; }, false /* notify */); return RMW_RET_OK; } inline rmw_ret_t get_matched_status(rmw_matched_status_t * const status) { update_state( [this, status]() { this->triggered_matched = false; status->total_count = static_cast(this->status_matched.total_count); status->total_count_change = static_cast(this->status_matched.total_count_change); status->current_count = static_cast(this->status_matched.current_count); status->current_count_change = this->status_matched.current_count_change; this->status_matched.total_count_change = 0; this->status_matched.current_count_change = 0; this->status_matched_last = this->status_matched; }, false /* notify */); return RMW_RET_OK; } protected: void update_status_deadline( const DDS_RequestedDeadlineMissedStatus * const status); void update_status_liveliness( const DDS_LivelinessChangedStatus * const status); void update_status_qos( const DDS_RequestedIncompatibleQosStatus * const status); void update_status_sample_lost( const DDS_SampleLostStatus * const status); void update_status_matched( const DDS_SubscriptionMatchedStatus * const status); DDS_GuardCondition * const loan_guard_condition; bool triggered_deadline{false}; bool triggered_liveliness{false}; bool triggered_qos{false}; bool triggered_sample_lost{false}; bool triggered_matched{false}; bool triggered_data{false}; DDS_RequestedDeadlineMissedStatus status_deadline; DDS_RequestedIncompatibleQosStatus status_qos; DDS_LivelinessChangedStatus status_liveliness; DDS_SampleLostStatus status_sample_lost; DDS_SubscriptionMatchedStatus status_matched; DDS_RequestedDeadlineMissedStatus status_deadline_last; DDS_RequestedIncompatibleQosStatus status_qos_last; DDS_LivelinessChangedStatus status_liveliness_last; DDS_SampleLostStatus status_sample_lost_last; DDS_SubscriptionMatchedStatus status_matched_last; RMW_Connext_Subscriber * sub; friend class RMW_Connext_WaitSet; }; /****************************************************************************** * Event support ******************************************************************************/ class RMW_Connext_Event { public: static rmw_ret_t enable(rmw_event_t * const event); static rmw_ret_t disable(rmw_event_t * const event); static DDS_Condition * condition(const rmw_event_t * const event); static bool writer_event(const rmw_event_t * const event) { return !ros_event_for_reader(event->event_type); } static bool reader_event(const rmw_event_t * const event) { return ros_event_for_reader(event->event_type); } static RMW_Connext_Publisher * publisher(const rmw_event_t * const event) { return reinterpret_cast(event->data); } static RMW_Connext_Subscriber * subscriber(const rmw_event_t * const event) { return reinterpret_cast(event->data); } }; #endif // RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_