Program Listing for File thread_safe_synchronization.hpp
↰ Return to documentation for file (include/rclcpp/wait_set_policies/thread_safe_synchronization.hpp
)
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_
#define RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_
#include <chrono>
#include <functional>
#include <memory>
#include <utility>
#include "rclcpp/client.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/subscription_wait_set_mask.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_result.hpp"
#include "rclcpp/wait_result_kind.hpp"
#include "rclcpp/wait_set_policies/detail/synchronization_policy_common.hpp"
#include "rclcpp/wait_set_policies/detail/write_preferring_read_write_lock.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp
{
namespace wait_set_policies
{
class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon
{
protected:
explicit ThreadSafeSynchronization(rclcpp::Context::SharedPtr context)
: extra_guard_conditions_{{std::make_shared<rclcpp::GuardCondition>(context)}},
wprw_lock_([this]() {this->interrupt_waiting_wait_set();})
{}
~ThreadSafeSynchronization() = default;
const std::array<std::shared_ptr<rclcpp::GuardCondition>, 1> &
get_extra_guard_conditions()
{
return extra_guard_conditions_;
}
void
interrupt_waiting_wait_set()
{
extra_guard_conditions_[0]->trigger();
}
void
sync_add_subscription(
std::shared_ptr<rclcpp::SubscriptionBase> && subscription,
const rclcpp::SubscriptionWaitSetMask & mask,
std::function<
void(std::shared_ptr<rclcpp::SubscriptionBase>&&, const rclcpp::SubscriptionWaitSetMask &)
> add_subscription_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
add_subscription_function(std::move(subscription), mask);
}
void
sync_remove_subscription(
std::shared_ptr<rclcpp::SubscriptionBase> && subscription,
const rclcpp::SubscriptionWaitSetMask & mask,
std::function<
void(std::shared_ptr<rclcpp::SubscriptionBase>&&, const rclcpp::SubscriptionWaitSetMask &)
> remove_subscription_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
remove_subscription_function(std::move(subscription), mask);
}
void
sync_add_guard_condition(
std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
std::function<void(std::shared_ptr<rclcpp::GuardCondition>&&)> add_guard_condition_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
add_guard_condition_function(std::move(guard_condition));
}
void
sync_remove_guard_condition(
std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
std::function<void(std::shared_ptr<rclcpp::GuardCondition>&&)> remove_guard_condition_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
remove_guard_condition_function(std::move(guard_condition));
}
void
sync_add_timer(
std::shared_ptr<rclcpp::TimerBase> && timer,
std::function<void(std::shared_ptr<rclcpp::TimerBase>&&)> add_timer_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
add_timer_function(std::move(timer));
}
void
sync_remove_timer(
std::shared_ptr<rclcpp::TimerBase> && timer,
std::function<void(std::shared_ptr<rclcpp::TimerBase>&&)> remove_timer_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
remove_timer_function(std::move(timer));
}
void
sync_add_client(
std::shared_ptr<rclcpp::ClientBase> && client,
std::function<void(std::shared_ptr<rclcpp::ClientBase>&&)> add_client_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
add_client_function(std::move(client));
}
void
sync_remove_client(
std::shared_ptr<rclcpp::ClientBase> && client,
std::function<void(std::shared_ptr<rclcpp::ClientBase>&&)> remove_client_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
remove_client_function(std::move(client));
}
void
sync_add_service(
std::shared_ptr<rclcpp::ServiceBase> && service,
std::function<void(std::shared_ptr<rclcpp::ServiceBase>&&)> add_service_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
add_service_function(std::move(service));
}
void
sync_remove_service(
std::shared_ptr<rclcpp::ServiceBase> && service,
std::function<void(std::shared_ptr<rclcpp::ServiceBase>&&)> remove_service_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
remove_service_function(std::move(service));
}
void
sync_add_waitable(
std::shared_ptr<rclcpp::Waitable> && waitable,
std::shared_ptr<void> && associated_entity,
std::function<
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void> &&)
> add_waitable_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
add_waitable_function(std::move(waitable), std::move(associated_entity));
}
void
sync_remove_waitable(
std::shared_ptr<rclcpp::Waitable> && waitable,
std::function<void(std::shared_ptr<rclcpp::Waitable>&&)> remove_waitable_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
remove_waitable_function(std::move(waitable));
}
void
sync_prune_deleted_entities(std::function<void()> prune_deleted_entities_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.get_write_mutex());
prune_deleted_entities_function();
}
template<class WaitResultT>
WaitResultT
sync_wait(
std::chrono::nanoseconds time_to_wait_ns,
std::function<void()> rebuild_rcl_wait_set,
std::function<rcl_wait_set_t & ()> get_rcl_wait_set,
std::function<WaitResultT(WaitResultKind wait_result_kind)> create_wait_result)
{
// Assumption: this function assumes that some measure has been taken to
// ensure none of the entities being waited on by the wait set are allowed
// to go out of scope and therefore be deleted.
// In the case of the StaticStorage policy, this is ensured because it
// retains shared ownership of all entites for the duration of its own life.
// In the case of the DynamicStorage policy, this is ensured by the function
// which calls this function, by acquiring shared ownership of the entites
// for the duration of this function.
// Setup looping predicate.
auto start = std::chrono::steady_clock::now();
std::function<bool()> should_loop = this->create_loop_predicate(time_to_wait_ns, start);
// Wait until exit condition is met.
do {
{
// We have to prevent the entity sets from being mutated while building
// the rcl wait set.
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
std::lock_guard<WritePreferringReadWriteLock::ReadMutex> lock(wprw_lock_.get_read_mutex());
// Rebuild the wait set.
// This will resize the wait set if needed, due to e.g. adding or removing
// entities since the last wait, but this should never occur in static
// storage wait sets since they cannot be changed after construction.
// This will also clear the wait set and re-add all the entities, which
// prepares it to be waited on again.
rebuild_rcl_wait_set();
}
rcl_wait_set_t & rcl_wait_set = get_rcl_wait_set();
// Wait unconditionally until timeout condition occurs since we assume
// there are no conditions that would require the wait to stop and reset,
// like asynchronously adding or removing an entity, i.e. explicitly
// providing no thread-safety.
// Calculate how much time there is left to wait, unless blocking indefinitely.
auto time_left_to_wait_ns = this->calculate_time_left_to_wait(time_to_wait_ns, start);
// Then wait for entities to become ready.
// It is ok to wait while not having the lock acquired, because the state
// in the rcl wait set will not be updated until this method calls
// rebuild_rcl_wait_set().
rcl_ret_t ret = rcl_wait(&rcl_wait_set, time_left_to_wait_ns.count());
if (RCL_RET_OK == ret) {
// Something has become ready in the wait set, first check if it was
// the guard condition added by this class and/or a user defined guard condition.
const rcl_guard_condition_t * interrupt_guard_condition_ptr =
&(extra_guard_conditions_[0]->get_rcl_guard_condition());
bool was_interrupted_by_this_class = false;
bool any_user_guard_conditions_triggered = false;
for (size_t index = 0; index < rcl_wait_set.size_of_guard_conditions; ++index) {
const rcl_guard_condition_t * current = rcl_wait_set.guard_conditions[index];
if (nullptr != current) {
// Something is ready.
if (rcl_wait_set.guard_conditions[index] == interrupt_guard_condition_ptr) {
// This means that this class triggered a guard condition to interrupt this wait.
was_interrupted_by_this_class = true;
} else {
// This means it was a user guard condition.
any_user_guard_conditions_triggered = true;
}
}
}
if (!was_interrupted_by_this_class || any_user_guard_conditions_triggered) {
// In this case we know:
// - something was ready
// - it was either:
// - not interrupted by this class, or
// - maybe it was, but there were also user defined guard conditions.
//
// We cannot ignore user defined guard conditions, but we can ignore
// other kinds of user defined entities, because they will still be
// ready next time we wait, whereas guard conditions are cleared.
// Therefore we need to create a WaitResult and return it.
// The WaitResult will call sync_wait_result_acquire() and
// sync_wait_result_release() to ensure thread-safety by preventing
// the mutation of the entity sets while introspecting after waiting.
return create_wait_result(WaitResultKind::Ready);
}
// If we get here the we interrupted the wait set and there were no user
// guard conditions that needed to be handled.
// So we will loop and it will re-acquire the lock and rebuild the
// rcl wait set.
} else if (RCL_RET_TIMEOUT == ret) {
// The wait set timed out, exit the loop.
break;
} else if (RCL_RET_WAIT_SET_EMPTY == ret) {
// Wait set was empty, return Empty.
return create_wait_result(WaitResultKind::Empty);
} else {
// Some other error case, throw.
rclcpp::exceptions::throw_from_rcl_error(ret);
}
} while (should_loop());
// Wait did not result in ready items, return timeout.
return create_wait_result(WaitResultKind::Timeout);
}
void
sync_wait_result_acquire()
{
wprw_lock_.get_read_mutex().lock();
}
void
sync_wait_result_release()
{
wprw_lock_.get_read_mutex().unlock();
}
protected:
std::array<std::shared_ptr<rclcpp::GuardCondition>, 1> extra_guard_conditions_;
rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock wprw_lock_;
};
} // namespace wait_set_policies
} // namespace rclcpp
#endif // RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_