.. _program_listing_file_include_rclcpp_client.hpp: Program Listing for File client.hpp =================================== |exhale_lsh| :ref:`Return to documentation for file ` (``include/rclcpp/client.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // Copyright 2014 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef RCLCPP__CLIENT_HPP_ #define RCLCPP__CLIENT_HPP_ #include #include #include #include #include #include // NOLINT, cpplint doesn't think this is a cpp std header #include #include #include #include #include // NOLINT #include #include "rcl/client.h" #include "rcl/error_handling.h" #include "rcl/event_callback.h" #include "rcl/wait.h" #include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/function_traits.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_graph_interface.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" #include "rmw/error_handling.h" #include "rmw/impl/cpp/demangle.hpp" #include "rmw/rmw.h" namespace rclcpp { namespace detail { template struct FutureAndRequestId { FutureT future; int64_t request_id; FutureAndRequestId(FutureT impl, int64_t req_id) : future(std::move(impl)), request_id(req_id) {} operator FutureT &() {return this->future;} [[deprecated("FutureAndRequestId: use .future instead of an implicit conversion")]] operator FutureT() {return this->future;} // delegate future like methods in the std::future impl_ auto get() {return this->future.get();} bool valid() const noexcept {return this->future.valid();} void wait() const {return this->future.wait();} template std::future_status wait_for( const std::chrono::duration & timeout_duration) const { return this->future.wait_for(timeout_duration); } template std::future_status wait_until( const std::chrono::time_point & timeout_time) const { return this->future.wait_until(timeout_time); } // Rule of five, we could use the rule of zero here, but better be explicit as some of the // methods are deleted. FutureAndRequestId(FutureAndRequestId && other) noexcept = default; FutureAndRequestId(const FutureAndRequestId & other) = delete; FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default; FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete; ~FutureAndRequestId() = default; }; } // namespace detail namespace node_interfaces { class NodeBaseInterface; } // namespace node_interfaces class ClientBase { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientBase) RCLCPP_PUBLIC ClientBase( rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph); RCLCPP_PUBLIC virtual ~ClientBase(); RCLCPP_PUBLIC bool take_type_erased_response(void * response_out, rmw_request_id_t & request_header_out); RCLCPP_PUBLIC const char * get_service_name() const; RCLCPP_PUBLIC std::shared_ptr get_client_handle(); RCLCPP_PUBLIC std::shared_ptr get_client_handle() const; RCLCPP_PUBLIC bool service_is_ready() const; template bool wait_for_service( std::chrono::duration timeout = std::chrono::duration(-1)) { return wait_for_service_nanoseconds( std::chrono::duration_cast(timeout) ); } virtual std::shared_ptr create_response() = 0; virtual std::shared_ptr create_request_header() = 0; virtual void handle_response( std::shared_ptr request_header, std::shared_ptr response) = 0; RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(bool in_use_state); RCLCPP_PUBLIC rclcpp::QoS get_request_publisher_actual_qos() const; RCLCPP_PUBLIC rclcpp::QoS get_response_subscription_actual_qos() const; void set_on_new_response_callback(std::function callback) { if (!callback) { throw std::invalid_argument( "The callback passed to set_on_new_response_callback " "is not callable."); } auto new_callback = [callback, this](size_t number_of_responses) { try { callback(number_of_responses); } catch (const std::exception & exception) { RCLCPP_ERROR_STREAM( node_logger_, "rclcpp::ClientBase@" << this << " caught " << rmw::impl::cpp::demangle(exception) << " exception in user-provided callback for the 'on new response' callback: " << exception.what()); } catch (...) { RCLCPP_ERROR_STREAM( node_logger_, "rclcpp::ClientBase@" << this << " caught unhandled exception in user-provided callback " << "for the 'on new response' callback"); } }; std::lock_guard lock(callback_mutex_); // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has // been replaced but the middleware hasn't been told about the new one yet. set_on_new_response_callback( rclcpp::detail::cpp_callback_trampoline, static_cast(&new_callback)); // Store the std::function to keep it in scope, also overwrites the existing one. on_new_response_callback_ = new_callback; // Set it again, now using the permanent storage. set_on_new_response_callback( rclcpp::detail::cpp_callback_trampoline, static_cast(&on_new_response_callback_)); } void clear_on_new_response_callback() { std::lock_guard lock(callback_mutex_); if (on_new_response_callback_) { set_on_new_response_callback(nullptr, nullptr); on_new_response_callback_ = nullptr; } } protected: RCLCPP_DISABLE_COPY(ClientBase) RCLCPP_PUBLIC bool wait_for_service_nanoseconds(std::chrono::nanoseconds timeout); RCLCPP_PUBLIC rcl_node_t * get_rcl_node_handle(); RCLCPP_PUBLIC const rcl_node_t * get_rcl_node_handle() const; RCLCPP_PUBLIC void set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data); rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; std::shared_ptr node_handle_; std::shared_ptr context_; rclcpp::Logger node_logger_; std::shared_ptr client_handle_; std::atomic in_use_by_wait_set_{false}; std::recursive_mutex callback_mutex_; std::function on_new_response_callback_{nullptr}; }; template class Client : public ClientBase { public: using Request = typename ServiceT::Request; using Response = typename ServiceT::Response; using SharedRequest = typename ServiceT::Request::SharedPtr; using SharedResponse = typename ServiceT::Response::SharedPtr; using Promise = std::promise; using PromiseWithRequest = std::promise>; using SharedPromise = std::shared_ptr; using SharedPromiseWithRequest = std::shared_ptr; using Future = std::future; using SharedFuture = std::shared_future; using SharedFutureWithRequest = std::shared_future>; using CallbackType = std::function; using CallbackWithRequestType = std::function; RCLCPP_SMART_PTR_DEFINITIONS(Client) struct FutureAndRequestId : detail::FutureAndRequestId> { using detail::FutureAndRequestId>::FutureAndRequestId; [[deprecated( "FutureAndRequestId: use .future.share() instead of an implicit conversion")]] operator SharedFuture() {return this->future.share();} // delegate future like methods in the std::future impl_ SharedFuture share() noexcept {return this->future.share();} }; struct SharedFutureAndRequestId : detail::FutureAndRequestId> { using detail::FutureAndRequestId>::FutureAndRequestId; }; struct SharedFutureWithRequestAndRequestId : detail::FutureAndRequestId>> { using detail::FutureAndRequestId< std::shared_future> >::FutureAndRequestId; }; Client( rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, const std::string & service_name, rcl_client_options_t & client_options) : ClientBase(node_base, node_graph) { using rosidl_typesupport_cpp::get_service_type_support_handle; auto service_type_support_handle = get_service_type_support_handle(); rcl_ret_t ret = rcl_client_init( this->get_client_handle().get(), this->get_rcl_node_handle(), service_type_support_handle, service_name.c_str(), &client_options); if (ret != RCL_RET_OK) { if (ret == RCL_RET_SERVICE_NAME_INVALID) { auto rcl_node_handle = this->get_rcl_node_handle(); // this will throw on any validation problem rcl_reset_error(); expand_topic_or_service_name( service_name, rcl_node_get_name(rcl_node_handle), rcl_node_get_namespace(rcl_node_handle), true); } rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client"); } } virtual ~Client() { } bool take_response(typename ServiceT::Response & response_out, rmw_request_id_t & request_header_out) { return this->take_type_erased_response(&response_out, request_header_out); } std::shared_ptr create_response() override { return std::shared_ptr(new typename ServiceT::Response()); } std::shared_ptr create_request_header() override { // TODO(wjwwood): This should probably use rmw_request_id's allocator. // (since it is a C type) return std::shared_ptr(new rmw_request_id_t); } void handle_response( std::shared_ptr request_header, std::shared_ptr response) override { std::optional optional_pending_request = this->get_and_erase_pending_request(request_header->sequence_number); if (!optional_pending_request) { return; } auto & value = *optional_pending_request; auto typed_response = std::static_pointer_cast( std::move(response)); if (std::holds_alternative(value)) { auto & promise = std::get(value); promise.set_value(std::move(typed_response)); } else if (std::holds_alternative(value)) { auto & inner = std::get(value); const auto & callback = std::get(inner); auto & promise = std::get(inner); auto & future = std::get(inner); promise.set_value(std::move(typed_response)); callback(std::move(future)); } else if (std::holds_alternative(value)) { auto & inner = std::get(value); const auto & callback = std::get(inner); auto & promise = std::get(inner); auto & future = std::get(inner); auto & request = std::get(inner); promise.set_value(std::make_pair(std::move(request), std::move(typed_response))); callback(std::move(future)); } } FutureAndRequestId async_send_request(SharedRequest request) { Promise promise; auto future = promise.get_future(); auto req_id = async_send_request_impl( *request, std::move(promise)); return FutureAndRequestId(std::move(future), req_id); } template< typename CallbackT, typename std::enable_if< rclcpp::function_traits::same_arguments< CallbackT, CallbackType >::value >::type * = nullptr > SharedFutureAndRequestId async_send_request(SharedRequest request, CallbackT && cb) { Promise promise; auto shared_future = promise.get_future().share(); auto req_id = async_send_request_impl( *request, std::make_tuple( CallbackType{std::forward(cb)}, shared_future, std::move(promise))); return SharedFutureAndRequestId{std::move(shared_future), req_id}; } template< typename CallbackT, typename std::enable_if< rclcpp::function_traits::same_arguments< CallbackT, CallbackWithRequestType >::value >::type * = nullptr > SharedFutureWithRequestAndRequestId async_send_request(SharedRequest request, CallbackT && cb) { PromiseWithRequest promise; auto shared_future = promise.get_future().share(); auto req_id = async_send_request_impl( *request, std::make_tuple( CallbackWithRequestType{std::forward(cb)}, request, shared_future, std::move(promise))); return SharedFutureWithRequestAndRequestId{std::move(shared_future), req_id}; } bool remove_pending_request(int64_t request_id) { std::lock_guard guard(pending_requests_mutex_); return pending_requests_.erase(request_id) != 0u; } bool remove_pending_request(const FutureAndRequestId & future) { return this->remove_pending_request(future.request_id); } bool remove_pending_request(const SharedFutureAndRequestId & future) { return this->remove_pending_request(future.request_id); } bool remove_pending_request(const SharedFutureWithRequestAndRequestId & future) { return this->remove_pending_request(future.request_id); } size_t prune_pending_requests() { std::lock_guard guard(pending_requests_mutex_); auto ret = pending_requests_.size(); pending_requests_.clear(); return ret; } template> size_t prune_requests_older_than( std::chrono::time_point time_point, std::vector * pruned_requests = nullptr) { std::lock_guard guard(pending_requests_mutex_); auto old_size = pending_requests_.size(); for (auto it = pending_requests_.begin(), last = pending_requests_.end(); it != last; ) { if (it->second.first < time_point) { if (pruned_requests) { pruned_requests->push_back(it->first); } it = pending_requests_.erase(it); } else { ++it; } } return old_size - pending_requests_.size(); } protected: using CallbackTypeValueVariant = std::tuple; using CallbackWithRequestTypeValueVariant = std::tuple< CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>; using CallbackInfoVariant = std::variant< std::promise, CallbackTypeValueVariant, CallbackWithRequestTypeValueVariant>; int64_t async_send_request_impl(const Request & request, CallbackInfoVariant value) { int64_t sequence_number; std::lock_guard lock(pending_requests_mutex_); rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); } pending_requests_.try_emplace( sequence_number, std::make_pair(std::chrono::system_clock::now(), std::move(value))); return sequence_number; } std::optional get_and_erase_pending_request(int64_t request_number) { std::unique_lock lock(pending_requests_mutex_); auto it = this->pending_requests_.find(request_number); if (it == this->pending_requests_.end()) { RCUTILS_LOG_DEBUG_NAMED( "rclcpp", "Received invalid sequence number. Ignoring..."); return std::nullopt; } std::optional value = std::move(it->second.second); this->pending_requests_.erase(request_number); return value; } RCLCPP_DISABLE_COPY(Client) std::unordered_map< int64_t, std::pair< std::chrono::time_point, CallbackInfoVariant>> pending_requests_; std::mutex pending_requests_mutex_; }; } // namespace rclcpp #endif // RCLCPP__CLIENT_HPP_