Program Listing for File rmw_waitset_std.hpp
↰ Return to documentation for file (/tmp/ws/src/rmw_connextdds/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
)
// Copyright 2020 Real-Time Innovations, Inc. (RTI)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_
#define RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_
#include "rmw_connextdds/context.hpp"
/******************************************************************************
* Alternative implementation of WaitSets and Conditions using C++ std
******************************************************************************/
class RMW_Connext_WaitSet
{
public:
RMW_Connext_WaitSet() {}
rmw_ret_t
wait(
rmw_subscriptions_t * const subs,
rmw_guard_conditions_t * const gcs,
rmw_services_t * const srvs,
rmw_clients_t * const cls,
rmw_events_t * const evs,
const rmw_time_t * const wait_timeout);
protected:
void
attach(
rmw_subscriptions_t * const subs,
rmw_guard_conditions_t * const gcs,
rmw_services_t * const srvs,
rmw_clients_t * const cls,
rmw_events_t * const evs,
bool & already_active);
void
detach(
rmw_subscriptions_t * const subs,
rmw_guard_conditions_t * const gcs,
rmw_services_t * const srvs,
rmw_clients_t * const cls,
rmw_events_t * const evs,
size_t & active_conditions);
bool
on_condition_active(
rmw_subscriptions_t * const subs,
rmw_guard_conditions_t * const gcs,
rmw_services_t * const srvs,
rmw_clients_t * const cls,
rmw_events_t * const evs);
bool waiting{false};
std::mutex mutex_internal;
std::condition_variable condition;
};
class RMW_Connext_Condition
{
public:
RMW_Connext_Condition()
: mutex_internal(),
waitset_mutex(nullptr),
waitset_condition(nullptr)
{}
template<typename FunctorT>
void
attach(
std::mutex * const waitset_mutex,
std::condition_variable * const waitset_condition,
bool & already_active,
FunctorT && check_trigger)
{
std::lock_guard<std::mutex> lock(this->mutex_internal);
already_active = check_trigger();
if (!already_active) {
this->waitset_mutex = waitset_mutex;
this->waitset_condition = waitset_condition;
}
}
template<typename FunctorT>
void
detach(FunctorT && on_detached)
{
std::lock_guard<std::mutex> lock(this->mutex_internal);
this->waitset_mutex = nullptr;
this->waitset_condition = nullptr;
on_detached();
}
virtual bool owns(DDS_Condition * const cond) = 0;
template<typename FunctorT>
void
update_state(FunctorT && update_condition, const bool notify)
{
std::lock_guard<std::mutex> internal_lock(this->mutex_internal);
if (nullptr != this->waitset_mutex) {
std::lock_guard<std::mutex> lock(*this->waitset_mutex);
update_condition();
} else {
update_condition();
}
if (notify && nullptr != this->waitset_condition) {
this->waitset_condition->notify_one();
}
}
protected:
std::mutex mutex_internal;
std::mutex * waitset_mutex;
std::condition_variable * waitset_condition;
static rmw_ret_t
_attach(
DDS_WaitSet * const waitset,
DDS_Condition * const dds_condition)
{
if (DDS_RETCODE_OK != DDS_WaitSet_attach_condition(waitset, dds_condition)) {
RMW_CONNEXT_LOG_ERROR_SET("failed to attach condition to waitset")
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
static rmw_ret_t
_detach(
DDS_WaitSet * const waitset,
DDS_Condition * const dds_condition)
{
// detach_condition() returns BAD_PARAMETER if the condition is not attached
DDS_ReturnCode_t rc = DDS_WaitSet_detach_condition(waitset, dds_condition);
if (DDS_RETCODE_OK != rc &&
DDS_RETCODE_BAD_PARAMETER != rc &&
DDS_RETCODE_PRECONDITION_NOT_MET != rc)
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to detach condition from waitset: %d", rc)
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
friend class RMW_Connext_WaitSet;
friend class RMW_Connext_Event;
};
class RMW_Connext_GuardCondition : public RMW_Connext_Condition
{
public:
explicit RMW_Connext_GuardCondition(const bool internal)
: trigger_value(false),
internal(internal),
gcond(nullptr)
{
if (this->internal) {
this->gcond = DDS_GuardCondition_new();
if (nullptr == this->gcond) {
RMW_CONNEXT_LOG_ERROR_SET("failed to allocate dds guard condition")
}
}
}
virtual ~RMW_Connext_GuardCondition()
{
if (nullptr != this->gcond) {
DDS_GuardCondition_delete(this->gcond);
}
}
DDS_GuardCondition *
guard_condition() const
{
return this->gcond;
}
rmw_ret_t
trigger()
{
if (internal) {
if (DDS_RETCODE_OK !=
DDS_GuardCondition_set_trigger_value(
this->gcond, DDS_BOOLEAN_TRUE))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to trigger internal guard condition")
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
update_state(
[this]() {
this->trigger_value = true;
}, true /* notify */);
return RMW_RET_OK;
}
virtual rmw_ret_t _attach(DDS_WaitSet * const waitset)
{
return RMW_Connext_Condition::_attach(
waitset, DDS_GuardCondition_as_condition(this->gcond));
}
virtual rmw_ret_t _detach(DDS_WaitSet * const waitset)
{
return RMW_Connext_Condition::_detach(
waitset, DDS_GuardCondition_as_condition(this->gcond));
}
virtual bool
owns(DDS_Condition * const cond)
{
return cond == DDS_GuardCondition_as_condition(this->gcond);
}
protected:
bool trigger_value;
bool internal;
DDS_GuardCondition * gcond;
friend class RMW_Connext_WaitSet;
};
class RMW_Connext_StatusCondition : public RMW_Connext_Condition
{
public:
explicit RMW_Connext_StatusCondition(
DDS_Entity * const entity)
: scond(DDS_Entity_get_statuscondition(entity)),
status_inconsistent_topic(DDS_InconsistentTopicStatus_INITIALIZER)
{
this->scond = DDS_Entity_get_statuscondition(entity);
if (nullptr == this->scond) {
RMW_CONNEXT_LOG_ERROR_SET("failed to get DDS entity's condition")
throw new std::runtime_error("failed to get DDS entity's condition");
}
}
rmw_ret_t
reset_statuses()
{
if (DDS_RETCODE_OK !=
DDS_StatusCondition_set_enabled_statuses(
this->scond, DDS_STATUS_MASK_NONE))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to reset status condition's statuses")
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
rmw_ret_t
enable_statuses(const DDS_StatusMask statuses)
{
DDS_StatusMask current_statuses =
DDS_StatusCondition_get_enabled_statuses(this->scond);
current_statuses |= statuses;
if (DDS_RETCODE_OK !=
DDS_StatusCondition_set_enabled_statuses(this->scond, current_statuses))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to enable status condition's statuses")
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
rmw_ret_t
disable_statuses(const DDS_StatusMask statuses)
{
DDS_StatusMask current_statuses =
DDS_StatusCondition_get_enabled_statuses(this->scond);
current_statuses &= ~statuses;
if (DDS_RETCODE_OK !=
DDS_StatusCondition_set_enabled_statuses(this->scond, current_statuses))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to disable status condition's statuses")
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
virtual rmw_ret_t _attach(DDS_WaitSet * const waitset)
{
return RMW_Connext_Condition::_attach(
waitset, DDS_StatusCondition_as_condition(this->scond));
}
virtual rmw_ret_t _detach(DDS_WaitSet * const waitset)
{
return RMW_Connext_Condition::_detach(
waitset, DDS_StatusCondition_as_condition(this->scond));
}
virtual bool
owns(DDS_Condition * const cond)
{
return cond == DDS_StatusCondition_as_condition(this->scond);
}
DDS_Condition *
dds_condition() const
{
return DDS_StatusCondition_as_condition(this->scond);
}
virtual rmw_ret_t
get_status(const rmw_event_type_t event_type, void * const event_info) = 0;
// No-op to match API in DDS-based implementation.
inline void
invalidate() {}
inline rmw_ret_t
attach_data()
{
return RMW_RET_OK;
}
virtual bool
has_status(const rmw_event_type_t event_type) = 0;
void
on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);
void
update_status_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);
inline rmw_ret_t
get_incompatible_type_status(
rmw_incompatible_type_status_t * const status)
{
update_state(
[this, status]() {
status->total_count = this->status_inconsistent_topic.total_count;
status->total_count_change = this->status_inconsistent_topic.total_count_change;
this->triggered_inconsistent_topic = false;
this->status_inconsistent_topic.total_count_change = 0;
}, false /* notify */);
return RMW_RET_OK;
}
protected:
DDS_StatusCondition * scond;
bool triggered_inconsistent_topic{false};
struct DDS_InconsistentTopicStatus status_inconsistent_topic;
};
void
RMW_Connext_DataWriterListener_offered_deadline_missed(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_OfferedDeadlineMissedStatus * status);
void
RMW_Connext_DataWriterListener_offered_incompatible_qos(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_OfferedIncompatibleQosStatus * status);
void
RMW_Connext_DataWriterListener_liveliness_lost(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_LivelinessLostStatus * status);
void
RMW_Connext_DataWriterListener_matched(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_PublicationMatchedStatus * status);
class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition
{
public:
explicit RMW_Connext_PublisherStatusCondition(DDS_DataWriter * const writer);
virtual bool
has_status(const rmw_event_type_t event_type);
virtual rmw_ret_t
get_status(const rmw_event_type_t event_type, void * const event_info);
friend
void
RMW_Connext_DataWriterListener_offered_deadline_missed(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_OfferedDeadlineMissedStatus * status);
friend
void
RMW_Connext_DataWriterListener_offered_incompatible_qos(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_OfferedIncompatibleQosStatus * status);
friend
void
RMW_Connext_DataWriterListener_liveliness_lost(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_LivelinessLostStatus * status);
rmw_ret_t
install(RMW_Connext_Publisher * const pub);
void
on_offered_deadline_missed(
const DDS_OfferedDeadlineMissedStatus * const status);
void
on_offered_incompatible_qos(
const DDS_OfferedIncompatibleQosStatus * const status);
void
on_liveliness_lost(
const DDS_LivelinessLostStatus * const status);
void
on_matched(
const DDS_PublicationMatchedStatus * const status);
// Helper functions to retrieve status information
inline rmw_ret_t
get_liveliness_lost_status(rmw_liveliness_lost_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_liveliness = false;
status->total_count = this->status_liveliness.total_count;
status->total_count_change = this->status_liveliness.total_count_change;
this->status_liveliness.total_count_change = 0;
this->status_liveliness_last = this->status_liveliness;
}, false /* notify */);
return RMW_RET_OK;
}
inline rmw_ret_t
get_offered_deadline_missed_status(
rmw_offered_deadline_missed_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_deadline = false;
status->total_count = this->status_deadline.total_count;
status->total_count_change = this->status_deadline.total_count_change;
this->status_deadline.total_count_change = 0;
this->status_deadline_last = this->status_deadline;
}, false /* notify */);
return RMW_RET_OK;
}
inline rmw_ret_t
get_offered_qos_incompatible_status(
rmw_offered_qos_incompatible_event_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_qos = false;
status->total_count = this->status_qos.total_count;
status->total_count_change = this->status_qos.total_count_change;
status->last_policy_kind =
dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id);
this->status_qos.total_count_change = 0;
this->status_qos_last = this->status_qos;
}, false /* notify */);
return RMW_RET_OK;
}
inline rmw_ret_t
get_matched_status(
rmw_matched_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_matched = false;
status->total_count = this->status_matched.total_count;
status->total_count_change = this->status_matched.total_count_change;
status->current_count = this->status_matched.current_count;
status->current_count_change = this->status_matched.current_count_change;
this->status_matched.total_count_change = 0;
this->status_matched.current_count_change = 0;
this->status_matched_last = this->status_matched;
}, false /* notify */);
return RMW_RET_OK;
}
protected:
void update_status_deadline(
const DDS_OfferedDeadlineMissedStatus * const status);
void update_status_liveliness(
const DDS_LivelinessLostStatus * const status);
void update_status_qos(
const DDS_OfferedIncompatibleQosStatus * const status);
void update_status_matched(
const DDS_PublicationMatchedStatus * const status);
bool triggered_deadline{false};
bool triggered_liveliness{false};
bool triggered_qos{false};
bool triggered_matched{false};
DDS_OfferedDeadlineMissedStatus status_deadline;
DDS_OfferedIncompatibleQosStatus status_qos;
DDS_LivelinessLostStatus status_liveliness;
DDS_PublicationMatchedStatus status_matched;
DDS_OfferedDeadlineMissedStatus status_deadline_last;
DDS_OfferedIncompatibleQosStatus status_qos_last;
DDS_LivelinessLostStatus status_liveliness_last;
DDS_PublicationMatchedStatus status_matched_last;
RMW_Connext_Publisher * pub;
};
void
RMW_Connext_DataReaderListener_requested_deadline_missed(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_RequestedDeadlineMissedStatus * status);
void
RMW_Connext_DataReaderListener_requested_incompatible_qos(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_RequestedIncompatibleQosStatus * status);
void
RMW_Connext_DataReaderListener_liveliness_changed(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_LivelinessChangedStatus * status);
void
RMW_Connext_DataReaderListener_sample_lost(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_SampleLostStatus * status);
void
RMW_Connext_DataReaderListener_on_data_available(
void * listener_data,
DDS_DataReader * reader);
class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
{
public:
RMW_Connext_SubscriberStatusCondition(
DDS_DataReader * const reader,
const bool ignore_local,
const bool internal);
virtual ~RMW_Connext_SubscriberStatusCondition();
virtual bool
has_status(const rmw_event_type_t event_type);
virtual rmw_ret_t
get_status(const rmw_event_type_t event_type, void * const event_info);
friend
void
RMW_Connext_DataReaderListener_requested_deadline_missed(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_RequestedDeadlineMissedStatus * status);
friend
void
RMW_Connext_DataReaderListener_requested_incompatible_qos(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_RequestedIncompatibleQosStatus * status);
friend
void
RMW_Connext_DataReaderListener_liveliness_changed(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_LivelinessChangedStatus * status);
friend
void
RMW_Connext_DataReaderListener_sample_lost(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_SampleLostStatus * status);
friend
void
RMW_Connext_DataReaderListener_on_data_available(
void * listener_data,
DDS_DataReader * reader);
rmw_ret_t
install(RMW_Connext_Subscriber * const sub);
void
on_requested_deadline_missed(
const DDS_RequestedDeadlineMissedStatus * const status);
void
on_requested_incompatible_qos(
const DDS_RequestedIncompatibleQosStatus * const status);
void
on_liveliness_changed(const DDS_LivelinessChangedStatus * const status);
void
on_sample_lost(const DDS_SampleLostStatus * const status);
void
on_matched(const DDS_SubscriptionMatchedStatus * const status);
const bool ignore_local;
const DDS_InstanceHandle_t participant_handle;
// Methods to match the DDS-based implementation
rmw_ret_t
set_data_available(const bool available)
{
update_state(
[this, available]() {
this->triggered_data = available;
}, true /* notify */);
if (nullptr != this->loan_guard_condition) {
if (DDS_RETCODE_OK !=
DDS_GuardCondition_set_trigger_value(this->loan_guard_condition, available))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to set internal reader condition's trigger")
return RMW_RET_ERROR;
}
}
return RMW_RET_OK;
}
virtual bool
owns(DDS_Condition * const cond)
{
return RMW_Connext_StatusCondition::owns(cond) ||
(nullptr != this->loan_guard_condition &&
cond == DDS_GuardCondition_as_condition(this->loan_guard_condition));
}
virtual rmw_ret_t _attach(DDS_WaitSet * const waitset)
{
rmw_ret_t rc = RMW_Connext_StatusCondition::_attach(waitset);
if (RMW_RET_OK != rc) {
return rc;
}
if (nullptr != this->loan_guard_condition) {
return RMW_Connext_Condition::_attach(
waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition));
}
return RMW_RET_OK;
}
virtual rmw_ret_t _detach(DDS_WaitSet * const waitset)
{
rmw_ret_t rc = RMW_Connext_StatusCondition::_detach(waitset);
if (RMW_RET_OK != rc) {
return rc;
}
if (nullptr != this->loan_guard_condition) {
return RMW_Connext_Condition::_detach(
waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition));
}
return RMW_RET_OK;
}
// Helper functions to retrieve status information
inline rmw_ret_t
get_liveliness_changed_status(rmw_liveliness_changed_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_liveliness = false;
status->alive_count = this->status_liveliness.alive_count;
status->alive_count_change = this->status_liveliness.alive_count_change;
status->not_alive_count = this->status_liveliness.not_alive_count;
status->not_alive_count_change = this->status_liveliness.not_alive_count_change;
this->status_liveliness.alive_count_change = 0;
this->status_liveliness.not_alive_count_change = 0;
this->status_liveliness_last = this->status_liveliness;
}, false /* notify */);
return RMW_RET_OK;
}
inline rmw_ret_t
get_requested_deadline_missed_status(
rmw_requested_deadline_missed_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_deadline = false;
status->total_count = this->status_deadline.total_count;
status->total_count_change = this->status_deadline.total_count_change;
this->status_deadline.total_count_change = 0;
this->status_deadline_last = this->status_deadline;
}, false /* notify */);
return RMW_RET_OK;
}
inline rmw_ret_t
get_requested_qos_incompatible_status(
rmw_requested_qos_incompatible_event_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_qos = false;
status->total_count = this->status_qos.total_count;
status->total_count_change = this->status_qos.total_count_change;
status->last_policy_kind =
dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id);
this->status_qos.total_count_change = 0;
this->status_qos_last = this->status_qos;
}, false /* notify */);
return RMW_RET_OK;
}
inline rmw_ret_t
get_message_lost_status(rmw_message_lost_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_sample_lost = false;
status->total_count = this->status_sample_lost.total_count;
status->total_count_change = this->status_sample_lost.total_count_change;
this->status_sample_lost.total_count_change = 0;
this->status_sample_lost_last = this->status_sample_lost;
}, false /* notify */);
return RMW_RET_OK;
}
inline rmw_ret_t
get_matched_status(rmw_matched_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_matched = false;
status->total_count = static_cast<size_t>(this->status_matched.total_count);
status->total_count_change = static_cast<size_t>(this->status_matched.total_count_change);
status->current_count = static_cast<size_t>(this->status_matched.current_count);
status->current_count_change = this->status_matched.current_count_change;
this->status_matched.total_count_change = 0;
this->status_matched.current_count_change = 0;
this->status_matched_last = this->status_matched;
}, false /* notify */);
return RMW_RET_OK;
}
protected:
void update_status_deadline(
const DDS_RequestedDeadlineMissedStatus * const status);
void update_status_liveliness(
const DDS_LivelinessChangedStatus * const status);
void update_status_qos(
const DDS_RequestedIncompatibleQosStatus * const status);
void update_status_sample_lost(
const DDS_SampleLostStatus * const status);
void update_status_matched(
const DDS_SubscriptionMatchedStatus * const status);
DDS_GuardCondition * const loan_guard_condition;
bool triggered_deadline{false};
bool triggered_liveliness{false};
bool triggered_qos{false};
bool triggered_sample_lost{false};
bool triggered_matched{false};
bool triggered_data{false};
DDS_RequestedDeadlineMissedStatus status_deadline;
DDS_RequestedIncompatibleQosStatus status_qos;
DDS_LivelinessChangedStatus status_liveliness;
DDS_SampleLostStatus status_sample_lost;
DDS_SubscriptionMatchedStatus status_matched;
DDS_RequestedDeadlineMissedStatus status_deadline_last;
DDS_RequestedIncompatibleQosStatus status_qos_last;
DDS_LivelinessChangedStatus status_liveliness_last;
DDS_SampleLostStatus status_sample_lost_last;
DDS_SubscriptionMatchedStatus status_matched_last;
RMW_Connext_Subscriber * sub;
friend class RMW_Connext_WaitSet;
};
/******************************************************************************
* Event support
******************************************************************************/
class RMW_Connext_Event
{
public:
static
rmw_ret_t
enable(rmw_event_t * const event);
static
rmw_ret_t
disable(rmw_event_t * const event);
static
DDS_Condition *
condition(const rmw_event_t * const event);
static
bool
writer_event(const rmw_event_t * const event)
{
return !ros_event_for_reader(event->event_type);
}
static
bool
reader_event(const rmw_event_t * const event)
{
return ros_event_for_reader(event->event_type);
}
static
RMW_Connext_Publisher *
publisher(const rmw_event_t * const event)
{
return reinterpret_cast<RMW_Connext_Publisher *>(event->data);
}
static
RMW_Connext_Subscriber *
subscriber(const rmw_event_t * const event)
{
return reinterpret_cast<RMW_Connext_Subscriber *>(event->data);
}
};
#endif // RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_