Program Listing for File agnocast_epoll.hpp
↰ Return to documentation for file (include/agnocast/agnocast_epoll.hpp)
#pragma once
#include "agnocast/agnocast_callback_info.hpp"
#include "agnocast/agnocast_timer_info.hpp"
#include "sys/epoll.h"
#include <atomic>
#include <mutex>
#include <shared_mutex>
#include <vector>
namespace agnocast
{
struct AgnocastExecutable;
extern std::atomic<bool> need_epoll_updates;
constexpr uint32_t TIMER_EVENT_FLAG = 0x80000000;
constexpr uint32_t CLOCK_EVENT_FLAG = 0x40000000; // For clock_eventfd events (ROS_TIME timers)
constexpr uint32_t SHUTDOWN_EVENT_FLAG = 0x20000000; // For shutdown events (AgnocastOnlyExecutor)
constexpr uint32_t EPOLL_EVENT_ID_RESERVED_MASK =
TIMER_EVENT_FLAG | CLOCK_EVENT_FLAG | SHUTDOWN_EVENT_FLAG;
// @return true if shutdown event detected, false otherwise
bool wait_and_handle_epoll_event(
int epoll_fd, pid_t my_pid, int timeout_ms, std::mutex & ready_agnocast_executables_mutex,
std::vector<AgnocastExecutable> & ready_agnocast_executables);
template <class ValidateFn>
void prepare_epoll_impl(
const int epoll_fd, const pid_t my_pid, std::mutex & ready_agnocast_executables_mutex,
std::vector<AgnocastExecutable> & ready_agnocast_executables,
ValidateFn && validate_callback_group)
{
// Register subscription callbacks to epoll
{
std::lock_guard<std::mutex> lock(id2_callback_info_mtx);
for (auto & it : id2_callback_info) {
const uint32_t callback_info_id = it.first;
CallbackInfo & callback_info = it.second;
if (!callback_info.need_epoll_update) {
continue;
}
if (!validate_callback_group(callback_info.callback_group)) {
continue;
}
struct epoll_event ev = {};
ev.events = EPOLLIN;
ev.data.u32 = callback_info_id;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, callback_info.mqdes, &ev) == -1) {
RCLCPP_ERROR(logger, "epoll_ctl failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
}
if (callback_info.is_transient_local) {
agnocast::enqueue_receive_and_execute(
callback_info_id, my_pid, callback_info, ready_agnocast_executables_mutex,
ready_agnocast_executables);
}
callback_info.need_epoll_update = false;
}
}
// Register timers to epoll
{
std::lock_guard<std::mutex> lock(id2_timer_info_mtx);
for (auto & it : id2_timer_info) {
const uint32_t timer_id = it.first;
TimerInfo & timer_info = *it.second;
if (!timer_info.need_epoll_update) {
continue;
}
if (!timer_info.timer.lock() || !validate_callback_group(timer_info.callback_group)) {
continue;
}
std::shared_lock fd_lock(timer_info.fd_mutex);
// Register clock_eventfd for ROS_TIME timers (simulation time support)
if (timer_info.clock_eventfd >= 0) {
struct epoll_event clock_ev = {};
clock_ev.events = EPOLLIN;
clock_ev.data.u32 = timer_id | CLOCK_EVENT_FLAG;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.clock_eventfd, &clock_ev) == -1) {
RCLCPP_ERROR(logger, "epoll_ctl failed for clock_eventfd: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
}
}
// Register timerfd (wall clock based firing)
if (timer_info.timer_fd >= 0) {
struct epoll_event ev = {};
ev.events = EPOLLIN;
ev.data.u32 = timer_id | TIMER_EVENT_FLAG;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.timer_fd, &ev) == -1) {
RCLCPP_ERROR(logger, "epoll_ctl failed for timer: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
}
}
timer_info.need_epoll_update = false;
}
}
// Check if all updates are done. Both locks must be held simultaneously to prevent
// a TOCTOU race: without this, a new subscription could set need_epoll_updates=true
// between the two checks (or between the last check and the store), and that update
// would be lost when need_epoll_updates is overwritten to false.
// Lock ordering: id2_callback_info_mtx before id2_timer_info_mtx (see declarations).
{
std::lock_guard<std::mutex> cb_lock(id2_callback_info_mtx);
std::lock_guard<std::mutex> timer_lock(id2_timer_info_mtx);
const bool all_callbacks_updated = std::none_of(
id2_callback_info.begin(), id2_callback_info.end(),
[](const auto & it) { return it.second.need_epoll_update; });
const bool all_timers_updated = std::none_of(
id2_timer_info.begin(), id2_timer_info.end(),
[](const auto & it) { return it.second->need_epoll_update; });
if (all_callbacks_updated && all_timers_updated) {
need_epoll_updates.store(false);
}
}
}
} // namespace agnocast