3 #include <g3log/g3log.hpp> 12 CHECK(endpoint !=
nullptr) <<
"No endpoint specified";
13 CHECK(node !=
nullptr) <<
"No target node specified";
16 data::Message request;
17 request.mutable_header()->set_reliability(data::Reliability::NACK_REQUESTED);
18 request.mutable_kv_get_request()->set_key(path);
21 LOG(DBUG) <<
"A GET request was sent for the remote path '" << path <<
"' on node [" << node->
GetUUID() <<
"]";
24 endpoint->
Tag(&request);
25 ValueAwaiter awaiter(endpoint, request.header().identifier(), path);
26 endpoint->
Send(&request, node);
33 CHECK(endpoint !=
nullptr) <<
"No endpoint specified";
34 CHECK(node !=
nullptr) <<
"No target node specified";
37 data::Message request;
38 request.mutable_header()->set_reliability(data::Reliability::ACK_REQUESTED);
39 request.mutable_kv_set_request()->set_key(path);
40 request.mutable_kv_set_request()->mutable_value()->CopyFrom(value);
43 LOG(DBUG) <<
"A SET request was sent for the remote path '" << path <<
"' on node [" << node->
GetUUID() <<
"]";
46 endpoint->
Tag(&request);
47 ErrorAwaiter awaiter(endpoint, request.header().identifier());
48 endpoint->
Send(&request, node);
55 CHECK(sender !=
nullptr) <<
"Sender address missing";
56 CHECK(message !=
nullptr) <<
"Message is missing";
59 switch (message->content_case())
61 case data::Message::ContentCase::kKvGetRequest:
64 case data::Message::ContentCase::kKvSetRequest:
74 std::unique_lock<std::mutex> guard(
_mutex);
77 auto target =
_targets.find(message->kv_get_request().key());
82 reply.mutable_kv_get_response()->mutable_value()->CopyFrom(target->second->Get(message->kv_get_request().key()));
86 reply.mutable_header()->set_reply_to(message->header().identifier());
87 reply.mutable_kv_get_response()->set_key(message->kv_get_request().key());
91 LOG(DBUG) <<
"A GET request for the local path '" << message->kv_get_request().key() <<
"' from node [" << sender->
GetUUID() <<
"] was handled";
104 std::lock_guard<std::mutex> guard(
_mutex);
107 auto target =
_targets.find(message->kv_set_request().key());
111 target->second->Set(message->kv_set_request().key(), message->kv_set_request().value());
114 LOG(DBUG) <<
"A SET request for the local path '" << message->kv_set_request().key() <<
"' from node [" << sender->
GetUUID() <<
"] was handled";
127 std::lock_guard<std::mutex> guard(
_mutex);
137 throw Exception(
"Path already registered");
143 std::lock_guard<std::mutex> guard(
_mutex);
151 std::lock_guard<std::mutex> guard(
_mutex);
154 auto& fields = *descriptor.mutable_keyvalue_schema()->mutable_fields();
157 fields[target.first] = target.second->GetFieldDescriptor(target.first);
Abstract base class for registered keys.
void RegisterTarget(const std::string &path, Target *target)
Register a new Target with the specified path.
virtual void Tag(data::Message *message)=0
Set the message identifier for a message.
void UnregisterTarget(const std::string &path)
Unregister a Target.
Exception class thrown by all library classes.
std::map< std::string, Target * > _targets
Key-target map.
bool HandleSetRequest(const Node *sender, const data::Message *message)
Handle remote Set requests.
virtual void Send(data::Message *message, const Node *node)=0
Send a message to a specific member of the swarm. Call with node set to nullptr to send a message to ...
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) override
Delivery point of all messages.
std::mutex _mutex
Mutex to protect target list.
static ErrorAwaiter Set(Endpoint *endpoint, const Node *node, const std::string &path, const data::Variant &value)
Set a remote value.
An Awaiter that checks whether the operation was a success.
Abstract base class for Endpoint implementations.
An Awaiter that returns the requested value.
Endpoint * GetEndpoint()
Get the associated Endpoint.
bool HandleGetRequest(const Node *sender, const data::Message *message)
Handle remote Get requests.
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
static ValueAwaiter Get(Endpoint *endpoint, const Node *node, const std::string &path)
Get a remote value.
virtual void DescribeService(data::discovery::Response &descriptor) override
Add descriptors for the service to the discovery descriptor.
Represents a Node the Endpoint knows about and can send messages to.