2 #include <g3log/g3log.hpp> 18 message.mutable_ds_request()->set_action(data::discovery::Action::INVALIDATE);
22 LOG(DBUG) <<
"An invalidation request was sent globally.";
46 CHECK(sender !=
nullptr) <<
"Sender address missing";
47 CHECK(message !=
nullptr) <<
"Message is missing";
50 if (message->content_case() == data::Message::ContentCase::kDsRequest)
52 switch (message->ds_request().action())
54 case data::discovery::Action::DISCOVER:
62 case data::discovery::Action::INVALIDATE:
73 LOG(WARNING) <<
"A discovery request with the unknown action '" << message->ds_request().action() <<
"' was received and promptly ignored.";
77 else if (message->content_case() == data::Message::ContentCase::kDsResponse)
96 CHECK(node !=
nullptr) <<
"Sender address missing";
97 CHECK(message !=
nullptr) <<
"Message is missing";
116 data::Message response;
121 response.mutable_header()->set_reply_to(message->header().identifier());
125 LOG(DBUG) <<
"A discovery request from node [" << node->
GetUUID() <<
"] was answered.";
131 CHECK(node !=
nullptr) <<
"Sender address missing";
132 CHECK(message !=
nullptr) <<
"Message is missing";
136 _remotes[node] = message->ds_response();
137 remotesGuard.unlock();
140 LOG(DBUG) <<
"A discovery response from node [" << node->
GetUUID() <<
"] was cached.";
146 observer->CachedDiscoveryResponseWasUpdated(node, message->ds_response());
153 CHECK(node !=
nullptr) <<
"Sender address missing";
161 LOG(DBUG) <<
"An invalidation request from node [" << node->
GetUUID() <<
"] was received.";
167 data::Message request;
168 request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
174 LOG(DBUG) <<
"An automatic discovery request was sent to [" << node->
GetUUID() <<
"] after the node requested invalidation.";
181 CHECK(node !=
nullptr) <<
"Node address missing";
188 LOG(DBUG) <<
"Discovery information on [" << node->GetUUID() <<
"] was erased after the node left.";
194 CHECK(node !=
nullptr) <<
"Node address missing";
200 data::Message request;
201 request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
207 LOG(DBUG) <<
"An automatic discovery request was sent to [" << node->GetUUID() <<
"] after the node joined.";
214 CHECK(node !=
nullptr) <<
"Destination address missing";
215 CHECK(endpoint !=
nullptr) <<
"Endpoint is missing";
218 data::Message request;
219 request.mutable_header()->set_reliability(data::Reliability::NACK_REQUESTED);
220 request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
223 endpoint->
Tag(&request);
225 endpoint->
Send(&request, node);
228 LOG(DBUG) <<
"A discovery request was sent to [" << node->
GetUUID() <<
"].";
236 CHECK(node !=
nullptr) <<
"Destination address missing";
240 auto existing =
_remotes.find(node);
244 LOG(DBUG) <<
"Cached discovery information was returned for [" << node->
GetUUID() <<
"].";
261 data::Message request;
262 request.mutable_ds_request()->set_action(data::discovery::Action::DISCOVER);
268 LOG(DBUG) <<
"A global discovery request was sent.";
virtual void NodeWillLeave(const Node *node) noexcept override
Called when a Node signals that it will leave.
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) override
Delivery point of all messages.
void RegisterDiscoverable(Discoverable *discoverable)
Register a new Discoverable service.
std::map< const Node *, data::discovery::Response > _remotes
Cache of remote node discovery responses.
virtual void Tag(data::Message *message)=0
Set the message identifier for a message.
void CacheDiscoveryResponse(const Node *node, const data::Message *message)
Cache an incoming discovery response from a remote node and notify the registered observers.
DiscoveryAwaiter CachedQuery(const Node *node)
Send a Discovery query to a remote node, or, if the information already exists in the cache, return the cached information instead.
bool _cacheValid
Tracks whether the current cache is valid.
std::map< const Node *, data::discovery::Response > GetCachedNodeInformation()
Returns a map containing all currently valid node information in the cache.
std::set< Observer * > _observers
List of discovery observers.
An Awaiter that returns discovery data on a remote node.
void UnregisterDiscoverable(Discoverable *discoverable)
Unregister a Discoverable service.
virtual void Send(data::Message *message, const Node *node)=0
Send a message to a specific member of the swarm. Call with node set to nullptr to send a message to ...
void GlobalQuery()
Sends a global Discovery request.
data::discovery::Response _cachedResponse
Cached discovery response.
void Invalidate()
Invalidate the current cached descriptor.
std::mutex _remotesMutex
Mutex to protect remotes list.
std::mutex _observersMutex
Mutex to protect observer list.
Abstract base class for Endpoint implementations.
virtual void NodeDidJoin(const Node *node) noexcept override
Called when a new Node has joined the group.
bool _performActiveDiscovery
Should we send discovery requests automatically?
Endpoint * GetEndpoint()
Get the associated Endpoint.
std::mutex _discoverablesMutex
Mutex to protect discoverable list.
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
void HandleDiscoveryRequest(const Node *node, const data::Message *message)
Handle incoming discovery requests.
Interface for discoverable services.
static DiscoveryAwaiter Query(Endpoint *endpoint, const Node *node)
Send a Discovery query to a remote node.
Represents a Node the Endpoint knows about and can send messages to.
void HandleInvalidationRequest(const Node *node, const data::Message *message)
Invalidate information on a previously discovered node.
std::set< Discoverable * > _discoverables
List of discoverable services.