.. _program_listing_file_include_rclcpp_wait_set_policies_thread_safe_synchronization.hpp: Program Listing for File thread_safe_synchronization.hpp ======================================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/wait_set_policies/thread_safe_synchronization.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2020 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_ #define RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_ #include #include #include #include #include "rclcpp/client.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/service.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_wait_set_mask.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/wait_result.hpp" #include "rclcpp/wait_result_kind.hpp" #include "rclcpp/wait_set_policies/detail/synchronization_policy_common.hpp" #include "rclcpp/wait_set_policies/detail/write_preferring_read_write_lock.hpp" #include "rclcpp/waitable.hpp" namespace rclcpp { namespace wait_set_policies { class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon { protected: explicit ThreadSafeSynchronization(rclcpp::Context::SharedPtr context) : extra_guard_conditions_{{std::make_shared(context)}}, wprw_lock_([this]() {this->interrupt_waiting_wait_set();}) {} ~ThreadSafeSynchronization() = default; const std::array, 1> & get_extra_guard_conditions() { return extra_guard_conditions_; } void interrupt_waiting_wait_set() { extra_guard_conditions_[0]->trigger(); } void sync_add_subscription( std::shared_ptr && subscription, const rclcpp::SubscriptionWaitSetMask & mask, std::function< void(std::shared_ptr&&, const rclcpp::SubscriptionWaitSetMask &) > add_subscription_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); add_subscription_function(std::move(subscription), mask); } void sync_remove_subscription( std::shared_ptr && subscription, const rclcpp::SubscriptionWaitSetMask & mask, std::function< void(std::shared_ptr&&, const rclcpp::SubscriptionWaitSetMask &) > remove_subscription_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); remove_subscription_function(std::move(subscription), mask); } void sync_add_guard_condition( std::shared_ptr && guard_condition, std::function&&)> add_guard_condition_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); add_guard_condition_function(std::move(guard_condition)); } void sync_remove_guard_condition( std::shared_ptr && guard_condition, std::function&&)> remove_guard_condition_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); remove_guard_condition_function(std::move(guard_condition)); } void sync_add_timer( std::shared_ptr && timer, std::function&&)> add_timer_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); add_timer_function(std::move(timer)); } void sync_remove_timer( std::shared_ptr && timer, std::function&&)> remove_timer_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); remove_timer_function(std::move(timer)); } void sync_add_client( std::shared_ptr && client, std::function&&)> add_client_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); add_client_function(std::move(client)); } void sync_remove_client( std::shared_ptr && client, std::function&&)> remove_client_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); remove_client_function(std::move(client)); } void sync_add_service( std::shared_ptr && service, std::function&&)> add_service_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); add_service_function(std::move(service)); } void sync_remove_service( std::shared_ptr && service, std::function&&)> remove_service_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); remove_service_function(std::move(service)); } void sync_add_waitable( std::shared_ptr && waitable, std::shared_ptr && associated_entity, std::function< void(std::shared_ptr&&, std::shared_ptr &&) > add_waitable_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); add_waitable_function(std::move(waitable), std::move(associated_entity)); } void sync_remove_waitable( std::shared_ptr && waitable, std::function&&)> remove_waitable_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); remove_waitable_function(std::move(waitable)); } void sync_prune_deleted_entities(std::function prune_deleted_entities_function) { using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_write_mutex()); prune_deleted_entities_function(); } template WaitResultT sync_wait( std::chrono::nanoseconds time_to_wait_ns, std::function rebuild_rcl_wait_set, std::function get_rcl_wait_set, std::function create_wait_result) { // Assumption: this function assumes that some measure has been taken to // ensure none of the entities being waited on by the wait set are allowed // to go out of scope and therefore be deleted. // In the case of the StaticStorage policy, this is ensured because it // retains shared ownership of all entites for the duration of its own life. // In the case of the DynamicStorage policy, this is ensured by the function // which calls this function, by acquiring shared ownership of the entites // for the duration of this function. // Setup looping predicate. auto start = std::chrono::steady_clock::now(); std::function should_loop = this->create_loop_predicate(time_to_wait_ns, start); // Wait until exit condition is met. do { { // We have to prevent the entity sets from being mutated while building // the rcl wait set. using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock; std::lock_guard lock(wprw_lock_.get_read_mutex()); // Rebuild the wait set. // This will resize the wait set if needed, due to e.g. adding or removing // entities since the last wait, but this should never occur in static // storage wait sets since they cannot be changed after construction. // This will also clear the wait set and re-add all the entities, which // prepares it to be waited on again. rebuild_rcl_wait_set(); } rcl_wait_set_t & rcl_wait_set = get_rcl_wait_set(); // Wait unconditionally until timeout condition occurs since we assume // there are no conditions that would require the wait to stop and reset, // like asynchronously adding or removing an entity, i.e. explicitly // providing no thread-safety. // Calculate how much time there is left to wait, unless blocking indefinitely. auto time_left_to_wait_ns = this->calculate_time_left_to_wait(time_to_wait_ns, start); // Then wait for entities to become ready. // It is ok to wait while not having the lock acquired, because the state // in the rcl wait set will not be updated until this method calls // rebuild_rcl_wait_set(). rcl_ret_t ret = rcl_wait(&rcl_wait_set, time_left_to_wait_ns.count()); if (RCL_RET_OK == ret) { // Something has become ready in the wait set, first check if it was // the guard condition added by this class and/or a user defined guard condition. const rcl_guard_condition_t * interrupt_guard_condition_ptr = &(extra_guard_conditions_[0]->get_rcl_guard_condition()); bool was_interrupted_by_this_class = false; bool any_user_guard_conditions_triggered = false; for (size_t index = 0; index < rcl_wait_set.size_of_guard_conditions; ++index) { const rcl_guard_condition_t * current = rcl_wait_set.guard_conditions[index]; if (nullptr != current) { // Something is ready. if (rcl_wait_set.guard_conditions[index] == interrupt_guard_condition_ptr) { // This means that this class triggered a guard condition to interrupt this wait. was_interrupted_by_this_class = true; } else { // This means it was a user guard condition. any_user_guard_conditions_triggered = true; } } } if (!was_interrupted_by_this_class || any_user_guard_conditions_triggered) { // In this case we know: // - something was ready // - it was either: // - not interrupted by this class, or // - maybe it was, but there were also user defined guard conditions. // // We cannot ignore user defined guard conditions, but we can ignore // other kinds of user defined entities, because they will still be // ready next time we wait, whereas guard conditions are cleared. // Therefore we need to create a WaitResult and return it. // The WaitResult will call sync_wait_result_acquire() and // sync_wait_result_release() to ensure thread-safety by preventing // the mutation of the entity sets while introspecting after waiting. return create_wait_result(WaitResultKind::Ready); } // If we get here the we interrupted the wait set and there were no user // guard conditions that needed to be handled. // So we will loop and it will re-acquire the lock and rebuild the // rcl wait set. } else if (RCL_RET_TIMEOUT == ret) { // The wait set timed out, exit the loop. break; } else if (RCL_RET_WAIT_SET_EMPTY == ret) { // Wait set was empty, return Empty. return create_wait_result(WaitResultKind::Empty); } else { // Some other error case, throw. rclcpp::exceptions::throw_from_rcl_error(ret); } } while (should_loop()); // Wait did not result in ready items, return timeout. return create_wait_result(WaitResultKind::Timeout); } void sync_wait_result_acquire() { wprw_lock_.get_read_mutex().lock(); } void sync_wait_result_release() { wprw_lock_.get_read_mutex().unlock(); } protected: std::array, 1> extra_guard_conditions_; rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock wprw_lock_; }; } // namespace wait_set_policies } // namespace rclcpp #endif // RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_