Class Ros2TopicDataProvider

Nested Relationships

Nested Types

Inheritance Relationships

Base Type

Class Documentation

class Ros2TopicDataProvider : public ros2_medkit_gateway::TopicDataProvider

ROS 2 default implementation of TopicDataProvider.

Provides topic discovery (via direct graph queries on the subscription node) and topic sampling via a pool of long-lived subscriptions managed by Ros2SubscriptionExecutor. Each pool entry keeps the always-newest message, so concurrent samplers on the same topic share the latest without creating additional subscriptions.

Race fix (issue #375)

Subscription creation / destruction is serialized by the executor. The pool amortizes cost across samples so hot topics reuse the same subscription.

Thread safety

All methods may be called concurrently. Pool state is protected by pool_mtx_; per-entry buffers by PoolEntry::buf_mtx.

Eviction

  • Graph change: registered callback walks the pool and drops entries whose topic has disappeared from the graph.

  • Pool cap: on miss, if pool is at max_pool_size, evicts the least-recently-sampled entry and installs the new one (LRU).

  • Shutdown: destructor signals every per-entry CV, resets all slots.

Public Functions

Ros2TopicDataProvider(std::shared_ptr<ros2_common::Ros2SubscriptionExecutor> exec, std::shared_ptr<ros2_medkit_serialization::JsonSerializer> serializer, Config cfg = Config())
~Ros2TopicDataProvider() override
virtual tl::expected<TopicSampleResult, ErrorInfo> sample(const std::string &topic, std::chrono::milliseconds timeout) override

Sample a single topic, returning either fresh data or metadata-only.

If the topic has no publishers, returns metadata-only immediately (has_data == false). Otherwise, waits up to timeout for a message.

virtual tl::expected<std::vector<TopicSampleResult>, ErrorInfo> sample_parallel(const std::vector<std::string> &topics, std::chrono::milliseconds timeout) override

Sample multiple topics concurrently, returning one result per input topic.

Order of the returned vector matches the input order. Idle topics return immediately with metadata only; active topics wait up to timeout.

Error semantics (partial-success)

Per-topic recoverable errors (cold-wait cap, subscribe-failed, …) are embedded into the corresponding TopicSampleResult (error_code, error_message, error_http_status) so that one bad topic does not fail an entire bulk read. The top-level tl::unexpected is reserved for batch-fatal conditions only, currently ERR_X_MEDKIT_GATEWAY_SHUTDOWN.

virtual std::optional<TopicInfo> get_topic_info(const std::string &topic) override
virtual bool has_publishers(const std::string &topic) override
virtual std::vector<TopicInfo> discover(const std::string &namespace_prefix) override
virtual std::vector<TopicInfo> discover_all() override
virtual std::map<std::string, ComponentTopics> build_component_topic_map() override
virtual ComponentTopics get_component_topics(const std::string &component_fqn) override
virtual TopicDiscoveryResult discover_topics_by_namespace() override
virtual std::set<std::string> discover_topic_namespaces() override
virtual ComponentTopics get_topics_for_namespace(const std::string &ns_prefix) override
virtual std::vector<TopicEndpoint> get_topic_publishers(const std::string &topic) override
virtual std::vector<TopicEndpoint> get_topic_subscribers(const std::string &topic) override
virtual TopicConnection get_topic_connection(const std::string &topic) override
PoolStats stats() const
virtual nlohmann::json x_medkit_stats() const override

Serialize implementation-specific stats for /health exposure.

Default: empty object. Subclasses that carry runtime state (pool, queue, watchdog) populate this with vendor-extension keys prefixed x-medkit-*. Values must be atomic reads only; /health must not block on this call.

void sweep_idle_entries()

Walk the pool and evict entries idle longer than cfg.idle_safety_net.

Normally driven by an internal wall timer on the subscription node. Exposed publicly so tests can trigger a deterministic sweep without relying on timer cadence.

Public Static Functions

static bool is_system_topic(const std::string &topic_name)

Identify whether a topic is a ROS 2 system/infrastructure topic (filter target).

struct Config

Public Functions

inline Config()

Public Members

std::size_t max_pool_size
std::chrono::milliseconds idle_safety_net
std::chrono::milliseconds idle_sweep_tick
std::size_t cold_wait_cap
std::size_t max_parallel_samples

Upper bound on concurrent sampler threads launched by sample_parallel(). Prevents the provider from becoming a self-DoS vector under large topic lists: excess topics are processed in follow-up chunks instead of spawning one std::async per topic.

struct PoolStats

Public Members

std::size_t pool_size
std::size_t pool_cap
std::size_t pool_hits
std::size_t pool_misses
std::size_t evictions_total
std::size_t type_change_events
std::size_t graph_events_received
std::size_t concurrent_cold_waits
std::size_t cold_wait_cap