Class AggregationManager
Defined in File aggregation_manager.hpp
Nested Relationships
Nested Types
Class Documentation
-
class AggregationManager
Coordinator for peer aggregation.
Manages PeerClients, health monitoring, routing table, entity merging, and fan-out logic. Thread-safe: shared_mutex protects peers_ and routing_table_. Readers use shared lock, writers use exclusive lock.
Public Functions
-
explicit AggregationManager(const AggregationConfig &config, rclcpp::Logger *logger = nullptr)
Construct an AggregationManager from config.
Creates a PeerClient for each statically configured peer. Validates TLS requirements and logs warnings for cleartext peer URLs.
- Parameters:
config – Aggregation configuration
logger – Optional logger for TLS warnings (pass nullptr to suppress)
-
size_t peer_count() const
Get the number of known peers (static + discovered)
-
void add_discovered_peer(const std::string &url, const std::string &name)
Add a dynamically discovered peer.
Thread-safe. If a peer with the given name already exists, this is a no-op.
- Parameters:
url – Base URL of the peer
name – Human-readable peer name
-
void remove_discovered_peer(const std::string &name)
Remove a dynamically discovered peer by name.
Thread-safe. If the peer is not found, this is a no-op.
- Parameters:
name – Peer name to remove
-
void check_all_health()
Check health of all peers.
Calls check_health() on each PeerClient.
-
size_t healthy_peer_count() const
Get count of currently healthy peers.
- Returns:
Number of peers that report healthy status
-
PeerEntities fetch_all_peer_entities()
Fetch entities from all healthy peers and merge them.
- Returns:
Merged PeerEntities from all reachable peers
-
MergedPeerResult fetch_and_merge_peer_entities(const std::vector<Area> &local_areas, const std::vector<Component> &local_components, const std::vector<App> &local_apps, const std::vector<Function> &local_functions, size_t max_entities_per_peer = 10000, rclcpp::Logger *logger = nullptr)
Fetch entities from all healthy peers, merge with local entities, and build routing table.
Snapshots peer shared_ptrs under lock, releases before network I/O. The shared_ptr copies keep PeerClients alive even if remove_discovered_peer() runs concurrently. Uses EntityMerger per-peer so that collision-prefixed IDs are correctly tracked in the routing table.
- Parameters:
local_areas – Local areas to merge with
local_components – Local components to merge with
local_apps – Local apps to merge with
local_functions – Local functions to merge with
max_entities_per_peer – Maximum total entities accepted from a single peer
logger – Optional logger for warnings (pass nullptr to suppress)
- Returns:
MergedPeerResult with merged entity vectors and routing table
-
void update_routing_table(const std::unordered_map<std::string, std::string> &table)
Update the routing table (entity_id -> peer_name)
- Parameters:
table – New routing table to replace the current one
-
std::optional<std::string> find_peer_for_entity(const std::string &entity_id) const
Look up which peer owns a given entity.
- Parameters:
entity_id – Entity ID to look up
- Returns:
Peer name if entity is remote, std::nullopt if local or unknown
-
std::string get_peer_url(const std::string &peer_name) const
Get the URL for a known peer by name.
- Parameters:
peer_name – Name of the peer
- Returns:
URL if found, empty string otherwise
-
void forward_request(const std::string &peer_name, const httplib::Request &req, httplib::Response &res)
Forward an HTTP request to a specific peer.
Finds the PeerClient by name and calls forward_request(). If the peer is not found, sends a 502 error response.
- Parameters:
peer_name – Name of the target peer
req – Incoming HTTP request to forward
res – Outgoing HTTP response to populate
-
FanOutResult fan_out_get(const std::string &path, const std::string &auth_header)
Fan-out a GET request to all healthy peers in parallel.
Sends GET requests to all healthy peers concurrently via std::async, merges the “items” arrays from their responses. Returns partial results if some peers fail.
- Parameters:
path – Request path (e.g., “/api/v1/components”)
auth_header – Authorization header value (empty to omit)
- Returns:
FanOutResult with merged items and failure info
-
nlohmann::json get_peer_status() const
Get peer status for /health endpoint.
- Returns:
JSON array of peer objects with name, url, status
-
struct FanOutResult
Result of a fan-out GET across all healthy peers.
-
struct MergedPeerResult
Result of fetching and merging entities from all healthy peers.
Contains merged entity vectors and a routing table mapping remote entity IDs to the peer name that owns them.
-
explicit AggregationManager(const AggregationConfig &config, rclcpp::Logger *logger = nullptr)