Class SubscriptionManager

Class Documentation

class SubscriptionManager

Thread-safe in-memory store for cyclic subscriptions.

Pure C++ class (no ROS 2 dependency). Manages subscription lifecycle: CRUD operations, capacity enforcement, expiry cleanup, and stream synchronization via per-subscription condition variables.

Public Functions

explicit SubscriptionManager(size_t max_subscriptions = 100)
~SubscriptionManager()
SubscriptionManager(const SubscriptionManager&) = delete
SubscriptionManager &operator=(const SubscriptionManager&) = delete
SubscriptionManager(SubscriptionManager&&) = delete
SubscriptionManager &operator=(SubscriptionManager&&) = delete
tl::expected<CyclicSubscriptionInfo, std::string> create(const std::string &entity_id, const std::string &entity_type, const std::string &resource_uri, const std::string &collection, const std::string &resource_path, const std::string &protocol, CyclicInterval interval, int duration_sec)

Create a new subscription. Returns subscription info or error string.
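The capacity check and the result-or-error return can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in, not the library's implementation: `SketchInfo` replaces `CyclicSubscriptionInfo`, `SketchStore` replaces the manager, the ID scheme and the error text are invented, and `std::variant` replaces `tl::expected` so that the example needs no third-party header.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <variant>

// Hypothetical stand-in for CyclicSubscriptionInfo (the real struct has more fields).
struct SketchInfo {
  std::string id;
  std::string entity_id;
  int duration_sec;
};

// Either the created subscription or an error string.
// Assumption: std::variant is used here only to avoid depending on tl::expected.
using SketchResult = std::variant<SketchInfo, std::string>;

class SketchStore {
 public:
  explicit SketchStore(std::size_t max_subscriptions) : max_(max_subscriptions) {}

  // Creates a subscription unless the store is already at capacity.
  SketchResult create(const std::string &entity_id, int duration_sec) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (subs_.size() >= max_) {
      return std::string("subscription limit reached");  // invented error text
    }
    SketchInfo info{"sub_" + std::to_string(next_id_++), entity_id, duration_sec};
    subs_.emplace(info.id, info);
    return info;
  }

  std::size_t active_count() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return subs_.size();
  }

 private:
  mutable std::mutex mutex_;  // guards subs_ and next_id_
  std::map<std::string, SketchInfo> subs_;
  std::size_t max_;
  unsigned next_id_ = 1;
};
```

With a capacity of 1, a first call to `create` yields a `SketchInfo` and a second call yields the error string, leaving the stored count at 1.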

std::optional<CyclicSubscriptionInfo> get(const std::string &sub_id) const

Get a subscription by ID. Returns std::nullopt if no subscription with that ID exists.

std::vector<CyclicSubscriptionInfo> list(const std::string &entity_id) const

List all subscriptions for an entity.

tl::expected<CyclicSubscriptionInfo, std::string> update(const std::string &sub_id, std::optional<CyclicInterval> new_interval, std::optional<int> new_duration)

Update interval and/or duration. Returns updated info or error.

bool remove(const std::string &sub_id)

Remove a subscription. Returns true if found and removed.

size_t cleanup_expired()

Remove all expired subscriptions. Returns number removed.
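One way to picture expiry cleanup is an erase-while-iterating pass over a map of expiry times. This sketch is an assumption about the general technique, not the class's actual code: `ExpiryMap` and `sketch_cleanup_expired` are hypothetical names, the current time is passed in as a parameter to keep the example deterministic, and locking and the on-removed callback are left out.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <map>
#include <string>

using SketchClock = std::chrono::steady_clock;

// Hypothetical storage: subscription ID mapped to its expiry time.
using ExpiryMap = std::map<std::string, SketchClock::time_point>;

// Erases every entry whose expiry time is at or before `now`.
// Returns the number of entries removed.
std::size_t sketch_cleanup_expired(ExpiryMap &subs, SketchClock::time_point now) {
  std::size_t removed = 0;
  for (auto it = subs.begin(); it != subs.end();) {
    if (it->second <= now) {
      it = subs.erase(it);  // erase returns the iterator to the next element
      ++removed;
    } else {
      ++it;
    }
  }
  return removed;
}
```

A caller such as a periodic housekeeping task could run this pass and log the returned count.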

size_t active_count() const

Number of active subscriptions.

size_t max_subscriptions() const

Maximum allowed subscriptions.

void shutdown()

Signal shutdown and wake all waiting streams.

void set_on_removed(std::function<void(const CyclicSubscriptionInfo&)> callback)

Set callback invoked when a subscription is removed (remove, cleanup_expired, shutdown). MUST be called during initialization only, before any concurrent access.

bool wait_for_update(const std::string &sub_id, std::chrono::milliseconds timeout)

Wait for an update or removal on the given subscription.

Used by SSE stream threads to sleep between samples. Returns true if woken by notify/remove/shutdown, false on timeout (normal interval wait).
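The true-on-wake, false-on-timeout contract maps naturally onto `std::condition_variable::wait_for` with a predicate. The sketch below shows that pattern for a single subscription. `SketchWaitState` and its `notified` flag are hypothetical; how the real class stores its per-subscription condition variables is not shown in this reference.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical per-subscription wait state.
struct SketchWaitState {
  std::mutex mutex;
  std::condition_variable cv;
  bool notified = false;  // set by notify(), consumed by wait_for_update()

  // Returns true if woken by notify(), false if the timeout elapsed first.
  bool wait_for_update(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex);
    // The predicate guards against spurious wakeups and missed notifications.
    bool woken = cv.wait_for(lock, timeout, [this] { return notified; });
    notified = false;  // reset so the next wait blocks again
    return woken;
  }

  void notify() {
    {
      std::lock_guard<std::mutex> lock(mutex);
      notified = true;
    }
    cv.notify_all();
  }
};
```

In a stream loop, a false return would mean the interval elapsed and the next sample should be sent, while a true return would prompt the thread to re-check whether the subscription is still active or has changed.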

bool is_active(const std::string &sub_id) const

Check whether a subscription is active (exists and has not expired).

void notify(const std::string &sub_id)

Notify a subscription’s waiting stream (e.g., after an update or for shutdown).