3 #include <g3log/g3log.hpp> 14 CHECK(endpoint !=
nullptr) <<
"No endpoint specified";
15 CHECK(node !=
nullptr) <<
"No target node specified";
18 data::Message request;
19 request.mutable_header()->set_reliability(data::Reliability::NACK_REQUESTED);
20 request.mutable_tm_subscribe_request()->set_interval(interval);
23 for (
auto& key : keys)
25 request.mutable_tm_subscribe_request()->add_keys(key);
29 LOG(DBUG) <<
"A SUBSCRIBE request " << ((keys.size() > 0) ?
"for specific keys" :
"for all keys") <<
" was sent to node [" << node->
GetUUID() <<
"]";
32 endpoint->
Tag(&request);
33 UpdateAwaiter awaiter(endpoint, request.header().identifier(), node);
34 endpoint->
Send(&request, node);
41 std::shared_lock<std::shared_timed_mutex> guardValues(_valuesMutex);
46 std::shared_lock<std::shared_timed_mutex> guardSchema(_schemaMutex);
47 auto statusKeys = _statusKeys;
51 if (!statusKeys.empty())
55 auto& pairs = *message.mutable_tm_status()->mutable_values();
56 for (
const auto& key : statusKeys)
58 auto entry = cache.find(key);
59 if (entry != cache.end())
61 pairs[key] = (*entry).second;
72 GetEndpoint()->Send(&message,
nullptr);
77 std::shared_lock<std::shared_timed_mutex> guardTrackers(_trackersMutex);
78 bool hasInvalidTrackers =
false;
79 for (
auto& tracker : _trackers)
81 if (tracker.IsValid())
86 uint32_t tick = tracker.GetAndIncrementTick();
89 if (tick % tracker.GetRequest().interval() == 0)
92 data::Message response;
93 response.mutable_header()->set_reply_to(tracker.GetIdentifier());
94 response.mutable_tm_update()->set_tick(tick);
97 auto values = response.mutable_tm_update()->mutable_values();
98 if (tracker.GetRequest().keys_size() > 0)
101 for (
auto& key : tracker.GetRequest().keys())
103 auto entry = cache.find(key);
104 if (entry != cache.end())
106 (*values)[key] = (*entry).second;
117 for (
auto& pair : cache)
119 (*values)[pair.first] = pair.second;
124 LOG(DBUG) <<
"An UPDATE " << ((tracker.GetRequest().keys_size() > 0) ?
"for specific keys" :
"for all keys") <<
" was sent to node [" << tracker.GetNode()->GetUUID() <<
"]";
125 GetEndpoint()->Send(&response, tracker.GetNode());
128 catch (
const std::exception& e)
130 LOG(WARNING) <<
"An error has occurred while trying to send an update to node [" << tracker.GetNode()->GetUUID() <<
"]: " << e.what();
135 LOG(DBUG) <<
"Tracker #" << tracker.GetIdentifier() <<
" of node [" << tracker.GetNode()->GetUUID() <<
"] was marked for removal";
136 hasInvalidTrackers =
false;
139 guardTrackers.unlock();
142 if (hasInvalidTrackers)
144 std::unique_lock<std::shared_timed_mutex> guard(_trackersMutex);
145 _trackers.remove_if([](
const Tracker& tracker){
return !tracker.
IsValid(); });
146 LOG(DBUG) <<
"Invalidated trackers have been removed";
152 if (message->content_case() == data::Message::ContentCase::kTmSubscribeRequest)
154 if (message->tm_subscribe_request().interval() > 0)
156 std::unique_lock<std::shared_timed_mutex> guard(_trackersMutex);
159 _trackers.emplace_back(sender, message->header().identifier(), message->tm_subscribe_request());
165 throw Exception(
"Invalid interval for SUBSCRIBE request");
168 else if (message->content_case() == data::Message::ContentCase::kTmUnsubscribeRequest)
170 std::shared_lock<std::shared_timed_mutex> guardTrackers(_trackersMutex);
173 for (
auto& tracker : _trackers)
175 if (tracker.GetIdentifier() == message->tm_unsubscribe_request().identifier())
178 tracker.Invalidate();
186 else if (message->content_case() == data::Message::ContentCase::kTmStatus)
189 std::unique_lock<std::shared_timed_mutex> reportsGuard(_reportsMutex);
190 _reports[sender] = message->tm_status();
191 reportsGuard.unlock();
194 LOG(DBUG) <<
"A status update from node [" << sender->
GetUUID() <<
"] was cached.";
197 std::shared_lock<std::shared_timed_mutex> observersGuard(_observersMutex);
198 for (
auto observer : _observers)
200 observer->CachedStatusWasUpdated(sender, message->tm_status());
214 std::shared_lock<std::shared_timed_mutex> guard(_schemaMutex);
217 *descriptor.mutable_telemetry_schema() = _schema;
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) override
Delivery point of all messages.
virtual void Tag(data::Message *message)=0
Set the message identifier for a message.
static UpdateAwaiter Subscribe(Endpoint *endpoint, const Node *node, uint32_t interval=1)
Subscribe to all named values on the remote node.
virtual void Update() overridefinal
Send an update to all subscribers.
Exception class thrown by all library classes.
Telemetry Service can subscribe to receive updates from remote nodes on named values.
virtual void Send(data::Message *message, const Node *node)=0
Send a message to a specific member of the swarm. Call with node set to nullptr to send a message to ...
bool IsValid() const
Is the tracker still valid?
An Awaiter that has a longer lifetime and is updated periodically.
Abstract base class for Endpoint implementations.
virtual void DescribeService(data::discovery::Response &descriptor) override
Add descriptors for the service to the discovery descriptor.
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
Represents a Node the Endpoint knows about and can send messages to.