Class AggregationManager
Defined in File aggregation_manager.hpp
Nested Relationships
Nested Types
Class Documentation
-
class AggregationManager
Coordinator for peer aggregation.
Manages PeerClients, health monitoring, routing table, entity merging, and fan-out logic. Thread-safe: shared_mutex protects peers_ and routing_table_. Readers use shared lock, writers use exclusive lock.
Public Functions
-
explicit AggregationManager(const AggregationConfig &config, rclcpp::Logger *logger = nullptr)
Construct an AggregationManager from config.
Creates a PeerClient for each statically configured peer. Validates TLS requirements and logs warnings for cleartext peer URLs.
- Parameters:
config – Aggregation configuration
logger – Optional logger for TLS warnings (pass nullptr to suppress)
-
size_t peer_count() const
Get the number of known peers (static + discovered)
-
void add_discovered_peer(const std::string &url, const std::string &name)
Add a dynamically discovered peer.
Thread-safe. If a peer with the given name already exists, this is a no-op.
- Parameters:
url – Base URL of the peer
name – Human-readable peer name
-
void remove_discovered_peer(const std::string &name)
Remove a dynamically discovered peer by name.
Thread-safe. If the peer is not found, this is a no-op.
- Parameters:
name – Peer name to remove
-
void check_all_health()
Check health of all peers.
Calls check_health() on each PeerClient.
-
size_t healthy_peer_count() const
Get count of currently healthy peers.
- Returns:
Number of peers that report healthy status
-
PeerEntities fetch_all_peer_entities()
Fetch entities from all healthy peers and merge them.
- Returns:
Merged PeerEntities from all reachable peers
-
MergedPeerResult fetch_and_merge_peer_entities(const std::vector<Area> &local_areas, const std::vector<Component> &local_components, const std::vector<App> &local_apps, const std::vector<Function> &local_functions, size_t max_entities_per_peer = 10000, rclcpp::Logger *logger = nullptr)
Fetch entities from all healthy peers, merge with local entities, and build routing table.
Snapshots peer shared_ptrs under lock, releases before network I/O. The shared_ptr copies keep PeerClients alive even if remove_discovered_peer() runs concurrently. Uses EntityMerger per-peer so that collision-prefixed IDs are correctly tracked in the routing table.
- Parameters:
local_areas – Local areas to merge with
local_components – Local components to merge with
local_apps – Local apps to merge with
local_functions – Local functions to merge with
max_entities_per_peer – Maximum total entities accepted from a single peer
logger – Optional logger for warnings (pass nullptr to suppress)
- Returns:
MergedPeerResult with merged entity vectors and routing table
-
void update_routing_table(const std::unordered_map<std::string, std::string> &table)
Update the routing table (entity_id -> peer_name)
- Parameters:
table – New routing table to replace the current one
-
void set_leaf_warnings(std::vector<LeafCollisionWarning> warnings)
Publish the latest leaf-collision warnings (replaces previous list).
Warnings describe deployment-level anomalies where more than one peer announced the same leaf Component ID. Exposed on /health.warnings so operators can notice without tailing logs. Thread-safe.
-
std::vector<LeafCollisionWarning> get_leaf_warnings() const
Snapshot the current leaf-collision warnings.
- Returns:
Copy of the warning list (may be empty). Thread-safe.
-
std::optional<std::string> find_peer_for_entity(const std::string &entity_id) const
Look up which peer owns a given entity.
- Parameters:
entity_id – Entity ID to look up
- Returns:
Peer name if entity is remote, std::nullopt if local or unknown
-
void update_peer_contributors(std::unordered_map<std::string, std::vector<std::string>> contributors)
Replace the map of per-entity peer contributors.
Covers both routed leaf entities (entity fully hosted by a peer - also tracked via the routing table) and merged/hierarchical entities (served locally with contributions from one or more peers - e.g. a Component aggregating subcomponents from multiple gateways, or an Area/Function with an ID shared across peers).
Fan-out helpers consult this map to target per-entity collection requests only at the peers that actually host / contribute to a given entity, avoiding spurious 404s from non-contributing peers.
- Parameters:
contributors – New map: entity id -> list of peer names that contribute to it. Entries with empty lists are discarded.
-
bool has_peer_contributors(const std::string &entity_id) const
Check whether an entity id has at least one peer contributor.
Returns true when the entity is either routed to a peer (remote leaf) or served locally but aggregated from peers (hierarchical / merged). Returns false for local-only entities and for unknown IDs. Thread-safe.
-
std::vector<std::string> get_peer_contributors(const std::string &entity_id) const
List the peers that host / contribute to a given entity.
Combines information from the routing table (routed leaves) and the per-entity contributors map (merged / hierarchical entities). Duplicates are removed while preserving first-seen order. Returns an empty vector for local-only entities and unknown IDs. Thread-safe.
-
std::string get_peer_url(const std::string &peer_name) const
Get the URL for a known peer by name.
- Parameters:
peer_name – Name of the peer
- Returns:
URL if found, empty string otherwise
-
void forward_request(const std::string &peer_name, const httplib::Request &req, httplib::Response &res)
Forward an HTTP request to a specific peer.
Finds the PeerClient by name and calls forward_request(). If the peer is not found, sends a 502 error response.
- Parameters:
peer_name – Name of the target peer
req – Incoming HTTP request to forward
res – Outgoing HTTP response to populate
-
FanOutResult fan_out_get(const std::string &path, const std::string &auth_header, const std::vector<std::string> *target_peers = nullptr)
Fan-out a GET request to healthy peers in parallel.
Sends GET requests concurrently via std::async, merges the “items” arrays from their responses. Returns partial results if some peers fail.
- Parameters:
path – Request path (e.g., “/api/v1/components”)
auth_header – Authorization header value (empty to omit)
target_peers – When non-null, only peers whose name is in this list AND are currently healthy are queried. A non-null but empty list means “no matching peers” and produces an empty result with no fan-out. When null (default), all healthy peers are queried, preserving the pre-filtering behavior for global endpoints.
- Returns:
FanOutResult with merged items and failure info.
-
nlohmann::json get_peer_status() const
Get peer status for /health endpoint.
- Returns:
JSON array of peer objects with name, url, status
-
struct FanOutResult
Result of a fan-out GET across all healthy peers.
-
struct MergedPeerResult
Result of fetching and merging entities from all healthy peers.
Contains merged entity vectors and a routing table mapping remote entity IDs to the peer name that owns them.
Public Members
-
std::unordered_map<std::string, std::string> routing_table
-
std::vector<LeafCollisionWarning> leaf_warnings
Multi-peer leaf-Component collisions detected during merge. Surfaced to operators via /health.warnings; runtime falls back to last-writer for routing.
-
std::unordered_map<std::string, std::string> routing_table
-
explicit AggregationManager(const AggregationConfig &config, rclcpp::Logger *logger = nullptr)