discovery/Service.cpp
Go to the documentation of this file.
2 #include <g3log/g3log.hpp>
3 
4 using namespace swarmio;
5 using namespace swarmio::services;
6 using namespace swarmio::services::discovery;
7 
9 {
10  std::unique_lock<std::mutex> guard(_discoverablesMutex);
11 
12  // Mark cache as invalid
13  _cacheValid = false;
14  guard.unlock();
15 
16  // Send invalidation request
17  data::Message message;
18  message.mutable_ds_request()->set_action(data::discovery::Action::INVALIDATE);
19  GetEndpoint()->Send(&message, nullptr);
20 
21  // Log outgoing event
22  LOG(DBUG) << "An invalidation request was sent globally.";
23 }
24 
26 {
27  std::lock_guard<std::mutex> guard(_discoverablesMutex);
28 
29  // Insert new Discoverable and invalidate cache
30  _discoverables.insert(discoverable);
31  _cacheValid = false;
32 }
33 
35 {
36  std::lock_guard<std::mutex> guard(_discoverablesMutex);
37 
38  // Remove Discoverable and invalidate cache
39  _discoverables.erase(discoverable);
40  _cacheValid = false;
41 }
42 
43 bool Service::ReceiveMessage(const Node* sender, const data::Message* message)
44 {
45  // Sanity checks
46  CHECK(sender != nullptr) << "Sender address missing";
47  CHECK(message != nullptr) << "Message is missing";
48 
49  // Handle message types
50  if (message->content_case() == data::Message::ContentCase::kDsRequest)
51  {
52  switch (message->ds_request().action())
53  {
54  case data::discovery::Action::DISCOVER:
55 
56  // Answer discovery
57  HandleDiscoveryRequest(sender, message);
58 
59  // Mark as handled
60  return true;
61 
62  case data::discovery::Action::INVALIDATE:
63 
64  // Invalidate cached information
65  HandleInvalidationRequest(sender, message);
66 
67  // Mark as handled
68  return true;
69 
70  default:
71 
72  // Unknown action
73  LOG(WARNING) << "A discovery request with the unknown action '" << message->ds_request().action() << "' was received and promptly ignored.";
74  return false;
75  }
76  }
77  else if (message->content_case() == data::Message::ContentCase::kDsResponse)
78  {
79  // Cache
80  CacheDiscoveryResponse(sender, message);
81 
82  // Forward to awaiter
83  return false;
84  }
85  else
86  {
87  return false;
88  }
89 }
90 
91 void Service::HandleDiscoveryRequest(const Node* node, const data::Message* message)
92 {
93  std::unique_lock<std::mutex> guard(_discoverablesMutex);
94 
95  // Sanity checks
96  CHECK(node != nullptr) << "Sender address missing";
97  CHECK(message != nullptr) << "Message is missing";
98 
99  // Make sure the cache is valid
100  if (!_cacheValid)
101  {
102  // Clear existing cache
103  _cachedResponse.Clear();
104 
105  // Query services for information
106  for (auto discoverable : _discoverables)
107  {
108  discoverable->DescribeService(_cachedResponse);
109  }
110 
111  // Mark cache as valid
112  _cacheValid = true;
113  }
114 
115  // Wrap into message and unlock
116  data::Message response;
117  response.mutable_ds_response()->CopyFrom(_cachedResponse);
118  guard.unlock();
119 
120  // Send message
121  response.mutable_header()->set_reply_to(message->header().identifier());
122  GetEndpoint()->Send(&response, node);
123 
124  // Log outgoing response
125  LOG(DBUG) << "A discovery request from node [" << node->GetUUID() << "] was answered.";
126 }
127 
128 void Service::CacheDiscoveryResponse(const Node* node, const data::Message* message)
129 {
130  // Sanity checks
131  CHECK(node != nullptr) << "Sender address missing";
132  CHECK(message != nullptr) << "Message is missing";
133 
134  // Cache response
135  std::unique_lock<std::mutex> remotesGuard(_remotesMutex);
136  _remotes[node] = message->ds_response();
137  remotesGuard.unlock();
138 
139  // Log
140  LOG(DBUG) << "A discovery response from node [" << node->GetUUID() << "] was cached.";
141 
142  // Call observers
143  std::unique_lock<std::mutex> observersGuard(_observersMutex);
144  for (auto observer : _observers)
145  {
146  observer->CachedDiscoveryResponseWasUpdated(node, message->ds_response());
147  }
148 }
149 
150 void Service::HandleInvalidationRequest(const Node* node, const data::Message* message)
151 {
152  // Sanity checks
153  CHECK(node != nullptr) << "Sender address missing";
154 
155  // Erase cached information
156  std::unique_lock<std::mutex> guard(_remotesMutex);
157  _remotes.erase(node);
158  _remotesMutex.unlock();
159 
160  // Log
161  LOG(DBUG) << "An invalidation request from node [" << node->GetUUID() << "] was received.";
162 
163  // If active discovery is enabled, immediately send a discovery request
165  {
166  // Build message
167  data::Message request;
168  request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
169 
170  // Send message
171  GetEndpoint()->Send(&request, node);
172 
173  // Log outgoing request
174  LOG(DBUG) << "An automatic discovery request was sent to [" << node->GetUUID() << "] after the node requested invalidation.";
175  }
176 }
177 
178 void Service::NodeWillLeave(const Node* node) noexcept
179 {
180  // Sanity checks
181  CHECK(node != nullptr) << "Node address missing";
182 
183  // Erase cached information
184  std::lock_guard<std::mutex> guard(_remotesMutex);
185  _remotes.erase(node);
186 
187  // Log
188  LOG(DBUG) << "Discovery information on [" << node->GetUUID() << "] was erased after the node left.";
189 }
190 
191 void Service::NodeDidJoin(const Node* node) noexcept
192 {
193  // Sanity checks
194  CHECK(node != nullptr) << "Node address missing";
195 
196  // If active discovery is enabled, immediately send a discovery request
198  {
199  // Build message
200  data::Message request;
201  request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
202 
203  // Send message
204  GetEndpoint()->Send(&request, node);
205 
206  // Log outgoing request
207  LOG(DBUG) << "An automatic discovery request was sent to [" << node->GetUUID() << "] after the node joined.";
208  }
209 }
210 
212 {
213  // Sanity checks
214  CHECK(node != nullptr) << "Destination address missing";
215  CHECK(endpoint != nullptr) << "Endpoint is missing";
216 
217  // Build message
218  data::Message request;
219  request.mutable_header()->set_reliability(data::Reliability::NACK_REQUESTED);
220  request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
221 
222  // Send and await response
223  endpoint->Tag(&request);
224  DiscoveryAwaiter awaiter(endpoint, request.header().identifier());
225  endpoint->Send(&request, node);
226 
227  // Log outgoing message
228  LOG(DBUG) << "A discovery request was sent to [" << node->GetUUID() << "].";
229 
230  return awaiter;
231 }
232 
234 {
235  // Sanity checks
236  CHECK(node != nullptr) << "Destination address missing";
237 
238  // Try finding the cached value
239  std::unique_lock<std::mutex> guard(_remotesMutex);
240  auto existing = _remotes.find(node);
241  if (existing != _remotes.end())
242  {
243  // Log
244  LOG(DBUG) << "Cached discovery information was returned for [" << node->GetUUID() << "].";
245 
246  // Get it from the cache
247  return DiscoveryAwaiter(existing->second);
248  }
249  else
250  {
251  guard.unlock();
252 
253  // Issue request
254  return Query(GetEndpoint(), node);
255  }
256 }
257 
259 {
260  // Build message
261  data::Message request;
262  request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
263 
264  // Send message
265  GetEndpoint()->Send(&request, nullptr);
266 
267  // Log request
268  LOG(DBUG) << "A global discovery request was sent.";
269 }
270 
271 std::map<const Node*, data::discovery::Response> Service::GetCachedNodeInformation()
272 {
273  std::lock_guard<std::mutex> guard(_remotesMutex);
274 
275  // Return a copy of the map
276  return _remotes;
277 }
virtual void NodeWillLeave(const Node *node) noexceptoverride
Called when a Node signals that it will leave.
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) override
Delivery point of all messages.
void RegisterDiscoverable(Discoverable *discoverable)
Register a new Discoverable service.
std::map< const Node *, data::discovery::Response > _remotes
Cache of remote node discovery responses.
virtual void Tag(data::Message *message)=0
Set the message identifier for a message.
void CacheDiscoveryResponse(const Node *node, const data::Message *message)
Handle incoming discovery requests.
DiscoveryAwaiter CachedQuery(const Node *node)
Send a Discovery query to a remote node, or if the information already exists in the cache...
bool _cacheValid
Tracks whether the current cache is valid.
std::map< const Node *, data::discovery::Response > GetCachedNodeInformation()
Returns a map containing all currently valid node information in the cache.
std::set< Observer * > _observers
List of discovery observers.
An Awaiter that returns discovery data on a remote node.
void UnregisterDiscoverable(Discoverable *discoverable)
Unregister a Discoverable service.
virtual void Send(data::Message *message, const Node *node)=0
Send a message to a specific member of the swarm. Call with node set to nullptr to send a message to ...
void GlobalQuery()
Sends a global Discovery request.
data::discovery::Response _cachedResponse
Cached discovery response.
void Invalidate()
Invalidate the current cached descriptor.
std::mutex _remotesMutex
Mutex to protect remotes list.
std::mutex _observersMutex
Mutex to protect observer list.
Abstract base class for Endpoint implementations.
Definition: Endpoint.h:25
virtual void NodeDidJoin(const Node *node) noexceptoverride
Called when a new Node has joined the group.
bool _performActiveDiscovery
Should we send discovery requests automatically?
Endpoint * GetEndpoint()
Get the associated Endpoint.
Definition: Mailbox.h:144
std::mutex _discoverablesMutex
Mutex to protect discoverable list.
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
void HandleDiscoveryRequest(const Node *node, const data::Message *message)
Handle incoming discovery requests.
Interface for discoverable services.
Definition: Discoverable.h:12
static DiscoveryAwaiter Query(Endpoint *endpoint, const Node *node)
Send a Discovery query to a remote node.
Represents a Node the Endpoint knows about and can send messages to.
void HandleInvalidationRequest(const Node *node, const data::Message *message)
Invalidate information on a previously discovered node.
std::set< Discoverable * > _discoverables
List of discoverable services.


swarmros
Author(s):
autogenerated on Fri Apr 3 2020 03:42:48