Program Listing for File wait_set_template.hpp
↰ Return to documentation for file (include/rclcpp/wait_set_template.hpp)
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__WAIT_SET_TEMPLATE_HPP_
#define RCLCPP__WAIT_SET_TEMPLATE_HPP_
#include <chrono>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include "rcl/wait.h"
#include "rcpputils/scope_exit.hpp"
#include "rclcpp/client.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_wait_set_mask.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_result.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp
{
template<class SynchronizationPolicy, class StoragePolicy>
class WaitSetTemplate final : private SynchronizationPolicy, private StoragePolicy
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(WaitSetTemplate)
using typename StoragePolicy::SubscriptionEntry;
using typename StoragePolicy::WaitableEntry;
/// Construct a wait set, optionally pre-populated with entities.
/**
 * The context is forwarded to both policy base classes. The
 * SynchronizationPolicy base is listed first in the class declaration, so it
 * is constructed before the StoragePolicy base; that is what makes it valid
 * to call get_extra_guard_conditions() while building the StoragePolicy
 * arguments below.
 *
 * \param[in] subscriptions Subscriptions handed to the storage policy.
 * \param[in] guard_conditions Guard conditions handed to the storage policy.
 * \param[in] timers Timers handed to the storage policy.
 * \param[in] clients Clients handed to the storage policy.
 * \param[in] services Services handed to the storage policy.
 * \param[in] waitables Waitables handed to the storage policy.
 * \param[in] context Context passed to both policies; defaults to the global
 *   default context.
 */
explicit
WaitSetTemplate(
const typename StoragePolicy::SubscriptionsIterable & subscriptions = {},
const typename StoragePolicy::GuardConditionsIterable & guard_conditions = {},
const typename StoragePolicy::TimersIterable & timers = {},
const typename StoragePolicy::ClientsIterable & clients = {},
const typename StoragePolicy::ServicesIterable & services = {},
const typename StoragePolicy::WaitablesIterable & waitables = {},
rclcpp::Context::SharedPtr context =
rclcpp::contexts::get_global_default_context())
: SynchronizationPolicy(context),
StoragePolicy(
subscriptions,
guard_conditions,
// this method comes from the SynchronizationPolicy, which has already
// been constructed at this point (it is the first base class)
this->get_extra_guard_conditions(),
timers,
clients,
services,
waitables,
context)
{}
/// Return a const reference to the underlying rcl wait set.
/**
 * The reference is obtained from the StoragePolicy and is returned as-is.
 */
const rcl_wait_set_t &
get_rcl_wait_set() const
{
  // storage_get_rcl_wait_set() is provided by the StoragePolicy base class.
  const rcl_wait_set_t & rcl_wait_set = this->storage_get_rcl_wait_set();
  return rcl_wait_set;
}
/// Add a subscription, and optionally its related waitables, to this wait set.
/**
 * The mask selects which parts of the subscription are added:
 *   - include_subscription: the subscription itself,
 *   - include_events: every event handler returned by get_event_handlers(),
 *   - include_intra_process_waitable: the intra-process waitable, if the
 *     subscription has one (a nullptr waitable is silently skipped).
 *
 * Each part is first marked as "in use by a wait set" on the subscription and
 * then handed to the storage policy. Parts are processed in the order listed
 * above; if a later part throws, parts added earlier are not rolled back.
 *
 * \param[in] subscription Subscription to be added.
 * \param[in] mask Selects which parts of the subscription to add.
 * \throws std::invalid_argument if subscription is nullptr.
 * \throws std::runtime_error if a selected part was already marked as in use
 *   by a wait set.
 */
void
add_subscription(
  std::shared_ptr<rclcpp::SubscriptionBase> subscription,
  rclcpp::SubscriptionWaitSetMask mask = {})
{
  if (nullptr == subscription) {
    throw std::invalid_argument("subscription is nullptr");
  }
  // this method comes from the SynchronizationPolicy
  this->sync_add_subscription(
    std::move(subscription),
    mask,
    [this](
      std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
      const rclcpp::SubscriptionWaitSetMask & inner_mask)
    {
      // The storage_* methods come from the StoragePolicy, and may not exist
      // for fixed sized storage policies.
      if (inner_mask.include_subscription) {
        auto local_subscription = inner_subscription;
        bool already_in_use =
          local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), true);
        if (already_in_use) {
          throw std::runtime_error("subscription already associated with a wait set");
        }
        // It will throw if the subscription has already been added.
        this->storage_add_subscription(std::move(local_subscription));
      }
      if (inner_mask.include_events) {
        // Iterate by const reference: only the handler pointer needs copying,
        // not the whole key/value pair.
        for (const auto & key_event_pair : inner_subscription->get_event_handlers()) {
          auto event = key_event_pair.second;
          auto local_subscription = inner_subscription;
          bool already_in_use =
            local_subscription->exchange_in_use_by_wait_set_state(event.get(), true);
          if (already_in_use) {
            throw std::runtime_error("subscription event already associated with a wait set");
          }
          // The subscription is stored as the associated entity of the event.
          this->storage_add_waitable(std::move(event), std::move(local_subscription));
        }
      }
      if (inner_mask.include_intra_process_waitable) {
        auto waitable = inner_subscription->get_intra_process_waitable();
        if (nullptr != waitable) {
          auto local_subscription = inner_subscription;
          bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
            waitable.get(),
            true);
          if (already_in_use) {
            throw std::runtime_error(
                    "subscription intra-process waitable already associated with a wait set");
          }
          // Store exactly the instance that was null-checked and marked as in
          // use above, instead of fetching the waitable a second time.
          this->storage_add_waitable(
            std::move(waitable),
            std::move(local_subscription));
        }
      }
    });
}
/// Remove a subscription, and optionally its related waitables, from this wait set.
/**
 * The mask selects which parts are removed (the subscription itself, its
 * event handlers, and/or its intra-process waitable). For every selected part
 * the "in use by a wait set" flag on the subscription is cleared first, and
 * the part is then removed through the storage policy.
 *
 * \param[in] subscription Subscription to be removed.
 * \param[in] mask Selects which parts of the subscription to remove.
 * \throws std::invalid_argument if subscription is nullptr.
 */
void
remove_subscription(
  std::shared_ptr<rclcpp::SubscriptionBase> subscription,
  rclcpp::SubscriptionWaitSetMask mask = {})
{
  if (!subscription) {
    throw std::invalid_argument("subscription is nullptr");
  }

  // Storage-side half of the operation. The storage_remove_* methods come from
  // the StoragePolicy, may not exist for fixed sized storage policies, and
  // throw if the entity is not in the wait set.
  auto remove_from_storage =
    [this](
      std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
      const rclcpp::SubscriptionWaitSetMask & inner_mask)
    {
      if (inner_mask.include_subscription) {
        // Work on a copy so inner_subscription stays usable for the
        // remaining branches.
        auto subscription_copy = inner_subscription;
        subscription_copy->exchange_in_use_by_wait_set_state(subscription_copy.get(), false);
        this->storage_remove_subscription(std::move(subscription_copy));
      }

      if (inner_mask.include_events) {
        for (auto handler_entry : inner_subscription->get_event_handlers()) {
          auto event_handler = handler_entry.second;
          auto subscription_copy = inner_subscription;
          subscription_copy->exchange_in_use_by_wait_set_state(event_handler.get(), false);
          this->storage_remove_waitable(std::move(event_handler));
        }
      }

      if (inner_mask.include_intra_process_waitable) {
        auto intra_process_waitable = inner_subscription->get_intra_process_waitable();
        // A non-null waitable means intra process is enabled for the subscription.
        if (intra_process_waitable != nullptr) {
          inner_subscription->exchange_in_use_by_wait_set_state(
            intra_process_waitable.get(), false);
          this->storage_remove_waitable(std::move(intra_process_waitable));
        }
      }
    };

  // sync_remove_subscription() comes from the SynchronizationPolicy.
  this->sync_remove_subscription(std::move(subscription), mask, remove_from_storage);
}
/// Add a guard condition to this wait set.
/**
 * The guard condition is marked as in use by a wait set and then handed to
 * the storage policy.
 *
 * \param[in] guard_condition Guard condition to be added.
 * \throws std::invalid_argument if guard_condition is nullptr.
 * \throws std::runtime_error if the guard condition was already marked as in
 *   use by a wait set.
 */
void
add_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
{
  if (!guard_condition) {
    throw std::invalid_argument("guard_condition is nullptr");
  }

  // storage_add_guard_condition() comes from the StoragePolicy, may not exist
  // for fixed sized storage policies, and throws if the guard condition has
  // already been added.
  auto add_to_storage =
    [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
      if (inner_guard_condition->exchange_in_use_by_wait_set_state(true)) {
        throw std::runtime_error("guard condition already in use by another wait set");
      }
      this->storage_add_guard_condition(std::move(inner_guard_condition));
    };

  // sync_add_guard_condition() comes from the SynchronizationPolicy.
  this->sync_add_guard_condition(std::move(guard_condition), add_to_storage);
}
/// Remove a guard condition from this wait set.
/**
 * The "in use by a wait set" flag of the guard condition is cleared before
 * the storage policy is asked to remove it.
 *
 * \param[in] guard_condition Guard condition to be removed.
 * \throws std::invalid_argument if guard_condition is nullptr.
 */
void
remove_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
{
  if (!guard_condition) {
    throw std::invalid_argument("guard_condition is nullptr");
  }

  // storage_remove_guard_condition() comes from the StoragePolicy, may not
  // exist for fixed sized storage policies, and throws if the guard condition
  // is not in the wait set.
  auto remove_from_storage =
    [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
      inner_guard_condition->exchange_in_use_by_wait_set_state(false);
      this->storage_remove_guard_condition(std::move(inner_guard_condition));
    };

  // sync_remove_guard_condition() comes from the SynchronizationPolicy.
  this->sync_remove_guard_condition(std::move(guard_condition), remove_from_storage);
}
/// Add a timer to this wait set.
/**
 * The timer is marked as in use by a wait set and then handed to the storage
 * policy.
 *
 * \param[in] timer Timer to be added.
 * \throws std::invalid_argument if timer is nullptr.
 * \throws std::runtime_error if the timer was already marked as in use by a
 *   wait set.
 */
void
add_timer(std::shared_ptr<rclcpp::TimerBase> timer)
{
  if (!timer) {
    throw std::invalid_argument("timer is nullptr");
  }

  // storage_add_timer() comes from the StoragePolicy, may not exist for fixed
  // sized storage policies, and throws if the timer has already been added.
  auto add_to_storage =
    [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
      if (inner_timer->exchange_in_use_by_wait_set_state(true)) {
        throw std::runtime_error("timer already in use by another wait set");
      }
      this->storage_add_timer(std::move(inner_timer));
    };

  // sync_add_timer() comes from the SynchronizationPolicy.
  this->sync_add_timer(std::move(timer), add_to_storage);
}
/// Remove a timer from this wait set.
/**
 * The "in use by a wait set" flag of the timer is cleared before the storage
 * policy is asked to remove it.
 *
 * \param[in] timer Timer to be removed.
 * \throws std::invalid_argument if timer is nullptr.
 */
void
remove_timer(std::shared_ptr<rclcpp::TimerBase> timer)
{
  if (!timer) {
    throw std::invalid_argument("timer is nullptr");
  }

  // storage_remove_timer() comes from the StoragePolicy, may not exist for
  // fixed sized storage policies, and throws if the timer is not in the wait
  // set.
  auto remove_from_storage =
    [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
      inner_timer->exchange_in_use_by_wait_set_state(false);
      this->storage_remove_timer(std::move(inner_timer));
    };

  // sync_remove_timer() comes from the SynchronizationPolicy.
  this->sync_remove_timer(std::move(timer), remove_from_storage);
}
/// Add a client to this wait set.
/**
 * The client is marked as in use by a wait set and then handed to the storage
 * policy.
 *
 * \param[in] client Client to be added.
 * \throws std::invalid_argument if client is nullptr.
 * \throws std::runtime_error if the client was already marked as in use by a
 *   wait set.
 */
void
add_client(std::shared_ptr<rclcpp::ClientBase> client)
{
  if (!client) {
    throw std::invalid_argument("client is nullptr");
  }

  // storage_add_client() comes from the StoragePolicy, may not exist for fixed
  // sized storage policies, and throws if the client has already been added.
  auto add_to_storage =
    [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
      if (inner_client->exchange_in_use_by_wait_set_state(true)) {
        throw std::runtime_error("client already in use by another wait set");
      }
      this->storage_add_client(std::move(inner_client));
    };

  // sync_add_client() comes from the SynchronizationPolicy.
  this->sync_add_client(std::move(client), add_to_storage);
}
/// Remove a client from this wait set.
/**
 * The "in use by a wait set" flag of the client is cleared before the storage
 * policy is asked to remove it.
 *
 * \param[in] client Client to be removed.
 * \throws std::invalid_argument if client is nullptr.
 */
void
remove_client(std::shared_ptr<rclcpp::ClientBase> client)
{
  if (!client) {
    throw std::invalid_argument("client is nullptr");
  }

  // storage_remove_client() comes from the StoragePolicy, may not exist for
  // fixed sized storage policies, and throws if the client is not in the wait
  // set.
  auto remove_from_storage =
    [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
      inner_client->exchange_in_use_by_wait_set_state(false);
      this->storage_remove_client(std::move(inner_client));
    };

  // sync_remove_client() comes from the SynchronizationPolicy.
  this->sync_remove_client(std::move(client), remove_from_storage);
}
/// Add a service to this wait set.
/**
 * The service is marked as in use by a wait set and then handed to the
 * storage policy.
 *
 * \param[in] service Service to be added.
 * \throws std::invalid_argument if service is nullptr.
 * \throws std::runtime_error if the service was already marked as in use by a
 *   wait set.
 */
void
add_service(std::shared_ptr<rclcpp::ServiceBase> service)
{
  if (!service) {
    throw std::invalid_argument("service is nullptr");
  }

  // storage_add_service() comes from the StoragePolicy, may not exist for
  // fixed sized storage policies, and throws if the service has already been
  // added.
  auto add_to_storage =
    [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
      if (inner_service->exchange_in_use_by_wait_set_state(true)) {
        throw std::runtime_error("service already in use by another wait set");
      }
      this->storage_add_service(std::move(inner_service));
    };

  // sync_add_service() comes from the SynchronizationPolicy.
  this->sync_add_service(std::move(service), add_to_storage);
}
/// Remove a service from this wait set.
/**
 * The "in use by a wait set" flag of the service is cleared before the
 * storage policy is asked to remove it.
 *
 * \param[in] service Service to be removed.
 * \throws std::invalid_argument if service is nullptr.
 */
void
remove_service(std::shared_ptr<rclcpp::ServiceBase> service)
{
  if (!service) {
    throw std::invalid_argument("service is nullptr");
  }

  // storage_remove_service() comes from the StoragePolicy, may not exist for
  // fixed sized storage policies, and throws if the service is not in the
  // wait set.
  auto remove_from_storage =
    [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
      inner_service->exchange_in_use_by_wait_set_state(false);
      this->storage_remove_service(std::move(inner_service));
    };

  // sync_remove_service() comes from the SynchronizationPolicy.
  this->sync_remove_service(std::move(service), remove_from_storage);
}
/// Add a waitable to this wait set, optionally with an associated entity.
/**
 * The waitable is marked as in use by a wait set and then handed, together
 * with the associated entity, to the storage policy.
 *
 * \param[in] waitable Waitable to be added.
 * \param[in] associated_entity Optional entity forwarded to the storage policy
 *   alongside the waitable; may be nullptr.
 * \throws std::invalid_argument if waitable is nullptr.
 * \throws std::runtime_error if the waitable was already marked as in use by
 *   a wait set.
 */
void
add_waitable(
  std::shared_ptr<rclcpp::Waitable> waitable,
  std::shared_ptr<void> associated_entity = nullptr)
{
  if (!waitable) {
    throw std::invalid_argument("waitable is nullptr");
  }

  // storage_add_waitable() comes from the StoragePolicy, may not exist for
  // fixed sized storage policies, and throws if the waitable has already been
  // added.
  auto add_to_storage =
    [this](
      std::shared_ptr<rclcpp::Waitable> && inner_waitable,
      std::shared_ptr<void> && inner_associated_entity)
    {
      if (inner_waitable->exchange_in_use_by_wait_set_state(true)) {
        throw std::runtime_error("waitable already in use by another wait set");
      }
      this->storage_add_waitable(
        std::move(inner_waitable),
        std::move(inner_associated_entity));
    };

  // sync_add_waitable() comes from the SynchronizationPolicy.
  this->sync_add_waitable(
    std::move(waitable),
    std::move(associated_entity),
    add_to_storage);
}
/// Remove a waitable from this wait set.
/**
 * The "in use by a wait set" flag of the waitable is cleared before the
 * storage policy is asked to remove it.
 *
 * \param[in] waitable Waitable to be removed.
 * \throws std::invalid_argument if waitable is nullptr.
 */
void
remove_waitable(std::shared_ptr<rclcpp::Waitable> waitable)
{
  if (!waitable) {
    throw std::invalid_argument("waitable is nullptr");
  }

  // storage_remove_waitable() comes from the StoragePolicy, may not exist for
  // fixed sized storage policies, and throws if the waitable is not in the
  // wait set.
  auto remove_from_storage =
    [this](std::shared_ptr<rclcpp::Waitable> && inner_waitable) {
      inner_waitable->exchange_in_use_by_wait_set_state(false);
      this->storage_remove_waitable(std::move(inner_waitable));
    };

  // sync_remove_waitable() comes from the SynchronizationPolicy.
  this->sync_remove_waitable(std::move(waitable), remove_from_storage);
}
/// Ask the storage policy to prune deleted entities, via the synchronization policy.
/**
 * The actual pruning is delegated to storage_prune_deleted_entities(); this
 * method only routes the call through sync_prune_deleted_entities().
 */
void
prune_deleted_entities()
{
  // storage_prune_deleted_entities() comes from the StoragePolicy and may not
  // exist for fixed sized storage policies.
  auto prune_storage = [this]() {
      this->storage_prune_deleted_entities();
    };

  // sync_prune_deleted_entities() comes from the SynchronizationPolicy.
  this->sync_prune_deleted_entities(prune_storage);
}
/// Wait on the entities in this wait set and return the outcome as a WaitResult.
/**
 * Ownership of the stored entities is acquired through the storage policy for
 * the duration of the call and released again on every exit path. The wait
 * itself is performed by the synchronization policy, which is given callbacks
 * to rebuild the rcl wait set, to access it, and to create the WaitResult.
 *
 * \param[in] time_to_wait Duration to wait, converted to nanoseconds before
 *   being forwarded. Defaults to -1; NOTE(review): presumably a negative
 *   duration means "block indefinitely" - confirm against the
 *   SynchronizationPolicy in use.
 * \return A WaitResult of kind Ready, Timeout or Empty.
 * \throws std::runtime_error if the synchronization policy reports an unknown
 *   WaitResultKind.
 */
template<class Rep = int64_t, class Period = std::milli>
RCUTILS_WARN_UNUSED
WaitResult<WaitSetTemplate>
wait(std::chrono::duration<Rep, Period> time_to_wait = std::chrono::duration<Rep, Period>(-1))
{
  using ResultType = WaitResult<WaitSetTemplate>;

  const auto timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_wait);

  // Keep the entities in the wait set alive while waiting; the scope-exit
  // guard releases them again however this function is left.
  this->storage_acquire_ownerships();
  RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();});

  // Rebuilds the rcl wait set when the synchronization policy asks for it.
  // storage_rebuild_rcl_wait_set() comes from the StoragePolicy and
  // get_extra_guard_conditions() from the SynchronizationPolicy.
  auto rebuild_rcl_wait_set = [this]() {
      this->storage_rebuild_rcl_wait_set(this->get_extra_guard_conditions());
    };

  // Gives the synchronization policy access to the rcl wait set held by the
  // StoragePolicy.
  auto access_rcl_wait_set = [this]() -> rcl_wait_set_t & {
      return this->storage_get_rcl_wait_set();
    };

  // Turns the kind reported by the synchronization policy into a WaitResult.
  auto make_wait_result = [this](WaitResultKind kind) -> ResultType {
      if (WaitResultKind::Ready == kind) {
        return ResultType::from_ready_wait_result_kind(*this);
      }
      if (WaitResultKind::Timeout == kind) {
        return ResultType::from_timeout_wait_result_kind();
      }
      if (WaitResultKind::Empty == kind) {
        return ResultType::from_empty_wait_result_kind();
      }
      throw std::runtime_error(
              "unknown WaitResultKind with value: " + std::to_string(kind));
    };

  // sync_wait() comes from the SynchronizationPolicy.
  return this->template sync_wait<ResultType>(
    timeout_ns,
    rebuild_rcl_wait_set,
    access_rcl_wait_set,
    make_wait_result);
}
private:
// Add WaitResult type as a friend so it can call private methods for
// acquiring and releasing resources as the WaitResult is initialized and
// destructed, respectively.
friend WaitResult<WaitSetTemplate>;
/// Mark this wait set as held by a WaitResult and acquire the related resources.
/**
 * Intended to be called by the befriended WaitResult type. The holding flag
 * is set first, then the synchronization policy is notified, and finally the
 * storage policy acquires ownership of the entities.
 *
 * \throws std::runtime_error if a WaitResult is already holding this wait set.
 */
void
wait_result_acquire()
{
  const bool already_holding = wait_result_holding_;
  if (already_holding) {
    throw std::runtime_error("wait_result_acquire() called while already holding");
  }
  wait_result_holding_ = true;
  // SynchronizationPolicy hook first ...
  this->sync_wait_result_acquire();
  // ... then the StoragePolicy takes ownership of the entities.
  this->storage_acquire_ownerships();
}
/// Mark this wait set as no longer held by a WaitResult and release the related resources.
/**
 * Intended to be called by the befriended WaitResult type. The steps mirror
 * wait_result_acquire() in reverse: the holding flag is cleared, the storage
 * policy releases its ownerships, and then the synchronization policy is
 * notified.
 *
 * \throws std::runtime_error if no WaitResult is currently holding this wait set.
 */
void
wait_result_release()
{
  const bool currently_holding = wait_result_holding_;
  if (!currently_holding) {
    throw std::runtime_error("wait_result_release() called while not holding");
  }
  wait_result_holding_ = false;
  // StoragePolicy gives up ownership of the entities first ...
  this->storage_release_ownerships();
  // ... then the SynchronizationPolicy hook runs.
  this->sync_wait_result_release();
}
bool wait_result_holding_ = false;
};
} // namespace rclcpp
#endif // RCLCPP__WAIT_SET_TEMPLATE_HPP_