BasicEndpoint.cpp
Go to the documentation of this file.
#include <swarmio/Exception.h>
#include <g3log/g3log.hpp>
#include <chrono>
#include <limits>
#include <memory>
#include <sodium.h>
7 
8 using namespace swarmio;
9 using namespace swarmio::transport;
10 
12 {
13  std::lock_guard<std::recursive_mutex> guard(_mutex);
14 
15  // Check if we are already running
16  if (_isRunning)
17  {
18  throw Exception("Endpoint is already running");
19  }
20  else
21  {
22  // Mark as running
23  _isRunning = true;
24  // Fire callbacks
25  for (auto mailbox : _mailboxes)
26  {
27  mailbox->MailboxWasConnected();
28  }
29  }
30 }
31 
33 {
34  std::lock_guard<std::recursive_mutex> guard(_mutex);
35 
36  // Check if we are already running
37  if (_isRunning)
38  {
39  // Fire callbacks
40  for (auto mailbox : _mailboxes)
41  {
42  mailbox->MailboxWillBeDisconnected();
43  }
44 
45  // Mark as stopped
46  _isRunning = false;
47  }
48  else
49  {
50  throw Exception("Endpoint has not been started");
51  }
52 }
53 
55 {
56  std::lock_guard<std::recursive_mutex> guard(_mutex);
57 
58  // Add mailbox
59  _mailboxes.insert(mailbox);
60 
61  // Fire callback
62  if (_isRunning)
63  {
64  mailbox->MailboxWasConnected();
65  }
66 }
67 
69 {
70  std::lock_guard<std::recursive_mutex> guard(_mutex);
71 
72  // Fire callback
73  if (_isRunning)
74  {
75  mailbox->MailboxWillBeDisconnected();
76  }
77 
78  // Remove mailbox
79  _mailboxes.erase(mailbox);
80 }
81 
82 void BasicEndpoint::ReplaceMailbox(Mailbox *oldMailbox, Mailbox *newMailbox)
83 {
84  std::lock_guard<std::recursive_mutex> guard(_mutex);
85 
86  // Remove the old one, add the new one
87  _mailboxes.erase(oldMailbox);
88  _mailboxes.insert(newMailbox);
89 }
90 
91 void BasicEndpoint::NodeWasDiscovered(const Node *node) noexcept
92 {
93  std::lock_guard<std::recursive_mutex> guard(_mutex);
94 
95  // Fire callbacks
96  for (auto mailbox : _mailboxes)
97  {
98  mailbox->NodeWasDiscovered(node);
99  }
100 }
101 
102 void BasicEndpoint::NodeDidJoin(const Node *node) noexcept
103 {
104  std::lock_guard<std::recursive_mutex> guard(_mutex);
105 
106  // Fire callbacks
107  for (auto mailbox : _mailboxes)
108  {
109  mailbox->NodeDidJoin(node);
110  }
111 }
112 
113 void BasicEndpoint::NodeWillLeave(const Node *node) noexcept
114 {
115  std::lock_guard<std::recursive_mutex> guard(_mutex);
116 
117  // Fire callbacks
118  for (auto mailbox : _mailboxes)
119  {
120  mailbox->NodeWillLeave(node);
121  }
122 }
123 
124 bool BasicEndpoint::ReceiveMessage(const Node *sender, const void *data, size_t size) noexcept
125 {
126  data::Message message;
127  if (message.ParseFromArray(data, (int)size))
128  {
129  return ReceiveMessage(sender, &message);
130  }
131  else
132  {
133  LOG(WARNING) << "Message received from node [" << sender->GetUUID() << "] cannot be parsed";
134  return false;
135  }
136 }
137 
138 void BasicEndpoint::ReplyWithError(const Node *sender, const data::Message *message, data::Error error)
139 {
140  try
141  {
142  // Send reply
143  data::Message reply;
144  reply.mutable_header()->set_reply_to(message->header().identifier());
145  reply.set_error(error);
146  Send(&reply, sender);
147  }
148  catch (const Exception &e)
149  {
150  // Log and ignore error
151  LOG(WARNING) << "An error has occurred while trying to reply to the message received from node [" << sender->GetUUID() << "]: " << e.what();
152  }
153 }
154 
155 bool BasicEndpoint::ReceiveMessage(const Node *sender, const data::Message *message) noexcept
156 {
157  // Drop malformed messages without a valid identifier
158  if (message->header().identifier() == 0)
159  {
160  return false;
161  }
162 
163  // Find mailbox to handle message
164  std::unique_lock<std::recursive_mutex> guard(_mutex);
165  for (auto mailbox : _mailboxes)
166  {
167  try
168  {
169  if (mailbox->ReceiveMessage(sender, message))
170  {
171  // Unlock
172  guard.unlock();
173 
174  // Send ACK
175  if (message->header().reliability() == data::Reliability::ACK_REQUESTED)
176  {
177  ReplyWithError(sender, message, data::Error::NONE);
178  }
179 
180  // Mark as processed
181  return true;
182  }
183  }
184  catch (const Exception &e)
185  {
186  // Unlock
187  guard.unlock();
188 
189  // Log error
190  LOG(WARNING) << "An error has occurred while processing the message received from node [" << sender->GetUUID() << "]: " << e.what();
191 
192  // Send error
193  if (message->header().reliability() == data::Reliability::ACK_REQUESTED ||
194  message->header().reliability() == data::Reliability::NACK_REQUESTED)
195  {
197  }
198 
199  // Stop processing
200  return false;
201  }
202  }
203 
204  // Unlock
205  guard.unlock();
206 
207  // Send error
208  if (message->header().reliability() == data::Reliability::ACK_REQUESTED ||
209  message->header().reliability() == data::Reliability::NACK_REQUESTED)
210  {
211  ReplyWithError(sender, message, data::Error::DELIVERY);
212  }
213 
214  // Mark as unprocessed
215  return false;
216 }
217 
218 void BasicEndpoint::Tag(data::Message *message)
219 {
220  // Set identifier
221  message->mutable_header()->set_identifier(_counter++);
222 
223  // Set timestamp
224  auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
225  message->mutable_header()->set_timestamp(timestamp.count());
226 }
227 
228 void BasicEndpoint::Send(data::Message *message, const Node *node)
229 {
230  // Set identifier if missing
231  if (!message->has_header() || message->header().identifier() == 0)
232  {
233  Tag(message);
234  }
235 
236  // Serialize
237  std::vector<char> buffer(message->ByteSizeLong());
238  if (message->SerializeToArray(buffer.data(), (int)buffer.size()))
239  {
240  Send((const void*)buffer.data(), buffer.size(), node);
241  }
242  else
243  {
244  throw Exception("Cannot serialize message");
245  }
246 }
std::set< Mailbox * > _mailboxes
Container for registered mailboxes.
Definition: BasicEndpoint.h:28
virtual void NodeWillLeave(const Node *node) noexcept
Called when a Node signals that it will leave.
Exception class thrown by all library classes.
virtual void NodeWasDiscovered(const Node *node) noexcept
Called when a new Node has been discovered.
virtual void RegisterMailbox(Mailbox *mailbox) override
Register a Mailbox to receive messages.
std::recursive_mutex _mutex
Mutex used to synchronize access to the list of mailboxes.
Definition: BasicEndpoint.h:34
virtual void MailboxWillBeDisconnected() noexcept
Called right before the mailbox is disconnected from its endpoint or if the attached endpoint is abou...
Definition: Mailbox.h:137
virtual void Stop() override
Send a termination signal and wait until the endpoint finished processing messages.
virtual void MailboxWasConnected() noexcept
Called when the mailbox is attached to an already running endpoint or if the attached endpoint has ju...
Definition: Mailbox.h:130
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) noexcept
Called by implementations to deliver decoded messages.
std::atomic< uint64_t > _counter
Atomic counter for message identifiers.
Definition: BasicEndpoint.h:40
virtual void Send(const void *data, size_t size, const Node *node)=0
Called by this class to send serialized messages. Called with node set to nullptr to send a message t...
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
virtual void Start() override
Start a background thread and begin processing messages on this endpoint.
virtual void ReplaceMailbox(Mailbox *oldMailbox, Mailbox *newMailbox) override
Relocate a mailbox to another in-memory location.
const char * what() const noexcept override
Get the error message.
bool _isRunning
True if the Endpoint has been started.
Definition: BasicEndpoint.h:23
void ReplyWithError(const Node *sender, const data::Message *message, data::Error error)
Send a reply to a message that only contains a status code.
virtual void NodeDidJoin(const Node *node) noexcept
Called when a new Node has joined the group.
Represents a Node the Endpoint knows about and can send messages to.
virtual void UnregisterMailbox(Mailbox *mailbox) override
Unregister a Mailbox from receiving messages.
Abstract base class for Mailbox implementations.
Definition: Mailbox.h:13
virtual void Tag(data::Message *message) override
Set the message identifier for a message.


swarmros
Author(s):
autogenerated on Fri Apr 3 2020 03:42:47