Program Listing for File executor.hpp
↰ Return to documentation for file (include/rclcpp/executor.hpp
)
// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXECUTOR_HPP_
#define RCLCPP__EXECUTOR_HPP_
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rcpputils/scope_exit.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
// Forward declaration is used in convenience method signature.
class Node;
class Executor
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor)
RCLCPP_PUBLIC
explicit Executor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());
RCLCPP_PUBLIC
virtual ~Executor();
// It is up to the implementation of Executor to implement spin.
virtual void
spin() = 0;
RCLCPP_PUBLIC
virtual void
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true);
RCLCPP_PUBLIC
virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
get_all_callback_groups();
RCLCPP_PUBLIC
virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
get_manually_added_callback_groups();
RCLCPP_PUBLIC
virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
get_automatically_added_callback_groups_from_nodes();
RCLCPP_PUBLIC
virtual void
remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify = true);
RCLCPP_PUBLIC
virtual void
add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
RCLCPP_PUBLIC
virtual void
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
RCLCPP_PUBLIC
virtual void
remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
RCLCPP_PUBLIC
virtual void
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
template<typename RepT = int64_t, typename T = std::milli>
void
spin_node_once(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node,
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
template<typename NodeT = rclcpp::Node, typename RepT = int64_t, typename T = std::milli>
void
spin_node_once(
std::shared_ptr<NodeT> node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node->get_node_base_interface(),
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
RCLCPP_PUBLIC
void
spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);
RCLCPP_PUBLIC
void
spin_node_some(std::shared_ptr<rclcpp::Node> node);
RCLCPP_PUBLIC
virtual void
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0));
RCLCPP_PUBLIC
virtual void
spin_all(std::chrono::nanoseconds max_duration);
RCLCPP_PUBLIC
virtual void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
FutureReturnCode
spin_until_future_complete(
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;
if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);
// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}
// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}
RCLCPP_PUBLIC
void
cancel();
RCLCPP_PUBLIC
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
RCLCPP_PUBLIC
bool
is_spinning();
protected:
RCLCPP_PUBLIC
void
spin_node_once_nanoseconds(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds timeout);
RCLCPP_PUBLIC
void
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);
RCLCPP_PUBLIC
void
execute_any_executable(AnyExecutable & any_exec);
RCLCPP_PUBLIC
static void
execute_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription);
RCLCPP_PUBLIC
static void
execute_timer(rclcpp::TimerBase::SharedPtr timer);
RCLCPP_PUBLIC
static void
execute_service(rclcpp::ServiceBase::SharedPtr service);
RCLCPP_PUBLIC
static void
execute_client(rclcpp::ClientBase::SharedPtr client);
RCLCPP_PUBLIC
void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);
RCLCPP_PUBLIC
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
RCLCPP_PUBLIC
virtual void
add_callback_group_to_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
RCLCPP_PUBLIC
virtual void
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
RCLCPP_PUBLIC
bool
get_next_ready_executable(AnyExecutable & any_executable);
RCLCPP_PUBLIC
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
RCLCPP_PUBLIC
bool
get_next_executable(
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
std::atomic_bool spinning;
rclcpp::GuardCondition interrupt_guard_condition_;
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
// Mutex to protect the subsequent memory_strategy_.
mutable std::mutex mutex_;
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
std::shared_ptr<rclcpp::Context> context_;
RCLCPP_DISABLE_COPY(Executor)
RCLCPP_PUBLIC
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;
};
} // namespace rclcpp
#endif // RCLCPP__EXECUTOR_HPP_