Class AggregationManager

Nested Relationships

Nested Types

  • Struct AggregationManager::FanOutResult

  • Struct AggregationManager::MergedPeerResult

Class Documentation

class AggregationManager

Coordinator for peer aggregation.

Manages PeerClient instances, health monitoring, the routing table, entity merging, and fan-out logic. Thread-safe: a shared_mutex protects peers_ and routing_table_. Readers take a shared lock; writers take an exclusive lock.
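The reader/writer locking described above can be sketched as follows. This is a minimal illustration, not the class's actual implementation: it assumes C++17 and std::shared_mutex, and RoutingState, replace(), contains(), and size() are hypothetical names chosen for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the manager's guarded state; not the real class.
class RoutingState {
 public:
  using Table = std::unordered_map<std::string, std::string>;

  // Writer path: the exclusive lock blocks readers and other writers.
  void replace(Table table) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    routing_table_ = std::move(table);
  }

  // Reader path: any number of threads may hold the shared lock at once.
  bool contains(const std::string &entity_id) const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return routing_table_.count(entity_id) != 0;
  }

  std::size_t size() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return routing_table_.size();
  }

 private:
  mutable std::shared_mutex mutex_;  // mutable so const readers can lock it
  Table routing_table_;
};

// Usage example with made-up entity and peer names.
inline void routing_state_example() {
  RoutingState state;
  assert(state.size() == 0);
  state.replace({{"camera_1", "peer_a"}});
  assert(state.contains("camera_1"));
  assert(!state.contains("lidar_9"));
}
```

Declaring the mutex mutable is what allows const member functions such as the readers here to acquire the shared lock.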

Public Functions

explicit AggregationManager(const AggregationConfig &config, rclcpp::Logger *logger = nullptr)

Construct an AggregationManager from config.

Creates a PeerClient for each statically configured peer. Validates TLS requirements and logs warnings for cleartext peer URLs.

Parameters:
  • config – Aggregation configuration

  • logger – Optional logger for TLS warnings (pass nullptr to suppress)

size_t peer_count() const

Get the number of known peers (static and discovered).

void add_discovered_peer(const std::string &url, const std::string &name)

Add a dynamically discovered peer.

Thread-safe. If a peer with the given name already exists, this is a no-op.

Parameters:
  • url – Base URL of the peer

  • name – Human-readable peer name

void remove_discovered_peer(const std::string &name)

Remove a dynamically discovered peer by name.

Thread-safe. If the peer is not found, this is a no-op.

Parameters:

name – Peer name to remove
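The no-op semantics of add_discovered_peer() and remove_discovered_peer() can be pictured with a name-keyed map. The sketch below is an assumption about one way to get that behaviour, not the actual implementation; PeerStub, PeerRegistry, and their member names are hypothetical, and C++17 is assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for PeerClient; the real class is not shown here.
struct PeerStub {
  std::string url;
};

// Hypothetical registry keyed by peer name.
class PeerRegistry {
 public:
  // Inserts only when the name is new, so a duplicate add is a no-op.
  void add(const std::string &url, const std::string &name) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    if (peers_.find(name) == peers_.end()) {
      peers_.emplace(name, std::make_shared<PeerStub>(PeerStub{url}));
    }
  }

  // erase() on a missing key removes nothing, so an unknown name is a no-op.
  void remove(const std::string &name) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    peers_.erase(name);
  }

  std::size_t count() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return peers_.size();
  }

 private:
  mutable std::shared_mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<PeerStub>> peers_;
};

// Usage example with made-up peer names and URLs.
inline void peer_registry_example() {
  PeerRegistry registry;
  registry.add("https://robot-a.local:8080", "robot_a");
  registry.add("https://elsewhere.local:9090", "robot_a");  // ignored
  assert(registry.count() == 1);
  registry.remove("not_registered");  // ignored
  assert(registry.count() == 1);
  registry.remove("robot_a");
  assert(registry.count() == 0);
}
```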

void check_all_health()

Check health of all peers.

Calls check_health() on each PeerClient.

size_t healthy_peer_count() const

Get count of currently healthy peers.

Returns:

Number of peers that report healthy status

PeerEntities fetch_all_peer_entities()

Fetch entities from all healthy peers and merge them.

Returns:

Merged PeerEntities from all reachable peers

MergedPeerResult fetch_and_merge_peer_entities(const std::vector<Area> &local_areas, const std::vector<Component> &local_components, const std::vector<App> &local_apps, const std::vector<Function> &local_functions, size_t max_entities_per_peer = 10000, rclcpp::Logger *logger = nullptr)

Fetch entities from all healthy peers, merge with local entities, and build routing table.

Snapshots the peer shared_ptrs under the lock and releases the lock before any network I/O. The shared_ptr copies keep the PeerClients alive even if remove_discovered_peer() runs concurrently. Uses one EntityMerger per peer so that collision-prefixed IDs are tracked correctly in the routing table.

Parameters:
  • local_areas – Local areas to merge with

  • local_components – Local components to merge with

  • local_apps – Local apps to merge with

  • local_functions – Local functions to merge with

  • max_entities_per_peer – Maximum total entities accepted from a single peer

  • logger – Optional logger for warnings (pass nullptr to suppress)

Returns:

MergedPeerResult with merged entity vectors and routing table
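The snapshot-then-release pattern mentioned for fetch_and_merge_peer_entities() can be sketched as below. This is an illustration under assumptions, not the real code: PeerStub, SnapshotDemo, and fetch_entity_ids() are hypothetical, the "network call" is a placeholder, and the entity merging and routing-table steps are left out.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for PeerClient.
struct PeerStub {
  std::string name;
  // Placeholder for a slow network call.
  std::vector<std::string> fetch_entity_ids() const { return {name + "/entity"}; }
};

class SnapshotDemo {
 public:
  void add(const std::string &name) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    peers_[name] = std::make_shared<PeerStub>(PeerStub{name});
  }

  void remove(const std::string &name) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    peers_.erase(name);
  }

  // Copy the shared_ptrs while holding the shared lock, then let it go.
  std::vector<std::shared_ptr<PeerStub>> snapshot() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    std::vector<std::shared_ptr<PeerStub>> copy;
    copy.reserve(peers_.size());
    for (const auto &entry : peers_) copy.push_back(entry.second);
    return copy;
  }

  // No lock is held while the (placeholder) network calls run.
  std::vector<std::string> fetch_all() const {
    std::vector<std::string> ids;
    for (const auto &peer : snapshot()) {
      const auto fetched = peer->fetch_entity_ids();
      ids.insert(ids.end(), fetched.begin(), fetched.end());
    }
    return ids;
  }

 private:
  mutable std::shared_mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<PeerStub>> peers_;
};

// Usage example: a snapshot outlives removal of the peer from the map.
inline void snapshot_example() {
  SnapshotDemo demo;
  demo.add("robot_a");
  const auto held = demo.snapshot();
  demo.remove("robot_a");
  assert(held.size() == 1);
  assert(held[0]->fetch_entity_ids().front() == "robot_a/entity");
  assert(demo.fetch_all().empty());
}
```

Holding the lock only for the copy keeps writers such as peer removal from stalling behind a slow peer.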

void update_routing_table(const std::unordered_map<std::string, std::string> &table)

Replace the routing table (maps entity_id -> peer_name).

Parameters:

table – New routing table to replace the current one

std::optional<std::string> find_peer_for_entity(const std::string &entity_id) const

Look up which peer owns a given entity.

Parameters:

entity_id – Entity ID to look up

Returns:

Peer name if entity is remote, std::nullopt if local or unknown
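From the caller's side, the std::optional return lets a request handler branch between forwarding and local handling. The sketch below assumes a plain map as the routing table; find_peer() and describe_route() are hypothetical helpers written for this example, not part of the class.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>

using RoutingTable = std::unordered_map<std::string, std::string>;

// Hypothetical free-function analogue of find_peer_for_entity().
std::optional<std::string> find_peer(const RoutingTable &table,
                                     const std::string &entity_id) {
  const auto it = table.find(entity_id);
  if (it == table.end()) return std::nullopt;  // local or unknown
  return it->second;                           // name of the owning peer
}

// Hypothetical caller: decide what to do with a request for entity_id.
std::string describe_route(const RoutingTable &table,
                           const std::string &entity_id) {
  if (const auto peer = find_peer(table, entity_id)) {
    return "forward to " + *peer;
  }
  return "local or unknown";
}

// Usage example with made-up entity and peer names.
inline void route_example() {
  const RoutingTable table{{"arm_controller", "robot_b"}};
  assert(describe_route(table, "arm_controller") == "forward to robot_b");
  assert(describe_route(table, "base_laser") == "local or unknown");
}
```

Because std::nullopt covers both the local and the unknown case, a caller still has to check its own entity store before reporting the entity as missing.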

std::string get_peer_url(const std::string &peer_name) const

Get the URL for a known peer by name.

Parameters:

peer_name – Name of the peer

Returns:

URL if found, empty string otherwise

void forward_request(const std::string &peer_name, const httplib::Request &req, httplib::Response &res)

Forward an HTTP request to a specific peer.

Finds the PeerClient by name and calls its forward_request(). If the peer is not found, responds with a 502 error.

Parameters:
  • peer_name – Name of the target peer

  • req – Incoming HTTP request to forward

  • res – Outgoing HTTP response to populate
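The lookup-then-forward behaviour, including the 502 fallback, can be sketched without the HTTP library. Everything here is a stand-in: ResponseStub loosely mimics the status and body fields of an HTTP response, PeerStub::forward() replaces the real proxying, and the error body text is an assumption.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical, minimal response type (not httplib::Response).
struct ResponseStub {
  int status = 200;
  std::string body;
};

// Hypothetical stand-in for PeerClient.
struct PeerStub {
  std::string url;
  // Placeholder for proxying the request to the peer.
  void forward(const std::string &path, ResponseStub &res) const {
    res.status = 200;
    res.body = "forwarded " + path + " to " + url;
  }
};

using PeerMap = std::unordered_map<std::string, PeerStub>;

// Forward to the named peer, or answer 502 when the name is unknown.
void forward_or_502(const PeerMap &peers, const std::string &peer_name,
                    const std::string &path, ResponseStub &res) {
  const auto it = peers.find(peer_name);
  if (it == peers.end()) {
    res.status = 502;  // Bad Gateway: no upstream peer to hand the request to
    res.body = "unknown peer: " + peer_name;
    return;
  }
  it->second.forward(path, res);
}

// Usage example with made-up peer names.
inline void forward_example() {
  const PeerMap peers{{"robot_a", PeerStub{"https://robot-a.local"}}};
  ResponseStub ok;
  forward_or_502(peers, "robot_a", "/api/v1/apps", ok);
  assert(ok.status == 200);
  ResponseStub missing;
  forward_or_502(peers, "robot_z", "/api/v1/apps", missing);
  assert(missing.status == 502);
}
```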

FanOutResult fan_out_get(const std::string &path, const std::string &auth_header)

Fan-out a GET request to all healthy peers in parallel.

Sends GET requests to all healthy peers concurrently via std::async and merges the “items” arrays from their responses. Returns partial results if some peers fail.

Parameters:
  • path – Request path (e.g., “/api/v1/components”)

  • auth_header – Authorization header value (empty to omit)

Returns:

FanOutResult with merged items and failure info
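The fan-out and partial-result handling can be sketched with std::async as follows. This is a simplified illustration rather than the actual implementation: PeerStub and FanOutSketch are hypothetical, a callable stands in for the HTTP GET, items are plain strings instead of JSON, and a thrown exception is assumed to represent a failed peer.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <stdexcept>
#include <string>
#include <vector>

using Items = std::vector<std::string>;

// Hypothetical peer: a name plus a callable standing in for the HTTP GET.
struct PeerStub {
  std::string name;
  std::function<Items(const std::string &)> get_items;
};

// Hypothetical analogue of FanOutResult, with strings instead of JSON.
struct FanOutSketch {
  Items merged_items;
  bool is_partial = false;
  std::vector<std::string> failed_peers;
};

FanOutSketch fan_out(const std::vector<PeerStub> &peers, const std::string &path) {
  // Launch one asynchronous task per peer so the requests overlap in time.
  std::vector<std::future<Items>> futures;
  futures.reserve(peers.size());
  for (const auto &peer : peers) {
    futures.push_back(std::async(std::launch::async, peer.get_items, path));
  }

  // Collect in peer order; a failing peer marks the result as partial.
  FanOutSketch result;
  for (std::size_t i = 0; i < futures.size(); ++i) {
    try {
      const Items items = futures[i].get();
      result.merged_items.insert(result.merged_items.end(), items.begin(), items.end());
    } catch (const std::exception &) {
      result.is_partial = true;
      result.failed_peers.push_back(peers[i].name);
    }
  }
  return result;
}

// Usage example with made-up peers; the second one always fails.
inline void fan_out_example() {
  std::vector<PeerStub> peers;
  peers.push_back({"robot_a", [](const std::string &) { return Items{"c1", "c2"}; }});
  peers.push_back({"robot_b", [](const std::string &) -> Items {
                     throw std::runtime_error("peer unreachable");
                   }});
  const FanOutSketch result = fan_out(peers, "/api/v1/components");
  assert(result.merged_items.size() == 2);
  assert(result.is_partial);
  assert(result.failed_peers.size() == 1 && result.failed_peers[0] == "robot_b");
}
```

Passing std::launch::async explicitly avoids the deferred policy, under which the tasks would run one after another when get() is called.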

nlohmann::json get_peer_status() const

Get peer status for the /health endpoint.

Returns:

JSON array of peer objects with name, url, status

struct FanOutResult

Result of a fan-out GET across all healthy peers.

Public Members

nlohmann::json merged_items

Merged “items” array from all peers.

bool is_partial = {false}

True if some peers failed.

std::vector<std::string> failed_peers

Names of peers that failed.

struct MergedPeerResult

Result of fetching and merging entities from all healthy peers.

Contains merged entity vectors and a routing table mapping remote entity IDs to the peer name that owns them.

Public Members

std::vector<Area> areas
std::vector<Component> components
std::vector<App> apps
std::vector<Function> functions
std::unordered_map<std::string, std::string> routing_table