Program Listing for File subscription_intra_process_base.hpp
↰ Return to documentation for file (include/rclcpp/experimental/subscription_intra_process_base.hpp
)
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
#include <algorithm>
#include <memory>
#include <mutex>
#include <string>
#include "rcl/wait.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp
{
namespace experimental
{
class SubscriptionIntraProcessBase : public rclcpp::Waitable
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)
enum class EntityType : std::size_t
{
Subscription,
};
RCLCPP_PUBLIC
SubscriptionIntraProcessBase(
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile)
: gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
{}
RCLCPP_PUBLIC
virtual ~SubscriptionIntraProcessBase();
RCLCPP_PUBLIC
size_t
get_number_of_ready_guard_conditions() override {return 1;}
RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;
bool
is_ready(rcl_wait_set_t * wait_set) override = 0;
std::shared_ptr<void>
take_data() override = 0;
std::shared_ptr<void>
take_data_by_entity_id(size_t id) override
{
(void)id;
return take_data();
}
void
execute(std::shared_ptr<void> & data) override = 0;
virtual
bool
use_take_shared_method() const = 0;
RCLCPP_PUBLIC
const char *
get_topic_name() const;
RCLCPP_PUBLIC
QoS
get_actual_qos() const;
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override
{
if (!callback) {
throw std::invalid_argument(
"The callback passed to set_on_ready_callback "
"is not callable.");
}
// Note: we bind the int identifier argument to this waitable's entity types
auto new_callback =
[callback, this](size_t number_of_events) {
try {
callback(number_of_events, static_cast<int>(EntityType::Subscription));
} catch (const std::exception & exception) {
RCLCPP_ERROR_STREAM(
// TODO(wjwwood): get this class access to the node logger it is associated with
rclcpp::get_logger("rclcpp"),
"rclcpp::SubscriptionIntraProcessBase@" << this <<
" caught " << rmw::impl::cpp::demangle(exception) <<
" exception in user-provided callback for the 'on ready' callback: " <<
exception.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"rclcpp::SubscriptionIntraProcessBase@" << this <<
" caught unhandled exception in user-provided callback " <<
"for the 'on ready' callback");
}
};
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
on_new_message_callback_ = new_callback;
if (unread_count_ > 0) {
if (qos_profile_.history() == HistoryPolicy::KeepAll) {
on_new_message_callback_(unread_count_);
} else {
// Use qos profile depth as upper bound for unread_count_
on_new_message_callback_(std::min(unread_count_, qos_profile_.depth()));
}
unread_count_ = 0;
}
}
void
clear_on_ready_callback() override
{
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
on_new_message_callback_ = nullptr;
}
protected:
std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_message_callback_ {nullptr};
size_t unread_count_{0};
rclcpp::GuardCondition gc_;
virtual void
trigger_guard_condition() = 0;
void
invoke_on_new_message()
{
std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
if (this->on_new_message_callback_) {
this->on_new_message_callback_(1);
} else {
this->unread_count_++;
}
}
private:
std::string topic_name_;
QoS qos_profile_;
};
} // namespace experimental
} // namespace rclcpp
#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_