telemetry/Service.cpp
Go to the documentation of this file.
2 #include <swarmio/data/Helper.h>
3 #include <g3log/g3log.hpp>
4 #include <chrono>
5 
6 using namespace swarmio;
7 using namespace swarmio::services;
8 using namespace swarmio::services::telemetry;
9 using namespace std::literals::chrono_literals;
10 
11 UpdateAwaiter Service::Subscribe(Endpoint* endpoint, const Node* node, uint32_t interval, const std::list<std::string>& keys)
12 {
13  // Sanity checks
14  CHECK(endpoint != nullptr) << "No endpoint specified";
15  CHECK(node != nullptr) << "No target node specified";
16 
17  // Build message
18  data::Message request;
19  request.mutable_header()->set_reliability(data::Reliability::NACK_REQUESTED);
20  request.mutable_tm_subscribe_request()->set_interval(interval);
21 
22  // Add keys
23  for (auto& key : keys)
24  {
25  request.mutable_tm_subscribe_request()->add_keys(key);
26  }
27 
28  // Log outgoing request
29  LOG(DBUG) << "A SUBSCRIBE request " << ((keys.size() > 0) ? "for specific keys" : "for all keys") << " was sent to node [" << node->GetUUID() << "]";
30 
31  // Send and await response
32  endpoint->Tag(&request);
33  UpdateAwaiter awaiter(endpoint, request.header().identifier(), node);
34  endpoint->Send(&request, node);
35  return awaiter;
36 }
37 
39 {
40  // Make copy of current list of values
41  std::shared_lock<std::shared_timed_mutex> guardValues(_valuesMutex);
42  auto cache = _values;
43  guardValues.unlock();
44 
45  // Make copy of the status keys
46  std::shared_lock<std::shared_timed_mutex> guardSchema(_schemaMutex);
47  auto statusKeys = _statusKeys;
48  guardSchema.unlock();
49 
50  // Send status broadcast
51  if (!statusKeys.empty())
52  {
53  // Build message
54  data::Message message;
55  auto& pairs = *message.mutable_tm_status()->mutable_values();
56  for (const auto& key : statusKeys)
57  {
58  auto entry = cache.find(key);
59  if (entry != cache.end())
60  {
61  pairs[key] = (*entry).second;
62  }
63  else
64  {
65  // Quietly ignore keys not found locally
66  }
67  }
68 
69  // Send message
70  if (!pairs.empty())
71  {
72  GetEndpoint()->Send(&message, nullptr);
73  }
74  }
75 
76  // Send updates to telemetry subscribers
77  std::shared_lock<std::shared_timed_mutex> guardTrackers(_trackersMutex);
78  bool hasInvalidTrackers = false;
79  for (auto& tracker : _trackers)
80  {
81  if (tracker.IsValid())
82  {
83  try
84  {
85  // Get current tick
86  uint32_t tick = tracker.GetAndIncrementTick();
87 
88  // Check if we need to send an update
89  if (tick % tracker.GetRequest().interval() == 0)
90  {
91  // Build message
92  data::Message response;
93  response.mutable_header()->set_reply_to(tracker.GetIdentifier());
94  response.mutable_tm_update()->set_tick(tick);
95 
96  // Add values
97  auto values = response.mutable_tm_update()->mutable_values();
98  if (tracker.GetRequest().keys_size() > 0)
99  {
100  // Only take keys as specified in the request
101  for (auto& key : tracker.GetRequest().keys())
102  {
103  auto entry = cache.find(key);
104  if (entry != cache.end())
105  {
106  (*values)[key] = (*entry).second;
107  }
108  else
109  {
110  // Quietly ignore keys not found locally
111  }
112  }
113  }
114  else
115  {
116  // Take all keys
117  for (auto& pair : cache)
118  {
119  (*values)[pair.first] = pair.second;
120  }
121  }
122 
123  // Send update
124  LOG(DBUG) << "An UPDATE " << ((tracker.GetRequest().keys_size() > 0) ? "for specific keys" : "for all keys") << " was sent to node [" << tracker.GetNode()->GetUUID() << "]";
125  GetEndpoint()->Send(&response, tracker.GetNode());
126  }
127  }
128  catch (const std::exception& e)
129  {
130  LOG(WARNING) << "An error has occurred while trying to send an update to node [" << tracker.GetNode()->GetUUID() << "]: " << e.what();
131  }
132  }
133  else
134  {
135  LOG(DBUG) << "Tracker #" << tracker.GetIdentifier() << " of node [" << tracker.GetNode()->GetUUID() << "] was marked for removal";
136  hasInvalidTrackers = false;
137  }
138  }
139  guardTrackers.unlock();
140 
141  // Remove invalid trackers
142  if (hasInvalidTrackers)
143  {
144  std::unique_lock<std::shared_timed_mutex> guard(_trackersMutex);
145  _trackers.remove_if([](const Tracker& tracker){ return !tracker.IsValid(); });
146  LOG(DBUG) << "Invalidated trackers have been removed";
147  }
148 }
149 
150 bool Service::ReceiveMessage(const Node* sender, const data::Message* message)
151 {
152  if (message->content_case() == data::Message::ContentCase::kTmSubscribeRequest)
153  {
154  if (message->tm_subscribe_request().interval() > 0)
155  {
156  std::unique_lock<std::shared_timed_mutex> guard(_trackersMutex);
157 
158  // Add new tracker
159  _trackers.emplace_back(sender, message->header().identifier(), message->tm_subscribe_request());
160  return true;
161  }
162  else
163  {
164  // Invalid request
165  throw Exception("Invalid interval for SUBSCRIBE request");
166  }
167  }
168  else if (message->content_case() == data::Message::ContentCase::kTmUnsubscribeRequest)
169  {
170  std::shared_lock<std::shared_timed_mutex> guardTrackers(_trackersMutex);
171 
172  // Try and find the tracker
173  for (auto& tracker : _trackers)
174  {
175  if (tracker.GetIdentifier() == message->tm_unsubscribe_request().identifier())
176  {
177  // Invalidate tracker
178  tracker.Invalidate();
179  return true;
180  }
181  }
182 
183  // No such tracker found
184  return false;
185  }
186  else if (message->content_case() == data::Message::ContentCase::kTmStatus)
187  {
188  // Cache report
189  std::unique_lock<std::shared_timed_mutex> reportsGuard(_reportsMutex);
190  _reports[sender] = message->tm_status();
191  reportsGuard.unlock();
192 
193  // Log
194  LOG(DBUG) << "A status update from node [" << sender->GetUUID() << "] was cached.";
195 
196  // Call observers
197  std::shared_lock<std::shared_timed_mutex> observersGuard(_observersMutex);
198  for (auto observer : _observers)
199  {
200  observer->CachedStatusWasUpdated(sender, message->tm_status());
201  }
202 
203  // Mark as handled
204  return true;
205  }
206  else
207  {
208  return false;
209  }
210 }
211 
212 void Service::DescribeService(data::discovery::Response& descriptor)
213 {
214  std::shared_lock<std::shared_timed_mutex> guard(_schemaMutex);
215 
216  // Simply return with the cached schema
217  *descriptor.mutable_telemetry_schema() = _schema;
218 }
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) override
Delivery point of all messages.
virtual void Tag(data::Message *message)=0
Set the message identifier for a message.
static UpdateAwaiter Subscribe(Endpoint *endpoint, const Node *node, uint32_t interval=1)
Subscribe to all named values on the remote node.
virtual void Update() override final
Send an update to all subscribers.
Exception class thrown by all library classes.
Telemetry Service can subscribe to receive updates from remote nodes on named values.
Definition: Tracker.h:16
virtual void Send(data::Message *message, const Node *node)=0
Send a message to a specific member of the swarm. Call with node set to nullptr to send a message to ...
bool IsValid() const
Is the tracker still valid?
Definition: Tracker.h:118
An Awaiter that has a longer lifetime and is updated periodically.
Definition: UpdateAwaiter.h:12
Abstract base class for Endpoint implementations.
Definition: Endpoint.h:25
virtual void DescribeService(data::discovery::Response &descriptor) override
Add descriptors for the service to the discovery descriptor.
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
Represents a Node the Endpoint knows about and can send messages to.


swarmros
Author(s):
autogenerated on Fri Apr 3 2020 03:42:48