Class Ros2TopicDataProvider
Defined in File ros2_topic_data_provider.hpp
Nested Relationships
Nested Types
Inheritance Relationships
Base Type
public ros2_medkit_gateway::TopicDataProvider(Class TopicDataProvider)
Class Documentation
-
class Ros2TopicDataProvider : public ros2_medkit_gateway::TopicDataProvider
ROS 2 default implementation of TopicDataProvider.
Provides topic discovery (via direct graph queries on the subscription node) and topic sampling via a pool of long-lived subscriptions managed by Ros2SubscriptionExecutor. Each pool entry keeps the always-newest message, so concurrent samplers on the same topic share the latest without creating additional subscriptions.
- Race fix (issue #375)
Subscription creation / destruction is serialized by the executor. The pool amortizes cost across samples so hot topics reuse the same subscription.
- Thread safety
All methods may be called concurrently. Pool state is protected by
pool_mtx_; per-entry buffers byPoolEntry::buf_mtx.- Eviction
Graph change: registered callback walks the pool and drops entries whose topic has disappeared from the graph.
Pool cap: on miss, if pool is at
max_pool_size, evicts the least-recently-sampled entry and installs the new one (LRU).Shutdown: destructor signals every per-entry CV, resets all slots.
Public Functions
-
~Ros2TopicDataProvider() override
-
virtual tl::expected<TopicSampleResult, ErrorInfo> sample(const std::string &topic, std::chrono::milliseconds timeout) override
Sample a single topic, returning either fresh data or metadata-only.
If the topic has no publishers, returns metadata-only immediately (
has_data == false). Otherwise, waits up totimeoutfor a message.
-
virtual tl::expected<std::vector<TopicSampleResult>, ErrorInfo> sample_parallel(const std::vector<std::string> &topics, std::chrono::milliseconds timeout) override
Sample multiple topics concurrently, returning one result per input topic.
Order of the returned vector matches the input order. Idle topics return immediately with metadata only; active topics wait up to
timeout.- Error semantics (partial-success)
Per-topic recoverable errors (cold-wait cap, subscribe-failed, …) are embedded into the corresponding
TopicSampleResult(error_code,error_message,error_http_status) so that one bad topic does not fail an entire bulk read. The top-leveltl::unexpectedis reserved for batch-fatal conditions only, currentlyERR_X_MEDKIT_GATEWAY_SHUTDOWN.
-
virtual bool has_publishers(const std::string &topic) override
-
virtual std::map<std::string, ComponentTopics> build_component_topic_map() override
-
virtual ComponentTopics get_component_topics(const std::string &component_fqn) override
-
virtual TopicDiscoveryResult discover_topics_by_namespace() override
-
virtual std::set<std::string> discover_topic_namespaces() override
-
virtual ComponentTopics get_topics_for_namespace(const std::string &ns_prefix) override
-
virtual std::vector<TopicEndpoint> get_topic_publishers(const std::string &topic) override
-
virtual std::vector<TopicEndpoint> get_topic_subscribers(const std::string &topic) override
-
virtual TopicConnection get_topic_connection(const std::string &topic) override
-
virtual nlohmann::json x_medkit_stats() const override
Serialize implementation-specific stats for /health exposure.
Default: empty object. Subclasses that carry runtime state (pool, queue, watchdog) populate this with vendor-extension keys prefixed
x-medkit-*. Values must be atomic reads only; /health must not block on this call.
-
void sweep_idle_entries()
Walk the pool and evict entries idle longer than cfg.idle_safety_net.
Normally driven by an internal wall timer on the subscription node. Exposed publicly so tests can trigger a deterministic sweep without relying on timer cadence.
Public Static Functions
-
static bool is_system_topic(const std::string &topic_name)
Identify whether a topic is a ROS 2 system/infrastructure topic (filter target).
-
struct Config
Public Functions
-
inline Config()
Public Members
-
std::size_t max_pool_size
-
std::chrono::milliseconds idle_safety_net
-
std::chrono::milliseconds idle_sweep_tick
-
std::size_t cold_wait_cap
-
std::size_t max_parallel_samples
Upper bound on concurrent sampler threads launched by sample_parallel(). Prevents the provider from becoming a self-DoS vector under large topic lists: excess topics are processed in follow-up chunks instead of spawning one std::async per topic.
-
inline Config()
-
struct PoolStats