rosbridge_library.util.ros module

rosbridge_library.util.ros.is_topic_published(node: Node, topic_name: str) → bool

Check if a node has at least one publisher on the given topic.

Reads node.publishers (the local entity list) rather than the rmw graph cache. The local list is updated synchronously: create_publisher adds the entry and destroy_publisher removes it. The result is therefore deterministic with respect to the calling thread, and no DDS-discovery round-trip is involved.
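The local-list check described above can be sketched as follows. This is a minimal illustration and not the module's actual source: FakeNode and FakePublisher are hypothetical stand-ins for the rclpy objects, and the sketch assumes each publisher exposes its fully qualified name as topic_name.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakePublisher:
    """Hypothetical stand-in for an rclpy publisher (assumed topic_name attribute)."""
    topic_name: str


@dataclass
class FakeNode:
    """Hypothetical stand-in for a node, exposing only its local publisher list."""
    publishers: List[FakePublisher] = field(default_factory=list)


def is_topic_published_sketch(node, topic_name: str) -> bool:
    # Scan only the node's own publishers; no graph query is made.
    return any(pub.topic_name == topic_name for pub in node.publishers)


node = FakeNode()
before = is_topic_published_sketch(node, "/chatter")
node.publishers.append(FakePublisher("/chatter"))  # mimics create_publisher
after = is_topic_published_sketch(node, "/chatter")
node.publishers.clear()                            # mimics destroy_publisher
cleared = is_topic_published_sketch(node, "/chatter")
print(before, after, cleared)  # False True False
```

Because the list changes in the same call that creates or destroys the publisher, the answer flips immediately on the calling thread, which is the property tests rely on.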

rosbridge_library.util.ros.is_topic_subscribed(node: Node, topic_name: str) → bool

Check if a node has at least one subscription on the given topic.

Reads node.subscriptions (the local entity list) rather than the rmw graph cache; see is_topic_published for why this matters.

rosbridge_library.util.ros.wait_for_executor_idle(executor: Executor, timeout: float = 5.0) → None

Block until all tasks already queued on executor have been processed.

Used by tests that schedule work on the executor and then need to assert on the post-condition from a different thread. Submits a no-op task and waits for it to run. Tasks run in FIFO order on a SingleThreadedExecutor, so once the no-op completes, every task enqueued before it has completed as well.

Raises TimeoutError if the executor does not drain within timeout seconds, which usually means it is not being spun.
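The no-op barrier technique can be illustrated without a ROS installation. The sketch below is an analogy, not the module's implementation: it assumes a standard-library ThreadPoolExecutor with a single worker as a stand-in for a spinning SingleThreadedExecutor, and wait_for_idle_sketch is a hypothetical name.

```python
import concurrent.futures
import threading


def wait_for_idle_sketch(pool: concurrent.futures.ThreadPoolExecutor,
                         timeout: float = 5.0) -> None:
    # Enqueue a no-op behind everything that was already submitted.
    sentinel = pool.submit(lambda: None)
    try:
        # With one FIFO worker, the no-op finishing implies earlier tasks finished.
        sentinel.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        sentinel.cancel()
        raise TimeoutError(f"executor did not drain within {timeout} s") from None


results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    for i in range(3):
        pool.submit(results.append, i)
    wait_for_idle_sketch(pool)
    drained = list(results)  # safe to assert on: all three tasks have run

    # A worker that never gets past its current task models an executor
    # that is not being spun, so the barrier times out.
    gate = threading.Event()
    pool.submit(gate.wait)
    try:
        wait_for_idle_sketch(pool, timeout=0.1)
        timed_out = False
    except TimeoutError:
        timed_out = True
    finally:
        gate.set()  # release the worker so the pool can shut down
```

The ordering guarantee comes entirely from the single FIFO worker; with more than one worker the no-op could finish before earlier tasks, so the same barrier would not be sufficient.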