ros2agnocast.discovery module

Subscribe to /_agnocast_discovery; fall back to ioctl when no agent gossips.

Staleness is bounded by the publisher’s DDS Liveliness lease.

ros2agnocast.discovery.add_gossip_timeout_arg(parser) None

Add the hidden, transitional --gossip-timeout flag to a verb’s parser.

Suppressed from --help because the plan is to replace the fixed wait with a ros2-daemon-driven stop condition. Use warn_if_gossip_timeout_overridden() in main() to nudge any operator who passes it explicitly.

ros2agnocast.discovery.all_nodes(snapshots: list) set

Union of node names across all snapshots.

ros2agnocast.discovery.all_topic_names(snapshots: list) set

Union of topic names across all snapshots.

ros2agnocast.discovery.bridge_label_from_roles(ns_roles: list, ros2_has_pub: bool, ros2_has_sub: bool) str

Classify a topic’s bridge status from its per-NS roles.

Returns BRIDGE_ENABLED / BRIDGE_BRIDGED / BRIDGE_WARN. A bridge is expected only where data must actually cross a boundary: a real publisher needs an outbound bridge only if a consumer exists elsewhere (another NS or ROS 2), and a real subscriber needs an inbound bridge only if a producer exists elsewhere. So WARN fires only when such a bridge is expected but missing (e.g. a talker and listener in different NSes with no bridges); BRIDGED means every expected bridge is present; ENABLED means no bridging is expected (a lone endpoint, or producer/consumer in the same NS).

ns_roles comes from collect_bridge_roles() (empty if the topic has no real Agnocast endpoint). ros2_has_pub / ros2_has_sub are whether the topic has a ROS 2 (DDS) publisher / subscriber in the caller’s NS — the only NS whose DDS view the CLI can see.

ros2agnocast.discovery.collect_announcements(node, timeout_sec: float = 2.0) list

Collect one latest snapshot per (host_uuid, ipc_ns_inode) from gossip.

ros2agnocast.discovery.collect_announcements_with_fallback(node, timeout_sec: float = 2.0) tuple

Gossip first; ioctl fallback. Returns (snapshots, used_fallback).

ros2agnocast.discovery.collect_bridge_roles(snapshots: list) dict

Map topic_name -> per-NS bridge roles in a single pass over snapshots.

Each value is a list with one entry per NS that has a real (non-bridge) endpoint on that topic: (real_pub, real_sub, inbound_bridge, outbound_bridge). A bridge endpoint is directional: an inbound bridge (ROS 2 -> Agnocast) is a bridge publisher; an outbound bridge (Agnocast -> ROS 2) is a bridge subscriber.

Building this once lets the verbs label every topic with an O(1) lookup instead of re-scanning the snapshots per topic.

ros2agnocast.discovery.gossip_qos() rclpy.qos.QoSProfile

Profile for the gossip subscription.

Must match ros2agnocast_discovery_agent.agent._gossip_qos on every field except depth (per-endpoint) so DDS accepts the match.

ros2agnocast.discovery.topic_endpoints(snapshots: list, topic_name: str) tuple

Return (publishers, subscribers) for topic_name across all snapshots.

ros2agnocast.discovery.topics_of_node(snapshots: list, node_name: str) tuple

Return (pub topics, sub topics) for node_name as {topic_name, type_name} dicts.

ros2agnocast.discovery.warn_if_gossip_timeout_overridden(args) None

Emit a stderr WARN when --gossip-timeout is set to a non-default.

ros2agnocast.discovery.warn_if_using_fallback(node, used_fallback: bool, timeout_sec: float) None

Best-effort stderr note when gossip was unavailable and ioctl fallback ran.