Program Listing for File executor.hpp

Return to documentation for file (include/rclcpp/executor.hpp)

// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXECUTOR_HPP_
#define RCLCPP__EXECUTOR_HPP_

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rcpputils/scope_exit.hpp"

#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
    rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
    std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;

// Forward declaration is used in convenience method signature.
class Node;
class ExecutorImplementation;


class Executor
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor)


  RCLCPP_PUBLIC
  explicit Executor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());

  RCLCPP_PUBLIC
  virtual ~Executor();

  // It is up to the implementation of Executor to implement spin.
  virtual void
  spin() = 0;


  RCLCPP_PUBLIC
  virtual void
  add_callback_group(
    rclcpp::CallbackGroup::SharedPtr group_ptr,
    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
    bool notify = true);


  RCLCPP_PUBLIC
  virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
  get_all_callback_groups();


  RCLCPP_PUBLIC
  virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
  get_manually_added_callback_groups();


  RCLCPP_PUBLIC
  virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
  get_automatically_added_callback_groups_from_nodes();


  RCLCPP_PUBLIC
  virtual void
  remove_callback_group(
    rclcpp::CallbackGroup::SharedPtr group_ptr,
    bool notify = true);


  RCLCPP_PUBLIC
  virtual void
  add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);


  RCLCPP_PUBLIC
  virtual void
  add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);


  RCLCPP_PUBLIC
  virtual void
  remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);


  RCLCPP_PUBLIC
  virtual void
  remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);


  template<typename RepT = int64_t, typename T = std::milli>
  void
  spin_node_once(
    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
    std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
  {
    return spin_node_once_nanoseconds(
      node,
      std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
    );
  }

  template<typename NodeT = rclcpp::Node, typename RepT = int64_t, typename T = std::milli>
  void
  spin_node_once(
    std::shared_ptr<NodeT> node,
    std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
  {
    return spin_node_once_nanoseconds(
      node->get_node_base_interface(),
      std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
    );
  }


  RCLCPP_PUBLIC
  void
  spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

  RCLCPP_PUBLIC
  void
  spin_node_some(std::shared_ptr<rclcpp::Node> node);


  RCLCPP_PUBLIC
  virtual void
  spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0));


  RCLCPP_PUBLIC
  virtual void
  spin_all(std::chrono::nanoseconds max_duration);



  RCLCPP_PUBLIC
  virtual void
  spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));


  template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
  FutureReturnCode
  spin_until_future_complete(
    const FutureT & future,
    std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
  {
    // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
    // inside a callback executed by an executor.

    // Check the future before entering the while loop.
    // If the future is already complete, don't try to spin.
    std::future_status status = future.wait_for(std::chrono::seconds(0));
    if (status == std::future_status::ready) {
      return FutureReturnCode::SUCCESS;
    }

    auto end_time = std::chrono::steady_clock::now();
    std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
      timeout);
    if (timeout_ns > std::chrono::nanoseconds::zero()) {
      end_time += timeout_ns;
    }
    std::chrono::nanoseconds timeout_left = timeout_ns;

    if (spinning.exchange(true)) {
      throw std::runtime_error("spin_until_future_complete() called while already spinning");
    }
    RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
    while (rclcpp::ok(this->context_) && spinning.load()) {
      // Do one item of work.
      spin_once_impl(timeout_left);

      // Check if the future is set, return SUCCESS if it is.
      status = future.wait_for(std::chrono::seconds(0));
      if (status == std::future_status::ready) {
        return FutureReturnCode::SUCCESS;
      }
      // If the original timeout is < 0, then this is blocking, never TIMEOUT.
      if (timeout_ns < std::chrono::nanoseconds::zero()) {
        continue;
      }
      // Otherwise check if we still have time to wait, return TIMEOUT if not.
      auto now = std::chrono::steady_clock::now();
      if (now >= end_time) {
        return FutureReturnCode::TIMEOUT;
      }
      // Subtract the elapsed time from the original timeout.
      timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
    }

    // The future did not complete before ok() returned false, return INTERRUPTED.
    return FutureReturnCode::INTERRUPTED;
  }


  RCLCPP_PUBLIC
  void
  cancel();


  RCLCPP_PUBLIC
  void
  set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);


  RCLCPP_PUBLIC
  bool
  is_spinning();

protected:

  RCLCPP_PUBLIC
  void
  spin_node_once_nanoseconds(
    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
    std::chrono::nanoseconds timeout);


  RCLCPP_PUBLIC
  void
  spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);


  RCLCPP_PUBLIC
  void
  execute_any_executable(AnyExecutable & any_exec);


  RCLCPP_PUBLIC
  static void
  execute_subscription(
    rclcpp::SubscriptionBase::SharedPtr subscription);


  RCLCPP_PUBLIC
  static void
  execute_timer(rclcpp::TimerBase::SharedPtr timer);


  RCLCPP_PUBLIC
  static void
  execute_service(rclcpp::ServiceBase::SharedPtr service);


  RCLCPP_PUBLIC
  static void
  execute_client(rclcpp::ClientBase::SharedPtr client);


  RCLCPP_PUBLIC
  void
  wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));


  RCLCPP_PUBLIC
  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
  get_node_by_group(
    const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
    rclcpp::CallbackGroup::SharedPtr group);


  RCLCPP_PUBLIC
  bool
  has_node(
    const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
    const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;


  RCLCPP_PUBLIC
  rclcpp::CallbackGroup::SharedPtr
  get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);


  RCLCPP_PUBLIC
  virtual void
  add_callback_group_to_map(
    rclcpp::CallbackGroup::SharedPtr group_ptr,
    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
    WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
    bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);


  RCLCPP_PUBLIC
  virtual void
  remove_callback_group_from_map(
    rclcpp::CallbackGroup::SharedPtr group_ptr,
    WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
    bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);


  RCLCPP_PUBLIC
  bool
  get_next_ready_executable(AnyExecutable & any_executable);


  RCLCPP_PUBLIC
  bool
  get_next_ready_executable_from_map(
    AnyExecutable & any_executable,
    const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);


  RCLCPP_PUBLIC
  bool
  get_next_executable(
    AnyExecutable & any_executable,
    std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));


  RCLCPP_PUBLIC
  virtual void
  add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);

  std::atomic_bool spinning;

  std::shared_ptr<rclcpp::GuardCondition> interrupt_guard_condition_;

  std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

  rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

  // Mutex to protect the subsequent memory_strategy_.
  mutable std::mutex mutex_;

  memory_strategy::MemoryStrategy::SharedPtr
  memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);

  std::shared_ptr<rclcpp::Context> context_;

  RCLCPP_DISABLE_COPY(Executor)

  RCLCPP_PUBLIC
  virtual void
  spin_once_impl(std::chrono::nanoseconds timeout);

  typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
      const rclcpp::GuardCondition *,
      std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
    WeakNodesToGuardConditionsMap;

  typedef std::map<rclcpp::CallbackGroup::WeakPtr,
      const rclcpp::GuardCondition *,
      std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
    WeakCallbackGroupsToGuardConditionsMap;

  WeakNodesToGuardConditionsMap
  weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  WeakCallbackGroupsToGuardConditionsMap
  weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  WeakCallbackGroupsToNodesMap
  weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  WeakCallbackGroupsToNodesMap
  weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  WeakCallbackGroupsToNodesMap
  weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
  weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

  rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;

  std::unique_ptr<ExecutorImplementation> impl_;
};

}  // namespace rclcpp

#endif  // RCLCPP__EXECUTOR_HPP_