Program Listing for File client.hpp

Return to documentation for file (include/rclcpp/client.hpp)

// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__CLIENT_HPP_
#define RCLCPP__CLIENT_HPP_

#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>

#include "rcl/client.h"
#include "rcl/error_handling.h"
#include "rcl/event_callback.h"
#include "rcl/service_introspection.h"
#include "rcl/wait.h"

#include "rclcpp/clock.hpp"
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"

namespace rclcpp
{

namespace detail
{
template<typename FutureT>
struct FutureAndRequestId
{
  FutureT future;
  int64_t request_id;

  FutureAndRequestId(FutureT impl, int64_t req_id)
  : future(std::move(impl)), request_id(req_id)
  {}

  operator FutureT &() {return this->future;}


  [[deprecated("FutureAndRequestId: use .future instead of an implicit conversion")]]
  operator FutureT() {return this->future;}

  // delegate future like methods in the std::future impl_

  auto get() {return this->future.get();}
  bool valid() const noexcept {return this->future.valid();}
  void wait() const {return this->future.wait();}
  template<class Rep, class Period>
  std::future_status wait_for(
    const std::chrono::duration<Rep, Period> & timeout_duration) const
  {
    return this->future.wait_for(timeout_duration);
  }
  template<class Clock, class Duration>
  std::future_status wait_until(
    const std::chrono::time_point<Clock, Duration> & timeout_time) const
  {
    return this->future.wait_until(timeout_time);
  }

  // Rule of five, we could use the rule of zero here, but better be explicit as some of the
  // methods are deleted.

  FutureAndRequestId(FutureAndRequestId && other) noexcept = default;
  FutureAndRequestId(const FutureAndRequestId & other) = delete;
  FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default;
  FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete;
  ~FutureAndRequestId() = default;
};

template<typename PendingRequestsT, typename AllocatorT = std::allocator<int64_t>>
size_t
prune_requests_older_than_impl(
  PendingRequestsT & pending_requests,
  std::mutex & pending_requests_mutex,
  std::chrono::time_point<std::chrono::system_clock> time_point,
  std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
{
  std::lock_guard guard(pending_requests_mutex);
  auto old_size = pending_requests.size();
  for (auto it = pending_requests.begin(), last = pending_requests.end(); it != last; ) {
    if (it->second.first < time_point) {
      if (pruned_requests) {
        pruned_requests->push_back(it->first);
      }
      it = pending_requests.erase(it);
    } else {
      ++it;
    }
  }
  return old_size - pending_requests.size();
}
}  // namespace detail

namespace node_interfaces
{
class NodeBaseInterface;
}  // namespace node_interfaces

class ClientBase
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientBase)

  RCLCPP_PUBLIC
  ClientBase(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);

  RCLCPP_PUBLIC
  virtual ~ClientBase() = default;


  RCLCPP_PUBLIC
  bool
  take_type_erased_response(void * response_out, rmw_request_id_t & request_header_out);


  RCLCPP_PUBLIC
  const char *
  get_service_name() const;


  RCLCPP_PUBLIC
  std::shared_ptr<rcl_client_t>
  get_client_handle();


  RCLCPP_PUBLIC
  std::shared_ptr<const rcl_client_t>
  get_client_handle() const;


  RCLCPP_PUBLIC
  bool
  service_is_ready() const;


  template<typename RepT = int64_t, typename RatioT = std::milli>
  bool
  wait_for_service(
    std::chrono::duration<RepT, RatioT> timeout = std::chrono::duration<RepT, RatioT>(-1))
  {
    return wait_for_service_nanoseconds(
      std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
    );
  }

  virtual std::shared_ptr<void> create_response() = 0;
  virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
  virtual void handle_response(
    std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) = 0;


  RCLCPP_PUBLIC
  bool
  exchange_in_use_by_wait_set_state(bool in_use_state);


  RCLCPP_PUBLIC
  rclcpp::QoS
  get_request_publisher_actual_qos() const;


  RCLCPP_PUBLIC
  rclcpp::QoS
  get_response_subscription_actual_qos() const;


  void
  set_on_new_response_callback(std::function<void(size_t)> callback)
  {
    if (!callback) {
      throw std::invalid_argument(
              "The callback passed to set_on_new_response_callback "
              "is not callable.");
    }

    auto new_callback =
      [callback, this](size_t number_of_responses) {
        try {
          callback(number_of_responses);
        } catch (const std::exception & exception) {
          RCLCPP_ERROR_STREAM(
            node_logger_,
            "rclcpp::ClientBase@" << this <<
              " caught " << rmw::impl::cpp::demangle(exception) <<
              " exception in user-provided callback for the 'on new response' callback: " <<
              exception.what());
        } catch (...) {
          RCLCPP_ERROR_STREAM(
            node_logger_,
            "rclcpp::ClientBase@" << this <<
              " caught unhandled exception in user-provided callback " <<
              "for the 'on new response' callback");
        }
      };

    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);

    // Set it temporarily to the new callback, while we replace the old one.
    // This two-step setting, prevents a gap where the old std::function has
    // been replaced but the middleware hasn't been told about the new one yet.
    set_on_new_response_callback(
      rclcpp::detail::cpp_callback_trampoline<decltype(new_callback), const void *, size_t>,
      static_cast<const void *>(&new_callback));

    // Store the std::function to keep it in scope, also overwrites the existing one.
    on_new_response_callback_ = new_callback;

    // Set it again, now using the permanent storage.
    set_on_new_response_callback(
      rclcpp::detail::cpp_callback_trampoline<
        decltype(on_new_response_callback_), const void *, size_t>,
      static_cast<const void *>(&on_new_response_callback_));
  }

  void
  clear_on_new_response_callback()
  {
    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
    if (on_new_response_callback_) {
      set_on_new_response_callback(nullptr, nullptr);
      on_new_response_callback_ = nullptr;
    }
  }

protected:
  RCLCPP_DISABLE_COPY(ClientBase)

  RCLCPP_PUBLIC
  bool
  wait_for_service_nanoseconds(std::chrono::nanoseconds timeout);

  RCLCPP_PUBLIC
  rcl_node_t *
  get_rcl_node_handle();

  RCLCPP_PUBLIC
  const rcl_node_t *
  get_rcl_node_handle() const;

  RCLCPP_PUBLIC
  void
  set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);

  rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
  std::shared_ptr<rcl_node_t> node_handle_;
  std::shared_ptr<rclcpp::Context> context_;
  rclcpp::Logger node_logger_;

  std::recursive_mutex callback_mutex_;
  // It is important to declare on_new_response_callback_ before
  // client_handle_, so on destruction the client is
  // destroyed first. Otherwise, the rmw client callback
  // would point briefly to a destroyed function.
  std::function<void(size_t)> on_new_response_callback_{nullptr};
  // Declare client_handle_ after callback
  std::shared_ptr<rcl_client_t> client_handle_;

  std::atomic<bool> in_use_by_wait_set_{false};
};

template<typename ServiceT>
class Client : public ClientBase
{
public:
  using Request = typename ServiceT::Request;
  using Response = typename ServiceT::Response;

  using SharedRequest = typename ServiceT::Request::SharedPtr;
  using SharedResponse = typename ServiceT::Response::SharedPtr;

  using Promise = std::promise<SharedResponse>;
  using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;

  using SharedPromise = std::shared_ptr<Promise>;
  using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;

  using Future = std::future<SharedResponse>;
  using SharedFuture = std::shared_future<SharedResponse>;
  using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;

  using CallbackType = std::function<void (SharedFuture)>;
  using CallbackWithRequestType = std::function<void (SharedFutureWithRequest)>;

  RCLCPP_SMART_PTR_DEFINITIONS(Client)


  struct FutureAndRequestId
    : detail::FutureAndRequestId<std::future<SharedResponse>>
  {
    using detail::FutureAndRequestId<std::future<SharedResponse>>::FutureAndRequestId;


    [[deprecated(
      "FutureAndRequestId: use .future.share() instead of an implicit conversion")]]
    operator SharedFuture() {return this->future.share();}

    // delegate future like methods in the std::future impl_

    SharedFuture share() noexcept {return this->future.share();}
  };


  struct SharedFutureAndRequestId
    : detail::FutureAndRequestId<std::shared_future<SharedResponse>>
  {
    using detail::FutureAndRequestId<std::shared_future<SharedResponse>>::FutureAndRequestId;
  };


  struct SharedFutureWithRequestAndRequestId
    : detail::FutureAndRequestId<std::shared_future<std::pair<SharedRequest, SharedResponse>>>
  {
    using detail::FutureAndRequestId<
      std::shared_future<std::pair<SharedRequest, SharedResponse>>
    >::FutureAndRequestId;
  };


  Client(
    rclcpp::node_interfaces::NodeBaseInterface * node_base,
    rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
    const std::string & service_name,
    rcl_client_options_t & client_options)
  : ClientBase(node_base, node_graph),
    srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
  {
    rcl_ret_t ret = rcl_client_init(
      this->get_client_handle().get(),
      this->get_rcl_node_handle(),
      srv_type_support_handle_,
      service_name.c_str(),
      &client_options);
    if (ret != RCL_RET_OK) {
      if (ret == RCL_RET_SERVICE_NAME_INVALID) {
        auto rcl_node_handle = this->get_rcl_node_handle();
        // this will throw on any validation problem
        rcl_reset_error();
        expand_topic_or_service_name(
          service_name,
          rcl_node_get_name(rcl_node_handle),
          rcl_node_get_namespace(rcl_node_handle),
          true);
      }
      rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client");
    }
  }

  virtual ~Client()
  {
  }


  bool
  take_response(typename ServiceT::Response & response_out, rmw_request_id_t & request_header_out)
  {
    return this->take_type_erased_response(&response_out, request_header_out);
  }


  std::shared_ptr<void>
  create_response() override
  {
    return std::shared_ptr<void>(new typename ServiceT::Response());
  }


  std::shared_ptr<rmw_request_id_t>
  create_request_header() override
  {
    // TODO(wjwwood): This should probably use rmw_request_id's allocator.
    //                (since it is a C type)
    return std::shared_ptr<rmw_request_id_t>(new rmw_request_id_t);
  }


  void
  handle_response(
    std::shared_ptr<rmw_request_id_t> request_header,
    std::shared_ptr<void> response) override
  {
    std::optional<CallbackInfoVariant>
    optional_pending_request = this->get_and_erase_pending_request(request_header->sequence_number);
    if (!optional_pending_request) {
      return;
    }
    auto & value = *optional_pending_request;
    auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
      std::move(response));
    if (std::holds_alternative<Promise>(value)) {
      auto & promise = std::get<Promise>(value);
      promise.set_value(std::move(typed_response));
    } else if (std::holds_alternative<CallbackTypeValueVariant>(value)) {
      auto & inner = std::get<CallbackTypeValueVariant>(value);
      const auto & callback = std::get<CallbackType>(inner);
      auto & promise = std::get<Promise>(inner);
      auto & future = std::get<SharedFuture>(inner);
      promise.set_value(std::move(typed_response));
      callback(std::move(future));
    } else if (std::holds_alternative<CallbackWithRequestTypeValueVariant>(value)) {
      auto & inner = std::get<CallbackWithRequestTypeValueVariant>(value);
      const auto & callback = std::get<CallbackWithRequestType>(inner);
      auto & promise = std::get<PromiseWithRequest>(inner);
      auto & future = std::get<SharedFutureWithRequest>(inner);
      auto & request = std::get<SharedRequest>(inner);
      promise.set_value(std::make_pair(std::move(request), std::move(typed_response)));
      callback(std::move(future));
    }
  }


  FutureAndRequestId
  async_send_request(SharedRequest request)
  {
    Promise promise;
    auto future = promise.get_future();
    auto req_id = async_send_request_impl(
      *request,
      std::move(promise));
    return FutureAndRequestId(std::move(future), req_id);
  }


  template<
    typename CallbackT,
    typename std::enable_if<
      rclcpp::function_traits::same_arguments<
        CallbackT,
        CallbackType
      >::value
    >::type * = nullptr
  >
  SharedFutureAndRequestId
  async_send_request(SharedRequest request, CallbackT && cb)
  {
    Promise promise;
    auto shared_future = promise.get_future().share();
    auto req_id = async_send_request_impl(
      *request,
      std::make_tuple(
        CallbackType{std::forward<CallbackT>(cb)},
        shared_future,
        std::move(promise)));
    return SharedFutureAndRequestId{std::move(shared_future), req_id};
  }


  template<
    typename CallbackT,
    typename std::enable_if<
      rclcpp::function_traits::same_arguments<
        CallbackT,
        CallbackWithRequestType
      >::value
    >::type * = nullptr
  >
  SharedFutureWithRequestAndRequestId
  async_send_request(SharedRequest request, CallbackT && cb)
  {
    PromiseWithRequest promise;
    auto shared_future = promise.get_future().share();
    auto req_id = async_send_request_impl(
      *request,
      std::make_tuple(
        CallbackWithRequestType{std::forward<CallbackT>(cb)},
        request,
        shared_future,
        std::move(promise)));
    return SharedFutureWithRequestAndRequestId{std::move(shared_future), req_id};
  }


  bool
  remove_pending_request(int64_t request_id)
  {
    std::lock_guard guard(pending_requests_mutex_);
    return pending_requests_.erase(request_id) != 0u;
  }


  bool
  remove_pending_request(const FutureAndRequestId & future)
  {
    return this->remove_pending_request(future.request_id);
  }


  bool
  remove_pending_request(const SharedFutureAndRequestId & future)
  {
    return this->remove_pending_request(future.request_id);
  }


  bool
  remove_pending_request(const SharedFutureWithRequestAndRequestId & future)
  {
    return this->remove_pending_request(future.request_id);
  }


  size_t
  prune_pending_requests()
  {
    std::lock_guard guard(pending_requests_mutex_);
    auto ret = pending_requests_.size();
    pending_requests_.clear();
    return ret;
  }


  template<typename AllocatorT = std::allocator<int64_t>>
  size_t
  prune_requests_older_than(
    std::chrono::time_point<std::chrono::system_clock> time_point,
    std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
  {
    return detail::prune_requests_older_than_impl(
      pending_requests_,
      pending_requests_mutex_,
      time_point,
      pruned_requests);
  }


  void
  configure_introspection(
    Clock::SharedPtr clock, const QoS & qos_service_event_pub,
    rcl_service_introspection_state_t introspection_state)
  {
    rcl_publisher_options_t pub_opts = rcl_publisher_get_default_options();
    pub_opts.qos = qos_service_event_pub.get_rmw_qos_profile();

    rcl_ret_t ret = rcl_client_configure_service_introspection(
      client_handle_.get(),
      node_handle_.get(),
      clock->get_clock_handle(),
      srv_type_support_handle_,
      pub_opts,
      introspection_state);

    if (RCL_RET_OK != ret) {
      rclcpp::exceptions::throw_from_rcl_error(ret, "failed to configure client introspection");
    }
  }

protected:
  using CallbackTypeValueVariant = std::tuple<CallbackType, SharedFuture, Promise>;
  using CallbackWithRequestTypeValueVariant = std::tuple<
    CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>;

  using CallbackInfoVariant = std::variant<
    std::promise<SharedResponse>,
    CallbackTypeValueVariant,
    CallbackWithRequestTypeValueVariant>;

  int64_t
  async_send_request_impl(const Request & request, CallbackInfoVariant value)
  {
    int64_t sequence_number;
    std::lock_guard<std::mutex> lock(pending_requests_mutex_);
    rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
    if (RCL_RET_OK != ret) {
      rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
    }
    pending_requests_.try_emplace(
      sequence_number,
      std::make_pair(std::chrono::system_clock::now(), std::move(value)));
    return sequence_number;
  }

  std::optional<CallbackInfoVariant>
  get_and_erase_pending_request(int64_t request_number)
  {
    std::unique_lock<std::mutex> lock(pending_requests_mutex_);
    auto it = this->pending_requests_.find(request_number);
    if (it == this->pending_requests_.end()) {
      RCUTILS_LOG_DEBUG_NAMED(
        "rclcpp",
        "Received invalid sequence number. Ignoring...");
      return std::nullopt;
    }
    std::optional<CallbackInfoVariant> value = std::move(it->second.second);
    this->pending_requests_.erase(request_number);
    return value;
  }

  RCLCPP_DISABLE_COPY(Client)

  std::unordered_map<
    int64_t,
    std::pair<
      std::chrono::time_point<std::chrono::system_clock>,
      CallbackInfoVariant>>
  pending_requests_;
  std::mutex pending_requests_mutex_;

private:
  const rosidl_service_type_support_t * srv_type_support_handle_;
};

}  // namespace rclcpp

#endif  // RCLCPP__CLIENT_HPP_