Program Listing for File service.hpp

Return to documentation for file (include/rclcpp/service.hpp)

// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__SERVICE_HPP_
#define RCLCPP__SERVICE_HPP_

#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>

#include "rcl/error_handling.h"
#include "rcl/event_callback.h"
#include "rcl/service.h"
#include "rcl/service_introspection.h"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"

#include "tracetools/tracetools.h"

#include "rclcpp/any_service_callback.hpp"
#include "rclcpp/clock.hpp"
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

class ServiceBase
{
public:
  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ServiceBase)

  RCLCPP_PUBLIC
  explicit ServiceBase(std::shared_ptr<rcl_node_t> node_handle);

  RCLCPP_PUBLIC
  virtual ~ServiceBase() = default;


  RCLCPP_PUBLIC
  const char *
  get_service_name();


  RCLCPP_PUBLIC
  std::shared_ptr<rcl_service_t>
  get_service_handle();


  RCLCPP_PUBLIC
  std::shared_ptr<const rcl_service_t>
  get_service_handle() const;


  RCLCPP_PUBLIC
  bool
  take_type_erased_request(void * request_out, rmw_request_id_t & request_id_out);

  virtual
  std::shared_ptr<void>
  create_request() = 0;

  virtual
  std::shared_ptr<rmw_request_id_t>
  create_request_header() = 0;

  virtual
  void
  handle_request(
    std::shared_ptr<rmw_request_id_t> request_header,
    std::shared_ptr<void> request) = 0;


  RCLCPP_PUBLIC
  bool
  exchange_in_use_by_wait_set_state(bool in_use_state);


  RCLCPP_PUBLIC
  rclcpp::QoS
  get_response_publisher_actual_qos() const;


  RCLCPP_PUBLIC
  rclcpp::QoS
  get_request_subscription_actual_qos() const;


  void
  set_on_new_request_callback(std::function<void(size_t)> callback)
  {
    if (!callback) {
      throw std::invalid_argument(
              "The callback passed to set_on_new_request_callback "
              "is not callable.");
    }

    auto new_callback =
      [callback, this](size_t number_of_requests) {
        try {
          callback(number_of_requests);
        } catch (const std::exception & exception) {
          RCLCPP_ERROR_STREAM(
            node_logger_,
            "rclcpp::ServiceBase@" << this <<
              " caught " << rmw::impl::cpp::demangle(exception) <<
              " exception in user-provided callback for the 'on new request' callback: " <<
              exception.what());
        } catch (...) {
          RCLCPP_ERROR_STREAM(
            node_logger_,
            "rclcpp::ServiceBase@" << this <<
              " caught unhandled exception in user-provided callback " <<
              "for the 'on new request' callback");
        }
      };

    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);

    // Set it temporarily to the new callback, while we replace the old one.
    // This two-step setting, prevents a gap where the old std::function has
    // been replaced but the middleware hasn't been told about the new one yet.
    set_on_new_request_callback(
      rclcpp::detail::cpp_callback_trampoline<decltype(new_callback), const void *, size_t>,
      static_cast<const void *>(&new_callback));

    // Store the std::function to keep it in scope, also overwrites the existing one.
    on_new_request_callback_ = new_callback;

    // Set it again, now using the permanent storage.
    set_on_new_request_callback(
      rclcpp::detail::cpp_callback_trampoline<
        decltype(on_new_request_callback_), const void *, size_t>,
      static_cast<const void *>(&on_new_request_callback_));
  }

  void
  clear_on_new_request_callback()
  {
    std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
    if (on_new_request_callback_) {
      set_on_new_request_callback(nullptr, nullptr);
      on_new_request_callback_ = nullptr;
    }
  }

protected:
  RCLCPP_DISABLE_COPY(ServiceBase)

  RCLCPP_PUBLIC
  rcl_node_t *
  get_rcl_node_handle();

  RCLCPP_PUBLIC
  const rcl_node_t *
  get_rcl_node_handle() const;

  RCLCPP_PUBLIC
  void
  set_on_new_request_callback(rcl_event_callback_t callback, const void * user_data);

  std::shared_ptr<rcl_node_t> node_handle_;

  std::shared_ptr<rcl_service_t> service_handle_;
  bool owns_rcl_handle_ = true;

  rclcpp::Logger node_logger_;

  std::atomic<bool> in_use_by_wait_set_{false};

  std::recursive_mutex callback_mutex_;
  std::function<void(size_t)> on_new_request_callback_{nullptr};
};

template<typename ServiceT>
class Service
  : public ServiceBase,
  public std::enable_shared_from_this<Service<ServiceT>>
{
public:
  using CallbackType = std::function<
    void (
      const std::shared_ptr<typename ServiceT::Request>,
      std::shared_ptr<typename ServiceT::Response>)>;

  using CallbackWithHeaderType = std::function<
    void (
      const std::shared_ptr<rmw_request_id_t>,
      const std::shared_ptr<typename ServiceT::Request>,
      std::shared_ptr<typename ServiceT::Response>)>;
  RCLCPP_SMART_PTR_DEFINITIONS(Service)


  Service(
    std::shared_ptr<rcl_node_t> node_handle,
    const std::string & service_name,
    AnyServiceCallback<ServiceT> any_callback,
    rcl_service_options_t & service_options)
  : ServiceBase(node_handle), any_callback_(any_callback),
    srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
  {
    // rcl does the static memory allocation here
    service_handle_ = std::shared_ptr<rcl_service_t>(
      new rcl_service_t, [handle = node_handle_, service_name](rcl_service_t * service)
      {
        if (rcl_service_fini(service, handle.get()) != RCL_RET_OK) {
          RCLCPP_ERROR(
            rclcpp::get_node_logger(handle.get()).get_child("rclcpp"),
            "Error in destruction of rcl service handle: %s",
            rcl_get_error_string().str);
          rcl_reset_error();
        }
        delete service;
      });
    *service_handle_.get() = rcl_get_zero_initialized_service();

    rcl_ret_t ret = rcl_service_init(
      service_handle_.get(),
      node_handle.get(),
      srv_type_support_handle_,
      service_name.c_str(),
      &service_options);
    if (ret != RCL_RET_OK) {
      if (ret == RCL_RET_SERVICE_NAME_INVALID) {
        auto rcl_node_handle = get_rcl_node_handle();
        // this will throw on any validation problem
        rcl_reset_error();
        expand_topic_or_service_name(
          service_name,
          rcl_node_get_name(rcl_node_handle),
          rcl_node_get_namespace(rcl_node_handle),
          true);
      }

      rclcpp::exceptions::throw_from_rcl_error(ret, "could not create service");
    }
    TRACEPOINT(
      rclcpp_service_callback_added,
      static_cast<const void *>(get_service_handle().get()),
      static_cast<const void *>(&any_callback_));
#ifndef TRACETOOLS_DISABLED
    any_callback_.register_callback_for_tracing();
#endif
  }


  Service(
    std::shared_ptr<rcl_node_t> node_handle,
    std::shared_ptr<rcl_service_t> service_handle,
    AnyServiceCallback<ServiceT> any_callback)
  : ServiceBase(node_handle), any_callback_(any_callback),
    srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
  {
    // check if service handle was initialized
    if (!rcl_service_is_valid(service_handle.get())) {
      // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
      throw std::runtime_error(
        std::string("rcl_service_t in constructor argument must be initialized beforehand."));
      // *INDENT-ON*
    }

    service_handle_ = service_handle;
    TRACEPOINT(
      rclcpp_service_callback_added,
      static_cast<const void *>(get_service_handle().get()),
      static_cast<const void *>(&any_callback_));
#ifndef TRACETOOLS_DISABLED
    any_callback_.register_callback_for_tracing();
#endif
  }


  Service(
    std::shared_ptr<rcl_node_t> node_handle,
    rcl_service_t * service_handle,
    AnyServiceCallback<ServiceT> any_callback)
  : ServiceBase(node_handle), any_callback_(any_callback),
    srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
  {
    // check if service handle was initialized
    if (!rcl_service_is_valid(service_handle)) {
      // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
      throw std::runtime_error(
        std::string("rcl_service_t in constructor argument must be initialized beforehand."));
      // *INDENT-ON*
    }

    // In this case, rcl owns the service handle memory
    service_handle_ = std::shared_ptr<rcl_service_t>(new rcl_service_t);
    service_handle_->impl = service_handle->impl;
    TRACEPOINT(
      rclcpp_service_callback_added,
      static_cast<const void *>(get_service_handle().get()),
      static_cast<const void *>(&any_callback_));
#ifndef TRACETOOLS_DISABLED
    any_callback_.register_callback_for_tracing();
#endif
  }

  Service() = delete;

  virtual ~Service()
  {
  }


  bool
  take_request(typename ServiceT::Request & request_out, rmw_request_id_t & request_id_out)
  {
    return this->take_type_erased_request(&request_out, request_id_out);
  }

  std::shared_ptr<void>
  create_request() override
  {
    return std::make_shared<typename ServiceT::Request>();
  }

  std::shared_ptr<rmw_request_id_t>
  create_request_header() override
  {
    return std::make_shared<rmw_request_id_t>();
  }

  void
  handle_request(
    std::shared_ptr<rmw_request_id_t> request_header,
    std::shared_ptr<void> request) override
  {
    auto typed_request = std::static_pointer_cast<typename ServiceT::Request>(request);
    auto response = any_callback_.dispatch(this->shared_from_this(), request_header, typed_request);
    if (response) {
      send_response(*request_header, *response);
    }
  }

  void
  send_response(rmw_request_id_t & req_id, typename ServiceT::Response & response)
  {
    rcl_ret_t ret = rcl_send_response(get_service_handle().get(), &req_id, &response);

    if (ret == RCL_RET_TIMEOUT) {
      RCLCPP_WARN(
        node_logger_.get_child("rclcpp"),
        "failed to send response to %s (timeout): %s",
        this->get_service_name(), rcl_get_error_string().str);
      rcl_reset_error();
      return;
    }
    if (ret != RCL_RET_OK) {
      rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send response");
    }
  }


  void
  configure_introspection(
    Clock::SharedPtr clock, const QoS & qos_service_event_pub,
    rcl_service_introspection_state_t introspection_state)
  {
    rcl_publisher_options_t pub_opts = rcl_publisher_get_default_options();
    pub_opts.qos = qos_service_event_pub.get_rmw_qos_profile();

    rcl_ret_t ret = rcl_service_configure_service_introspection(
      service_handle_.get(),
      node_handle_.get(),
      clock->get_clock_handle(),
      srv_type_support_handle_,
      pub_opts,
      introspection_state);

    if (RCL_RET_OK != ret) {
      rclcpp::exceptions::throw_from_rcl_error(ret, "failed to configure service introspection");
    }
  }

private:
  RCLCPP_DISABLE_COPY(Service)

  AnyServiceCallback<ServiceT> any_callback_;

  const rosidl_service_type_support_t * srv_type_support_handle_;
};

}  // namespace rclcpp

#endif  // RCLCPP__SERVICE_HPP_