Program Listing for File client.hpp

Return to documentation for file (include/rclcpp_action/client.hpp)

// Copyright 2018 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP_ACTION__CLIENT_HPP_
#define RCLCPP_ACTION__CLIENT_HPP_

#include <algorithm>
#include <chrono>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include "rcl/event_callback.h"

#include "rclcpp/exceptions.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/logger.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/waitable.hpp"

#include "rosidl_runtime_c/action_type_support_struct.h"
#include "rosidl_typesupport_cpp/action_type_support.hpp"

#include "rclcpp_action/client_goal_handle.hpp"
#include "rclcpp_action/exceptions.hpp"
#include "rclcpp_action/types.hpp"
#include "rclcpp_action/visibility_control.hpp"


namespace rclcpp_action
{
// Forward declaration
class ClientBaseImpl;


class ClientBase : public rclcpp::Waitable
{
public:
  RCLCPP_ACTION_PUBLIC
  virtual ~ClientBase();

  enum class EntityType : std::size_t
  {
    GoalClient,
    ResultClient,
    CancelClient,
    FeedbackSubscription,
    StatusSubscription,
  };

  RCLCPP_ACTION_PUBLIC
  bool
  action_server_is_ready() const;

  template<typename RepT = int64_t, typename RatioT = std::milli>
  bool
  wait_for_action_server(
    std::chrono::duration<RepT, RatioT> timeout = std::chrono::duration<RepT, RatioT>(-1))
  {
    return wait_for_action_server_nanoseconds(
      std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
    );
  }

  // -------------
  // Waitables API

  RCLCPP_ACTION_PUBLIC
  size_t
  get_number_of_ready_subscriptions() override;

  RCLCPP_ACTION_PUBLIC
  size_t
  get_number_of_ready_timers() override;

  RCLCPP_ACTION_PUBLIC
  size_t
  get_number_of_ready_clients() override;

  RCLCPP_ACTION_PUBLIC
  size_t
  get_number_of_ready_services() override;

  RCLCPP_ACTION_PUBLIC
  size_t
  get_number_of_ready_guard_conditions() override;

  RCLCPP_ACTION_PUBLIC
  void
  add_to_wait_set(rcl_wait_set_t * wait_set) override;

  RCLCPP_ACTION_PUBLIC
  bool
  is_ready(rcl_wait_set_t * wait_set) override;

  RCLCPP_ACTION_PUBLIC
  std::shared_ptr<void>
  take_data() override;

  RCLCPP_ACTION_PUBLIC
  std::shared_ptr<void>
  take_data_by_entity_id(size_t id) override;

  RCLCPP_ACTION_PUBLIC
  void
  execute(std::shared_ptr<void> & data) override;


  RCLCPP_ACTION_PUBLIC
  void
  set_on_ready_callback(std::function<void(size_t, int)> callback) override;

  RCLCPP_ACTION_PUBLIC
  void
  clear_on_ready_callback() override;

  // End Waitables API
  // -----------------

protected:
  RCLCPP_ACTION_PUBLIC
  ClientBase(
    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
    rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
    rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
    const std::string & action_name,
    const rosidl_action_type_support_t * type_support,
    const rcl_action_client_options_t & options);

  RCLCPP_ACTION_PUBLIC
  bool
  wait_for_action_server_nanoseconds(std::chrono::nanoseconds timeout);

  // -----------------------------------------------------
  // API for communication between ClientBase and Client<>
  using ResponseCallback = std::function<void (std::shared_ptr<void> response)>;

  RCLCPP_ACTION_PUBLIC
  rclcpp::Logger get_logger();

  RCLCPP_ACTION_PUBLIC
  virtual
  GoalUUID
  generate_goal_id();

  RCLCPP_ACTION_PUBLIC
  virtual
  void
  send_goal_request(
    std::shared_ptr<void> request,
    ResponseCallback callback);

  RCLCPP_ACTION_PUBLIC
  virtual
  void
  send_result_request(
    std::shared_ptr<void> request,
    ResponseCallback callback);

  RCLCPP_ACTION_PUBLIC
  virtual
  void
  send_cancel_request(
    std::shared_ptr<void> request,
    ResponseCallback callback);

  virtual
  std::shared_ptr<void>
  create_goal_response() const = 0;

  RCLCPP_ACTION_PUBLIC
  virtual
  void
  handle_goal_response(
    const rmw_request_id_t & response_header,
    std::shared_ptr<void> goal_response);

  virtual
  std::shared_ptr<void>
  create_result_response() const = 0;

  RCLCPP_ACTION_PUBLIC
  virtual
  void
  handle_result_response(
    const rmw_request_id_t & response_header,
    std::shared_ptr<void> result_response);

  virtual
  std::shared_ptr<void>
  create_cancel_response() const = 0;

  RCLCPP_ACTION_PUBLIC
  virtual
  void
  handle_cancel_response(
    const rmw_request_id_t & response_header,
    std::shared_ptr<void> cancel_response);

  virtual
  std::shared_ptr<void>
  create_feedback_message() const = 0;

  virtual
  void
  handle_feedback_message(std::shared_ptr<void> message) = 0;

  virtual
  std::shared_ptr<void>
  create_status_message() const = 0;

  virtual
  void
  handle_status_message(std::shared_ptr<void> message) = 0;

  // End API for communication between ClientBase and Client<>
  // ---------------------------------------------------------

  RCLCPP_ACTION_PUBLIC
  void
  set_on_ready_callback(
    EntityType entity_type,
    rcl_event_callback_t callback,
    const void * user_data);

  // Mutex to protect the callbacks storage.
  std::recursive_mutex listener_mutex_;
  // Storage for std::function callbacks to keep them in scope
  std::unordered_map<EntityType, std::function<void(size_t)>> entity_type_to_on_ready_callback_;

private:
  std::unique_ptr<ClientBaseImpl> pimpl_;

  RCLCPP_ACTION_PUBLIC
  void
  set_callback_to_entity(
    EntityType entity_type,
    std::function<void(size_t, int)> callback);

  bool on_ready_callback_set_{false};
};


template<typename ActionT>
class Client : public ClientBase
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Client<ActionT>)

  using Goal = typename ActionT::Goal;
  using Feedback = typename ActionT::Feedback;
  using GoalHandle = ClientGoalHandle<ActionT>;
  using WrappedResult = typename GoalHandle::WrappedResult;
  using GoalResponseCallback = std::function<void (typename GoalHandle::SharedPtr)>;
  using FeedbackCallback = typename GoalHandle::FeedbackCallback;
  using ResultCallback = typename GoalHandle::ResultCallback;
  using CancelRequest = typename ActionT::Impl::CancelGoalService::Request;
  using CancelResponse = typename ActionT::Impl::CancelGoalService::Response;
  using CancelCallback = std::function<void (typename CancelResponse::SharedPtr)>;


  struct SendGoalOptions
  {
    SendGoalOptions()
    : goal_response_callback(nullptr),
      feedback_callback(nullptr),
      result_callback(nullptr)
    {
    }


    GoalResponseCallback goal_response_callback;

    FeedbackCallback feedback_callback;

    ResultCallback result_callback;
  };


  Client(
    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
    rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
    rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
    const std::string & action_name,
    const rcl_action_client_options_t & client_options = rcl_action_client_get_default_options()
  )
  : ClientBase(
      node_base, node_graph, node_logging, action_name,
      rosidl_typesupport_cpp::get_action_type_support_handle<ActionT>(),
      client_options)
  {
  }


  std::shared_future<typename GoalHandle::SharedPtr>
  async_send_goal(const Goal & goal, const SendGoalOptions & options = SendGoalOptions())
  {
    // Put promise in the heap to move it around.
    auto promise = std::make_shared<std::promise<typename GoalHandle::SharedPtr>>();
    std::shared_future<typename GoalHandle::SharedPtr> future(promise->get_future());
    using GoalRequest = typename ActionT::Impl::SendGoalService::Request;
    auto goal_request = std::make_shared<GoalRequest>();
    goal_request->goal_id.uuid = this->generate_goal_id();
    goal_request->goal = goal;
    this->send_goal_request(
      std::static_pointer_cast<void>(goal_request),
      [this, goal_request, options, promise](std::shared_ptr<void> response) mutable
      {
        using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
        auto goal_response = std::static_pointer_cast<GoalResponse>(response);
        if (!goal_response->accepted) {
          promise->set_value(nullptr);
          if (options.goal_response_callback) {
            options.goal_response_callback(nullptr);
          }
          return;
        }
        GoalInfo goal_info;
        goal_info.goal_id.uuid = goal_request->goal_id.uuid;
        goal_info.stamp = goal_response->stamp;
        // Do not use std::make_shared as friendship cannot be forwarded.
        std::shared_ptr<GoalHandle> goal_handle(
          new GoalHandle(goal_info, options.feedback_callback, options.result_callback));
        {
          std::lock_guard<std::mutex> guard(goal_handles_mutex_);
          goal_handles_[goal_handle->get_goal_id()] = goal_handle;
        }
        promise->set_value(goal_handle);
        if (options.goal_response_callback) {
          options.goal_response_callback(goal_handle);
        }

        if (options.result_callback) {
          this->make_result_aware(goal_handle);
        }
      });

    // TODO(jacobperron): Encapsulate into it's own function and
    //                    consider exposing an option to disable this cleanup
    // To prevent the list from growing out of control, forget about any goals
    // with no more user references
    {
      std::lock_guard<std::mutex> guard(goal_handles_mutex_);
      auto goal_handle_it = goal_handles_.begin();
      while (goal_handle_it != goal_handles_.end()) {
        if (!goal_handle_it->second.lock()) {
          RCLCPP_DEBUG(
            this->get_logger(),
            "Dropping weak reference to goal handle during send_goal()");
          goal_handle_it = goal_handles_.erase(goal_handle_it);
        } else {
          ++goal_handle_it;
        }
      }
    }

    return future;
  }


  std::shared_future<WrappedResult>
  async_get_result(
    typename GoalHandle::SharedPtr goal_handle,
    ResultCallback result_callback = nullptr)
  {
    std::lock_guard<std::mutex> lock(goal_handles_mutex_);
    if (goal_handles_.count(goal_handle->get_goal_id()) == 0) {
      throw exceptions::UnknownGoalHandleError();
    }
    if (goal_handle->is_invalidated()) {
      // This case can happen if there was a failure to send the result request
      // during the goal response callback
      throw goal_handle->invalidate_exception_;
    }
    if (result_callback) {
      // This will override any previously registered callback
      goal_handle->set_result_callback(result_callback);
    }
    this->make_result_aware(goal_handle);
    return goal_handle->async_get_result();
  }


  std::shared_future<typename CancelResponse::SharedPtr>
  async_cancel_goal(
    typename GoalHandle::SharedPtr goal_handle,
    CancelCallback cancel_callback = nullptr)
  {
    std::lock_guard<std::mutex> lock(goal_handles_mutex_);
    if (goal_handles_.count(goal_handle->get_goal_id()) == 0) {
      throw exceptions::UnknownGoalHandleError();
    }
    auto cancel_request = std::make_shared<CancelRequest>();
    // cancel_request->goal_info.goal_id = goal_handle->get_goal_id();
    cancel_request->goal_info.goal_id.uuid = goal_handle->get_goal_id();
    return async_cancel(cancel_request, cancel_callback);
  }


  std::shared_future<typename CancelResponse::SharedPtr>
  async_cancel_all_goals(CancelCallback cancel_callback = nullptr)
  {
    auto cancel_request = std::make_shared<CancelRequest>();
    // std::fill(cancel_request->goal_info.goal_id.uuid, 0u);
    std::fill(
      cancel_request->goal_info.goal_id.uuid.begin(),
      cancel_request->goal_info.goal_id.uuid.end(), 0u);
    return async_cancel(cancel_request, cancel_callback);
  }


  std::shared_future<typename CancelResponse::SharedPtr>
  async_cancel_goals_before(
    const rclcpp::Time & stamp,
    CancelCallback cancel_callback = nullptr)
  {
    auto cancel_request = std::make_shared<CancelRequest>();
    // std::fill(cancel_request->goal_info.goal_id.uuid, 0u);
    std::fill(
      cancel_request->goal_info.goal_id.uuid.begin(),
      cancel_request->goal_info.goal_id.uuid.end(), 0u);
    cancel_request->goal_info.stamp = stamp;
    return async_cancel(cancel_request, cancel_callback);
  }

  virtual
  ~Client()
  {
    std::lock_guard<std::mutex> guard(goal_handles_mutex_);
    auto it = goal_handles_.begin();
    while (it != goal_handles_.end()) {
      typename GoalHandle::SharedPtr goal_handle = it->second.lock();
      if (goal_handle) {
        goal_handle->invalidate(exceptions::UnawareGoalHandleError());
      }
      it = goal_handles_.erase(it);
    }
  }

private:
  std::shared_ptr<void>
  create_goal_response() const override
  {
    using GoalResponse = typename ActionT::Impl::SendGoalService::Response;
    return std::shared_ptr<void>(new GoalResponse());
  }

  std::shared_ptr<void>
  create_result_response() const override
  {
    using GoalResultResponse = typename ActionT::Impl::GetResultService::Response;
    return std::shared_ptr<void>(new GoalResultResponse());
  }

  std::shared_ptr<void>
  create_cancel_response() const override
  {
    return std::shared_ptr<void>(new CancelResponse());
  }

  std::shared_ptr<void>
  create_feedback_message() const override
  {
    using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
    return std::shared_ptr<void>(new FeedbackMessage());
  }

  void
  handle_feedback_message(std::shared_ptr<void> message) override
  {
    std::lock_guard<std::mutex> guard(goal_handles_mutex_);
    using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
    typename FeedbackMessage::SharedPtr feedback_message =
      std::static_pointer_cast<FeedbackMessage>(message);
    const GoalUUID & goal_id = feedback_message->goal_id.uuid;
    if (goal_handles_.count(goal_id) == 0) {
      RCLCPP_DEBUG(
        this->get_logger(),
        "Received feedback for unknown goal. Ignoring...");
      return;
    }
    typename GoalHandle::SharedPtr goal_handle = goal_handles_[goal_id].lock();
    // Forget about the goal if there are no more user references
    if (!goal_handle) {
      RCLCPP_DEBUG(
        this->get_logger(),
        "Dropping weak reference to goal handle during feedback callback");
      goal_handles_.erase(goal_id);
      return;
    }
    auto feedback = std::make_shared<Feedback>();
    *feedback = feedback_message->feedback;
    goal_handle->call_feedback_callback(goal_handle, feedback);
  }

  std::shared_ptr<void>
  create_status_message() const override
  {
    using GoalStatusMessage = typename ActionT::Impl::GoalStatusMessage;
    return std::shared_ptr<void>(new GoalStatusMessage());
  }

  void
  handle_status_message(std::shared_ptr<void> message) override
  {
    std::lock_guard<std::mutex> guard(goal_handles_mutex_);
    using GoalStatusMessage = typename ActionT::Impl::GoalStatusMessage;
    auto status_message = std::static_pointer_cast<GoalStatusMessage>(message);
    for (const GoalStatus & status : status_message->status_list) {
      const GoalUUID & goal_id = status.goal_info.goal_id.uuid;
      if (goal_handles_.count(goal_id) == 0) {
        RCLCPP_DEBUG(
          this->get_logger(),
          "Received status for unknown goal. Ignoring...");
        continue;
      }
      typename GoalHandle::SharedPtr goal_handle = goal_handles_[goal_id].lock();
      // Forget about the goal if there are no more user references
      if (!goal_handle) {
        RCLCPP_DEBUG(
          this->get_logger(),
          "Dropping weak reference to goal handle during status callback");
        goal_handles_.erase(goal_id);
        continue;
      }
      goal_handle->set_status(status.status);
    }
  }

  void
  make_result_aware(typename GoalHandle::SharedPtr goal_handle)
  {
    // Avoid making more than one request
    if (goal_handle->set_result_awareness(true)) {
      return;
    }
    using GoalResultRequest = typename ActionT::Impl::GetResultService::Request;
    auto goal_result_request = std::make_shared<GoalResultRequest>();
    goal_result_request->goal_id.uuid = goal_handle->get_goal_id();
    try {
      this->send_result_request(
        std::static_pointer_cast<void>(goal_result_request),
        [goal_handle, this](std::shared_ptr<void> response) mutable
        {
          // Wrap the response in a struct with the fields a user cares about
          WrappedResult wrapped_result;
          using GoalResultResponse = typename ActionT::Impl::GetResultService::Response;
          auto result_response = std::static_pointer_cast<GoalResultResponse>(response);
          wrapped_result.result = std::make_shared<typename ActionT::Result>();
          *wrapped_result.result = result_response->result;
          wrapped_result.goal_id = goal_handle->get_goal_id();
          wrapped_result.code = static_cast<ResultCode>(result_response->status);
          goal_handle->set_result(wrapped_result);
          std::lock_guard<std::mutex> lock(goal_handles_mutex_);
          goal_handles_.erase(goal_handle->get_goal_id());
        });
    } catch (rclcpp::exceptions::RCLError & ex) {
      // This will cause an exception when the user tries to access the result
      goal_handle->invalidate(exceptions::UnawareGoalHandleError(ex.message));
    }
  }

  std::shared_future<typename CancelResponse::SharedPtr>
  async_cancel(
    typename CancelRequest::SharedPtr cancel_request,
    CancelCallback cancel_callback = nullptr)
  {
    // Put promise in the heap to move it around.
    auto promise = std::make_shared<std::promise<typename CancelResponse::SharedPtr>>();
    std::shared_future<typename CancelResponse::SharedPtr> future(promise->get_future());
    this->send_cancel_request(
      std::static_pointer_cast<void>(cancel_request),
      [cancel_callback, promise](std::shared_ptr<void> response) mutable
      {
        auto cancel_response = std::static_pointer_cast<CancelResponse>(response);
        promise->set_value(cancel_response);
        if (cancel_callback) {
          cancel_callback(cancel_response);
        }
      });
    return future;
  }

  std::map<GoalUUID, typename GoalHandle::WeakPtr> goal_handles_;
  std::mutex goal_handles_mutex_;
};
}  // namespace rclcpp_action

#endif  // RCLCPP_ACTION__CLIENT_HPP_