Program Listing for File action_server.hpp
↰ Return to documentation for file (include/psdk_wrapper/utils/action_server.hpp
)
// Copyright (c) 2019 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef PSDK_WRAPPER_INCLUDE_PSDK_WRAPPER_UTILS_ACTION_SERVER_HPP_
#define PSDK_WRAPPER_INCLUDE_PSDK_WRAPPER_UTILS_ACTION_SERVER_HPP_
#include <chrono> //NOLINT
#include <future> //NOLINT
#include <memory>
#include <mutex> //NOLINT
#include <string>
#include <thread> //NOLINT
#include "psdk_wrapper/utils/node_thread.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp_action/rclcpp_action.hpp"
namespace utils
{
template <typename ActionT>
class ActionServer
{
public:
// Callback function to complete main work. This should itself deal with its
// own exceptions, but if for some reason one is thrown, it will be caught
// in ActionServer and terminate the action itself.
typedef std::function<void()> ExecuteCallback;
// Callback function to notify the user that an exception was thrown that
// the simple action server caught (or another failure) and the action was
// terminated. To avoid using, catch exceptions in your application such that
// the ActionServer will never need to terminate based on failed action
// ExecuteCallback.
typedef std::function<void()> CompletionCallback;
template <typename NodeT>
explicit ActionServer(
NodeT node, const std::string& action_name,
ExecuteCallback execute_callback,
CompletionCallback completion_callback = nullptr,
std::chrono::milliseconds server_timeout = std::chrono::milliseconds(500),
bool spin_thread = false,
const rcl_action_server_options_t& options =
rcl_action_server_get_default_options())
: ActionServer(
node->get_node_base_interface(), node->get_node_clock_interface(),
node->get_node_logging_interface(),
node->get_node_waitables_interface(), action_name, execute_callback,
completion_callback, server_timeout, spin_thread, options)
{
}
explicit ActionServer(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface,
rclcpp::node_interfaces::NodeClockInterface::SharedPtr
node_clock_interface,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr
node_logging_interface,
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr
node_waitables_interface,
const std::string& action_name, ExecuteCallback execute_callback,
CompletionCallback completion_callback = nullptr,
std::chrono::milliseconds server_timeout = std::chrono::milliseconds(500),
bool spin_thread = false,
const rcl_action_server_options_t& options =
rcl_action_server_get_default_options())
: node_base_interface_(node_base_interface),
node_clock_interface_(node_clock_interface),
node_logging_interface_(node_logging_interface),
node_waitables_interface_(node_waitables_interface),
action_name_(action_name),
execute_callback_(execute_callback),
completion_callback_(completion_callback),
server_timeout_(server_timeout),
spin_thread_(spin_thread)
{
using namespace std::placeholders; // NOLINT
if (spin_thread_)
{
callback_group_ = node_base_interface->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
}
action_server_ = rclcpp_action::create_server<ActionT>(
node_base_interface_, node_clock_interface_, node_logging_interface_,
node_waitables_interface_, action_name_,
std::bind(&ActionServer::handle_goal, this, _1, _2),
std::bind(&ActionServer::handle_cancel, this, _1),
std::bind(&ActionServer::handle_accepted, this, _1), options,
callback_group_);
if (spin_thread_)
{
executor_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor_->add_callback_group(callback_group_, node_base_interface_);
executor_thread_ = std::make_unique<utils::NodeThread>(executor_);
}
}
rclcpp_action::GoalResponse
handle_goal(const rclcpp_action::GoalUUID& /*uuid*/,
std::shared_ptr<const typename ActionT::Goal> /*goal*/)
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (!server_active_)
{
return rclcpp_action::GoalResponse::REJECT;
}
debug_msg("Received request for goal acceptance");
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
}
rclcpp_action::CancelResponse
handle_cancel(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<ActionT>> handle)
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (!handle->is_active())
{
warn_msg(
"Received request for goal cancellation,"
"but the handle is inactive, so reject the request");
return rclcpp_action::CancelResponse::REJECT;
}
debug_msg("Received request for goal cancellation");
return rclcpp_action::CancelResponse::ACCEPT;
}
void
handle_accepted(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<ActionT>> handle)
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
debug_msg("Receiving a new goal");
if (is_active(current_handle_) || is_running())
{
debug_msg(
"An older goal is active, moving the new goal to a pending slot.");
if (is_active(pending_handle_))
{
debug_msg(
"The pending slot is occupied."
" The previous pending goal will be terminated and replaced.");
terminate(pending_handle_);
}
pending_handle_ = handle;
preempt_requested_ = true;
}
else
{
if (is_active(pending_handle_))
{
// Shouldn't reach a state with a pending goal but no current one.
error_msg(
"Forgot to handle a preemption. Terminating the pending goal.");
terminate(pending_handle_);
preempt_requested_ = false;
}
current_handle_ = handle;
// Return quickly to avoid blocking the executor, so spin up a new thread
debug_msg("Executing goal asynchronously.");
execution_future_ = std::async(std::launch::async, [this]() { work(); });
}
}
void
work()
{
while (rclcpp::ok() && !stop_execution_ && is_active(current_handle_))
{
debug_msg("Executing the goal...");
try
{
execute_callback_();
}
catch (std::exception& ex)
{
RCLCPP_ERROR(
node_logging_interface_->get_logger(),
"Action server failed while executing action callback: \"%s\"",
ex.what());
terminate_all();
completion_callback_();
return;
}
debug_msg("Blocking processing of new goal handles.");
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (stop_execution_)
{
warn_msg("Stopping the thread per request.");
terminate_all();
completion_callback_();
break;
}
if (is_active(current_handle_))
{
warn_msg("Current goal was not completed successfully.");
terminate(current_handle_);
completion_callback_();
}
if (is_active(pending_handle_))
{
debug_msg("Executing a pending handle on the existing thread.");
accept_pending_goal();
}
else
{
debug_msg("Done processing available goals.");
break;
}
}
debug_msg("Worker thread done.");
}
void
activate()
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
server_active_ = true;
stop_execution_ = false;
}
void
deactivate()
{
debug_msg("Deactivating...");
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
server_active_ = false;
stop_execution_ = true;
}
if (!execution_future_.valid())
{
return;
}
if (is_running())
{
warn_msg(
"Requested to deactivate server but goal is still executing."
" Should check if action server is running before deactivating.");
}
using namespace std::chrono; // NOLINT
auto start_time = steady_clock::now();
while (execution_future_.wait_for(milliseconds(100)) !=
std::future_status::ready)
{
info_msg("Waiting for async process to finish.");
if (steady_clock::now() - start_time >= server_timeout_)
{
terminate_all();
completion_callback_();
throw std::runtime_error(
"Action callback is still running and missed deadline to stop");
}
}
debug_msg("Deactivation completed.");
}
bool
is_running()
{
return execution_future_.valid() &&
(execution_future_.wait_for(std::chrono::milliseconds(0)) ==
std::future_status::timeout);
}
bool
is_server_active()
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
return server_active_;
}
bool
is_preempt_requested() const
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
return preempt_requested_;
}
const std::shared_ptr<const typename ActionT::Goal>
accept_pending_goal()
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (!pending_handle_ || !pending_handle_->is_active())
{
error_msg("Attempting to get pending goal when not available");
return std::shared_ptr<const typename ActionT::Goal>();
}
if (is_active(current_handle_) && current_handle_ != pending_handle_)
{
debug_msg("Cancelling the previous goal");
current_handle_->abort(empty_result());
}
current_handle_ = pending_handle_;
pending_handle_.reset();
preempt_requested_ = false;
debug_msg("Preempted goal");
return current_handle_->get_goal();
}
void
terminate_pending_goal()
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (!pending_handle_ || !pending_handle_->is_active())
{
error_msg("Attempting to terminate pending goal when not available");
return;
}
terminate(pending_handle_);
preempt_requested_ = false;
debug_msg("Pending goal terminated");
}
const std::shared_ptr<const typename ActionT::Goal>
get_current_goal() const
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (!is_active(current_handle_))
{
error_msg("A goal is not available or has reached a final state");
return std::shared_ptr<const typename ActionT::Goal>();
}
return current_handle_->get_goal();
}
const std::shared_ptr<const typename ActionT::Goal>
get_pending_goal() const
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (!pending_handle_ || !pending_handle_->is_active())
{
error_msg("Attempting to get pending goal when not available");
return std::shared_ptr<const typename ActionT::Goal>();
}
return pending_handle_->get_goal();
}
bool
is_cancel_requested() const
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
// A cancel request is assumed if either handle is canceled by the client.
if (current_handle_ == nullptr)
{
error_msg("Checking for cancel but current goal is not available");
return false;
}
if (pending_handle_ != nullptr)
{
return pending_handle_->is_canceling();
}
return current_handle_->is_canceling();
}
void
terminate_all(typename std::shared_ptr<typename ActionT::Result> result =
std::make_shared<typename ActionT::Result>())
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
terminate(current_handle_, result);
terminate(pending_handle_, result);
preempt_requested_ = false;
}
void
terminate_current(typename std::shared_ptr<typename ActionT::Result> result =
std::make_shared<typename ActionT::Result>())
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
terminate(current_handle_, result);
}
void
succeeded_current(typename std::shared_ptr<typename ActionT::Result> result =
std::make_shared<typename ActionT::Result>())
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (is_active(current_handle_))
{
debug_msg("Setting succeed on current goal.");
current_handle_->succeed(result);
current_handle_.reset();
}
}
void
publish_feedback(
typename std::shared_ptr<typename ActionT::Feedback> feedback)
{
if (!is_active(current_handle_))
{
error_msg(
"Trying to publish feedback when the current goal handle is not "
"active");
return;
}
current_handle_->publish_feedback(feedback);
}
protected:
// The ActionServer isn't itself a node, so it needs interfaces to one
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface_;
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_interface_;
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr
node_logging_interface_;
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr
node_waitables_interface_;
std::string action_name_;
ExecuteCallback execute_callback_;
CompletionCallback completion_callback_;
std::future<void> execution_future_;
bool stop_execution_{false};
mutable std::recursive_mutex update_mutex_;
bool server_active_{false};
bool preempt_requested_{false};
std::chrono::milliseconds server_timeout_;
std::shared_ptr<rclcpp_action::ServerGoalHandle<ActionT>> current_handle_;
std::shared_ptr<rclcpp_action::ServerGoalHandle<ActionT>> pending_handle_;
typename rclcpp_action::Server<ActionT>::SharedPtr action_server_;
bool spin_thread_;
rclcpp::CallbackGroup::SharedPtr callback_group_{nullptr};
rclcpp::executors::SingleThreadedExecutor::SharedPtr executor_;
std::unique_ptr<utils::NodeThread> executor_thread_;
constexpr auto
empty_result() const
{
return std::make_shared<typename ActionT::Result>();
}
constexpr bool
is_active(const std::shared_ptr<rclcpp_action::ServerGoalHandle<ActionT>>
handle) const
{
return handle != nullptr && handle->is_active();
}
void
terminate(std::shared_ptr<rclcpp_action::ServerGoalHandle<ActionT>> handle,
typename std::shared_ptr<typename ActionT::Result> result =
std::make_shared<typename ActionT::Result>())
{
std::lock_guard<std::recursive_mutex> lock(update_mutex_);
if (is_active(handle))
{
if (handle->is_canceling())
{
warn_msg("Client requested to cancel the goal. Cancelling.");
handle->canceled(result);
}
else
{
warn_msg("Aborting handle.");
handle->abort(result);
}
handle.reset();
}
}
void
info_msg(const std::string& msg) const
{
RCLCPP_INFO(node_logging_interface_->get_logger(), "[%s] [ActionServer] %s",
action_name_.c_str(), msg.c_str());
}
void
debug_msg(const std::string& msg) const
{
RCLCPP_DEBUG(node_logging_interface_->get_logger(),
"[%s] [ActionServer] %s", action_name_.c_str(), msg.c_str());
}
void
error_msg(const std::string& msg) const
{
RCLCPP_ERROR(node_logging_interface_->get_logger(),
"[%s] [ActionServer] %s", action_name_.c_str(), msg.c_str());
}
void
warn_msg(const std::string& msg) const
{
RCLCPP_WARN(node_logging_interface_->get_logger(), "[%s] [ActionServer] %s",
action_name_.c_str(), msg.c_str());
}
};
} // namespace utils
#endif // PSDK_WRAPPER_INCLUDE_PSDK_WRAPPER_UTILS_ACTION_SERVER_HPP_