.. _program_listing_file_include_rclcpp_wait_set_template.hpp: Program Listing for File wait_set_template.hpp ============================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/wait_set_template.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2020 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__WAIT_SET_TEMPLATE_HPP_ #define RCLCPP__WAIT_SET_TEMPLATE_HPP_ #include #include #include #include "rcl/wait.h" #include "rcpputils/scope_exit.hpp" #include "rclcpp/client.hpp" #include "rclcpp/context.hpp" #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/service.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_wait_set_mask.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/wait_result.hpp" #include "rclcpp/waitable.hpp" namespace rclcpp { template class WaitSetTemplate final : private SynchronizationPolicy, private StoragePolicy { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(WaitSetTemplate) using typename StoragePolicy::SubscriptionEntry; using typename StoragePolicy::WaitableEntry; explicit WaitSetTemplate( const typename StoragePolicy::SubscriptionsIterable & subscriptions = {}, const typename StoragePolicy::GuardConditionsIterable & guard_conditions = {}, const typename StoragePolicy::TimersIterable & timers = {}, const typename StoragePolicy::ClientsIterable & clients = {}, const typename StoragePolicy::ServicesIterable & services = {}, const typename StoragePolicy::WaitablesIterable & waitables = {}, rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context()) : SynchronizationPolicy(context), StoragePolicy( subscriptions, guard_conditions, // this method comes from the SynchronizationPolicy this->get_extra_guard_conditions(), timers, clients, services, waitables, context) {} const rcl_wait_set_t & get_rcl_wait_set() const { // this method comes from the StoragePolicy return this->storage_get_rcl_wait_set(); } void add_subscription( std::shared_ptr subscription, rclcpp::SubscriptionWaitSetMask mask = {}) { if (nullptr == subscription) { throw std::invalid_argument("subscription is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_add_subscription( std::move(subscription), mask, [this]( std::shared_ptr && inner_subscription, const rclcpp::SubscriptionWaitSetMask & mask) { // These methods comes from the StoragePolicy, and may not exist for // fixed sized storage policies. // It will throw if the subscription has already been added. if (mask.include_subscription) { auto local_subscription = inner_subscription; bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), true); if (already_in_use) { throw std::runtime_error("subscription already associated with a wait set"); } this->storage_add_subscription(std::move(local_subscription)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } if (mask.include_events) { for (auto key_event_pair : inner_subscription->get_event_handlers()) { auto event = key_event_pair.second; auto local_subscription = inner_subscription; bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(event.get(), true); if (already_in_use) { throw std::runtime_error("subscription event already associated with a wait set"); } this->storage_add_waitable(std::move(event), std::move(local_subscription)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } if (mask.include_intra_process_waitable) { auto local_subscription = inner_subscription; auto waitable = inner_subscription->get_intra_process_waitable(); if (nullptr != waitable) { bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state( waitable.get(), true); if (already_in_use) { throw std::runtime_error( "subscription intra-process waitable already associated with a wait set"); } this->storage_add_waitable( std::move(inner_subscription->get_intra_process_waitable()), std::move(local_subscription)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } }); } void remove_subscription( std::shared_ptr subscription, rclcpp::SubscriptionWaitSetMask mask = {}) { if (nullptr == subscription) { throw std::invalid_argument("subscription is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_remove_subscription( std::move(subscription), mask, [this]( std::shared_ptr && inner_subscription, const rclcpp::SubscriptionWaitSetMask & mask) { // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the subscription is not in the wait set. if (mask.include_subscription) { auto local_subscription = inner_subscription; local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), false); this->storage_remove_subscription(std::move(local_subscription)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } if (mask.include_events) { for (auto key_event_pair : inner_subscription->get_event_handlers()) { auto event = key_event_pair.second; auto local_subscription = inner_subscription; local_subscription->exchange_in_use_by_wait_set_state(event.get(), false); this->storage_remove_waitable(std::move(event)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } if (mask.include_intra_process_waitable) { auto local_waitable = inner_subscription->get_intra_process_waitable(); if (nullptr != local_waitable) { // This is the case when intra process is enabled for the subscription. inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false); this->storage_remove_waitable(std::move(local_waitable)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } }); } void add_guard_condition(std::shared_ptr guard_condition) { if (nullptr == guard_condition) { throw std::invalid_argument("guard_condition is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_add_guard_condition( std::move(guard_condition), [this](std::shared_ptr && inner_guard_condition) { bool already_in_use = inner_guard_condition->exchange_in_use_by_wait_set_state(true); if (already_in_use) { throw std::runtime_error("guard condition already in use by another wait set"); } // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the guard condition has already been added. this->storage_add_guard_condition(std::move(inner_guard_condition)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void remove_guard_condition(std::shared_ptr guard_condition) { if (nullptr == guard_condition) { throw std::invalid_argument("guard_condition is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_remove_guard_condition( std::move(guard_condition), [this](std::shared_ptr && inner_guard_condition) { inner_guard_condition->exchange_in_use_by_wait_set_state(false); // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the guard condition is not in the wait set. this->storage_remove_guard_condition(std::move(inner_guard_condition)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void add_timer(std::shared_ptr timer) { if (nullptr == timer) { throw std::invalid_argument("timer is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_add_timer( std::move(timer), [this](std::shared_ptr && inner_timer) { bool already_in_use = inner_timer->exchange_in_use_by_wait_set_state(true); if (already_in_use) { throw std::runtime_error("timer already in use by another wait set"); } // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the timer has already been added. this->storage_add_timer(std::move(inner_timer)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void remove_timer(std::shared_ptr timer) { if (nullptr == timer) { throw std::invalid_argument("timer is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_remove_timer( std::move(timer), [this](std::shared_ptr && inner_timer) { inner_timer->exchange_in_use_by_wait_set_state(false); // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the timer is not in the wait set. this->storage_remove_timer(std::move(inner_timer)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void add_client(std::shared_ptr client) { if (nullptr == client) { throw std::invalid_argument("client is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_add_client( std::move(client), [this](std::shared_ptr && inner_client) { bool already_in_use = inner_client->exchange_in_use_by_wait_set_state(true); if (already_in_use) { throw std::runtime_error("client already in use by another wait set"); } // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the client has already been added. this->storage_add_client(std::move(inner_client)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void remove_client(std::shared_ptr client) { if (nullptr == client) { throw std::invalid_argument("client is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_remove_client( std::move(client), [this](std::shared_ptr && inner_client) { inner_client->exchange_in_use_by_wait_set_state(false); // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the client is not in the wait set. this->storage_remove_client(std::move(inner_client)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void add_service(std::shared_ptr service) { if (nullptr == service) { throw std::invalid_argument("service is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_add_service( std::move(service), [this](std::shared_ptr && inner_service) { bool already_in_use = inner_service->exchange_in_use_by_wait_set_state(true); if (already_in_use) { throw std::runtime_error("service already in use by another wait set"); } // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the service has already been added. this->storage_add_service(std::move(inner_service)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void remove_service(std::shared_ptr service) { if (nullptr == service) { throw std::invalid_argument("service is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_remove_service( std::move(service), [this](std::shared_ptr && inner_service) { inner_service->exchange_in_use_by_wait_set_state(false); // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the service is not in the wait set. this->storage_remove_service(std::move(inner_service)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void add_waitable( std::shared_ptr waitable, std::shared_ptr associated_entity = nullptr) { if (nullptr == waitable) { throw std::invalid_argument("waitable is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_add_waitable( std::move(waitable), std::move(associated_entity), [this]( std::shared_ptr && inner_waitable, std::shared_ptr && associated_entity) { bool already_in_use = inner_waitable->exchange_in_use_by_wait_set_state(true); if (already_in_use) { throw std::runtime_error("waitable already in use by another wait set"); } // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the waitable has already been added. this->storage_add_waitable(std::move(inner_waitable), std::move(associated_entity)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void remove_waitable(std::shared_ptr waitable) { if (nullptr == waitable) { throw std::invalid_argument("waitable is nullptr"); } // this method comes from the SynchronizationPolicy this->sync_remove_waitable( std::move(waitable), [this](std::shared_ptr && inner_waitable) { inner_waitable->exchange_in_use_by_wait_set_state(false); // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. // It will throw if the waitable is not in the wait set. this->storage_remove_waitable(std::move(inner_waitable)); if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } void prune_deleted_entities() { // this method comes from the SynchronizationPolicy this->sync_prune_deleted_entities( [this]() { // This method comes from the StoragePolicy, and it may not exist for // fixed sized storage policies. this->storage_prune_deleted_entities(); }); } template RCUTILS_WARN_UNUSED WaitResult wait(std::chrono::duration time_to_wait = std::chrono::duration(-1)) { auto time_to_wait_ns = std::chrono::duration_cast(time_to_wait); // ensure the ownership of the entities in the wait set is shared for the duration of wait this->storage_acquire_ownerships(); RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();}); // this method comes from the SynchronizationPolicy return this->template sync_wait>( // pass along the time_to_wait duration as nanoseconds time_to_wait_ns, // this method provides the ability to rebuild the wait set, if needed [this]() { // This method comes from the StoragePolicy this->storage_rebuild_rcl_wait_set( // This method comes from the SynchronizationPolicy this->get_extra_guard_conditions() ); }, // this method provides access to the rcl wait set [this]() -> rcl_wait_set_t & { // This method comes from the StoragePolicy return this->storage_get_rcl_wait_set(); }, // this method provides a way to create the WaitResult [this](WaitResultKind wait_result_kind) -> WaitResult { // convert the result into a WaitResult switch (wait_result_kind) { case WaitResultKind::Ready: return WaitResult::from_ready_wait_result_kind(*this); case WaitResultKind::Timeout: return WaitResult::from_timeout_wait_result_kind(); case WaitResultKind::Empty: return WaitResult::from_empty_wait_result_kind(); default: auto msg = "unknown WaitResultKind with value: " + std::to_string(wait_result_kind); throw std::runtime_error(msg); } } ); } private: // Add WaitResult type as a friend so it can call private methods for // acquiring and releasing resources as the WaitResult is initialized and // destructed, respectively. friend WaitResult; void wait_result_acquire() { if (wait_result_holding_) { throw std::runtime_error("wait_result_acquire() called while already holding"); } wait_result_holding_ = true; wait_result_dirty_ = false; // this method comes from the SynchronizationPolicy this->sync_wait_result_acquire(); // this method comes from the StoragePolicy this->storage_acquire_ownerships(); } void wait_result_release() { if (!wait_result_holding_) { throw std::runtime_error("wait_result_release() called while not holding"); } wait_result_holding_ = false; wait_result_dirty_ = false; // this method comes from the StoragePolicy this->storage_release_ownerships(); // this method comes from the SynchronizationPolicy this->sync_wait_result_release(); } bool wait_result_holding_ = false; bool wait_result_dirty_ = false; }; } // namespace rclcpp #endif // RCLCPP__WAIT_SET_TEMPLATE_HPP_