keyvalue/Service.cpp
Go to the documentation of this file.
2 #include <swarmio/Exception.h>
3 #include <g3log/g3log.hpp>
4 
5 using namespace swarmio;
6 using namespace swarmio::services;
7 using namespace swarmio::services::keyvalue;
8 
9 ValueAwaiter Service::Get(Endpoint* endpoint, const Node* node, const std::string& path)
10 {
11  // Sanity checks
12  CHECK(endpoint != nullptr) << "No endpoint specified";
13  CHECK(node != nullptr) << "No target node specified";
14 
15  // Build message
16  data::Message request;
17  request.mutable_header()->set_reliability(data::Reliability::NACK_REQUESTED);
18  request.mutable_kv_get_request()->set_key(path);
19 
20  // Log outgoing request
21  LOG(DBUG) << "A GET request was sent for the remote path '" << path << "' on node [" << node->GetUUID() << "]";
22 
23  // Send and await response
24  endpoint->Tag(&request);
25  ValueAwaiter awaiter(endpoint, request.header().identifier(), path);
26  endpoint->Send(&request, node);
27  return awaiter;
28 }
29 
30 ErrorAwaiter Service::Set(Endpoint* endpoint, const Node* node, const std::string& path, const data::Variant& value)
31 {
32  // Sanity checks
33  CHECK(endpoint != nullptr) << "No endpoint specified";
34  CHECK(node != nullptr) << "No target node specified";
35 
36  // Build message
37  data::Message request;
38  request.mutable_header()->set_reliability(data::Reliability::ACK_REQUESTED);
39  request.mutable_kv_set_request()->set_key(path);
40  request.mutable_kv_set_request()->mutable_value()->CopyFrom(value);
41 
42  // Log outgoing request
43  LOG(DBUG) << "A SET request was sent for the remote path '" << path << "' on node [" << node->GetUUID() << "]";
44 
45  // Send and await response
46  endpoint->Tag(&request);
47  ErrorAwaiter awaiter(endpoint, request.header().identifier());
48  endpoint->Send(&request, node);
49  return awaiter;
50 }
51 
52 bool Service::ReceiveMessage(const Node* sender, const data::Message* message)
53 {
54  // Sanity checks
55  CHECK(sender != nullptr) << "Sender address missing";
56  CHECK(message != nullptr) << "Message is missing";
57 
58  // Forward to handler for message type
59  switch (message->content_case())
60  {
61  case data::Message::ContentCase::kKvGetRequest:
62  return HandleGetRequest(sender, message);
63 
64  case data::Message::ContentCase::kKvSetRequest:
65  return HandleSetRequest(sender, message);
66 
67  default:
68  return false;
69  }
70 }
71 
72 bool Service::HandleGetRequest(const Node* sender, const data::Message* message)
73 {
74  std::unique_lock<std::mutex> guard(_mutex);
75 
76  // Check if target exists
77  auto target = _targets.find(message->kv_get_request().key());
78  if (target != _targets.end())
79  {
80  // Fetch value and unlock
81  data::Message reply;
82  reply.mutable_kv_get_response()->mutable_value()->CopyFrom(target->second->Get(message->kv_get_request().key()));
83  guard.unlock();
84 
85  // Build the rest of the response and send message
86  reply.mutable_header()->set_reply_to(message->header().identifier());
87  reply.mutable_kv_get_response()->set_key(message->kv_get_request().key());
88  GetEndpoint()->Send(&reply, sender);
89 
90  // Log incoming request
91  LOG(DBUG) << "A GET request for the local path '" << message->kv_get_request().key() << "' from node [" << sender->GetUUID() << "] was handled";
92 
93  // Mark as handled
94  return true;
95  }
96  else
97  {
98  return false;
99  }
100 }
101 
102 bool Service::HandleSetRequest(const Node* sender, const data::Message* message)
103 {
104  std::lock_guard<std::mutex> guard(_mutex);
105 
106  // Check if target exists
107  auto target = _targets.find(message->kv_set_request().key());
108  if (target != _targets.end())
109  {
110  // Set value
111  target->second->Set(message->kv_set_request().key(), message->kv_set_request().value());
112 
113  // Log incoming request
114  LOG(DBUG) << "A SET request for the local path '" << message->kv_set_request().key() << "' from node [" << sender->GetUUID() << "] was handled";
115 
116  // Mark as handled
117  return true;
118  }
119  else
120  {
121  return false;
122  }
123 }
124 
125 void Service::RegisterTarget(const std::string& path, Target* target)
126 {
127  std::lock_guard<std::mutex> guard(_mutex);
128 
129  // Check if path is already registered
130  if (_targets.find(path) == _targets.end())
131  {
132  // Add to map
133  _targets[path] = target;
134  }
135  else
136  {
137  throw Exception("Path already registered");
138  }
139 }
140 
141 void Service::UnregisterTarget(const std::string& path)
142 {
143  std::lock_guard<std::mutex> guard(_mutex);
144 
145  // Remove path
146  _targets.erase(path);
147 }
148 
149 void Service::DescribeService(data::discovery::Response& descriptor)
150 {
151  std::lock_guard<std::mutex> guard(_mutex);
152 
153  // Collect information from targets
154  auto& fields = *descriptor.mutable_keyvalue_schema()->mutable_fields();
155  for (auto target : _targets)
156  {
157  fields[target.first] = target.second->GetFieldDescriptor(target.first);
158  }
159 }
Abstract base class for registered keys.
Definition: Target.h:12
void RegisterTarget(const std::string &path, Target *target)
Register a new Target with the specified path.
virtual void Tag(data::Message *message)=0
Set the message identifier for a message.
void UnregisterTarget(const std::string &path)
Unregister a Target.
Exception class thrown by all library classes.
std::map< std::string, Target * > _targets
Key-target map.
bool HandleSetRequest(const Node *sender, const data::Message *message)
Handle remote Set requests.
virtual void Send(data::Message *message, const Node *node)=0
Send a message to a specific member of the swarm. Call with node set to nullptr to send a message to ...
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) override
Delivery point of all messages.
std::mutex _mutex
Mutex to protect target list.
static ErrorAwaiter Set(Endpoint *endpoint, const Node *node, const std::string &path, const data::Variant &value)
Set a remote value.
An Awaiter that checks whether the operation was a success.
Definition: ErrorAwaiter.h:11
Abstract base class for Endpoint implementations.
Definition: Endpoint.h:25
An Awaiter that returns the requested value.
Definition: ValueAwaiter.h:12
Endpoint * GetEndpoint()
Get the associated Endpoint.
Definition: Mailbox.h:144
bool HandleGetRequest(const Node *sender, const data::Message *message)
Handle remote Get requests.
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
static ValueAwaiter Get(Endpoint *endpoint, const Node *node, const std::string &path)
Get a remote value.
virtual void DescribeService(data::discovery::Response &descriptor) override
Add descriptors for the service to the discovery descriptor.
Represents a Node the Endpoint knows about and can send messages to.


swarmros
Author(s):
autogenerated on Fri Apr 3 2020 03:42:48