telemetry/Service.h
Go to the documentation of this file.
1 #pragma once
2 
8 #include <swarmio/data/Variant.pb.h>
9 #include <list>
10 #include <string>
11 #include <shared_mutex>
12 #include <thread>
13 
15 {
21  class SWARMIO_API Service final : public PeriodicService, public discovery::Discoverable
22  {
23  private:
24 
29  std::map<std::string, data::Variant> _values;
30 
35  std::shared_timed_mutex _valuesMutex;
36 
41  data::discovery::Schema _schema;
42 
47  std::set<std::string> _statusKeys;
48 
53  std::shared_timed_mutex _schemaMutex;
54 
59  std::list<Tracker> _trackers;
60 
65  std::shared_timed_mutex _trackersMutex;
66 
71  std::set<Observer*> _observers;
72 
77  std::shared_timed_mutex _observersMutex;
78 
83  std::map<const Node*, data::telemetry::Status> _reports;
84 
89  std::shared_timed_mutex _reportsMutex;
90 
91  protected:
92 
97  virtual void Update() override final;
98 
99  public:
100 
109  static UpdateAwaiter Subscribe(Endpoint* endpoint, const Node* node, uint32_t interval = 1)
110  {
111  std::list<std::string> emptyList;
112  return Subscribe(endpoint, node, interval, emptyList);
113  }
114 
124  static UpdateAwaiter Subscribe(Endpoint* endpoint, const Node* node, uint32_t interval, const std::list<std::string>& keys);
125 
132  Service(Endpoint* endpoint, std::chrono::milliseconds period = std::chrono::milliseconds(10))
133  : PeriodicService(endpoint, period)
134  {
135  FinishConstruction();
136  }
137 
144  void SetValue(const std::string& key, const data::Variant& value)
145  {
146  std::unique_lock<std::shared_timed_mutex> guard(_valuesMutex);
147  _values[key] = value;
148  }
149 
155  void RemoveValue(const std::string& key)
156  {
157  std::unique_lock<std::shared_timed_mutex> guard(_valuesMutex);
158  _values.erase(key);
159  }
160 
167  void SetFieldDefinitionForKey(const std::string& key, const data::discovery::Field& field, bool includeInStatus)
168  {
169  std::unique_lock<std::shared_timed_mutex> guard(_schemaMutex);
170  (*_schema.mutable_fields())[key] = field;
171  if (includeInStatus)
172  {
173  _statusKeys.insert(key);
174  }
175  else
176  {
177  _statusKeys.erase(key);
178  }
179  }
180 
186  void RemoveFieldDefinitionForKey(const std::string& key)
187  {
188  std::unique_lock<std::shared_timed_mutex> guard(_schemaMutex);
189  _schema.mutable_fields()->erase(key);
190  _statusKeys.erase(key);
191  }
192 
198  void RegisterObserver(Observer* observer)
199  {
200  std::unique_lock<std::shared_timed_mutex> guard(_observersMutex);
201  _observers.insert(observer);
202  }
203 
209  void UnregisterObserver(Observer* observer)
210  {
211  std::unique_lock<std::shared_timed_mutex> guard(_observersMutex);
212  _observers.erase(observer);
213  }
214 
215  data::telemetry::Status GetCachedStatus(const Node* node)
216  {
217  std::shared_lock<std::shared_timed_mutex> guard(_reportsMutex);
218  return _reports[node];
219  }
220 
229  virtual bool ReceiveMessage(const Node* sender, const data::Message* message) override;
230 
238  virtual void DescribeService(data::discovery::Response& descriptor) override;
239  };
240 }
std::shared_timed_mutex _valuesMutex
Mutex to protect the map of values.
void RemoveValue(const std::string &key)
Remove a value from the local telemetry cache.
Telemetry Service can subscribe to receive updates from remote nodes on named values.
std::map< const Node *, data::telemetry::Status > _reports
Cached values for remote status reports.
void SetValue(const std::string &key, const data::Variant &value)
Add or update a value in the local telemetry cache.
void SetFieldDefinitionForKey(const std::string &key, const data::discovery::Field &field, bool includeInStatus)
Add field to the schema.
Interface for discovery observers.
A service that uses a periodic background worker to perform its work.
void UnregisterObserver(Observer *observer)
Unregister a status observer.
std::shared_timed_mutex _trackersMutex
Mutex to protect the list of trackers.
data::discovery::Schema _schema
Schema.
std::set< std::string > _statusKeys
List of keys to include in the status broadcast.
data::telemetry::Status GetCachedStatus(const Node *node)
An Awaiter that has a longer lifetime and is updated periodically.
Definition: UpdateAwaiter.h:12
std::shared_timed_mutex _observersMutex
Mutex to protect the list of observers.
Abstract base class for Endpoint implementations.
Definition: Endpoint.h:25
std::list< Tracker > _trackers
Trackers for each remote subscription.
std::shared_timed_mutex _schemaMutex
Mutex to protect the schema.
void RegisterObserver(Observer *observer)
Register a new status observer.
void RemoveFieldDefinitionForKey(const std::string &key)
Remove a field from the schema.
Service(Endpoint *endpoint, std::chrono::milliseconds period=std::chrono::milliseconds(10))
Construct a new Service object.
Interface for discoverable services.
Definition: Discoverable.h:12
std::map< std::string, data::Variant > _values
List of published telemetry values.
std::set< Observer * > _observers
List of observers.
Represents a Node the Endpoint knows about and can send messages to.
std::shared_timed_mutex _reportsMutex
Mutex to protect remote status reports.


swarmros
Author(s):
autogenerated on Fri Apr 3 2020 03:42:48