3 #include <g3log/g3log.hpp> 13 std::lock_guard<std::recursive_mutex> guard(
_mutex);
18 throw Exception(
"Endpoint is already running");
27 mailbox->MailboxWasConnected();
34 std::lock_guard<std::recursive_mutex> guard(
_mutex);
42 mailbox->MailboxWillBeDisconnected();
50 throw Exception(
"Endpoint has not been started");
56 std::lock_guard<std::recursive_mutex> guard(
_mutex);
70 std::lock_guard<std::recursive_mutex> guard(
_mutex);
84 std::lock_guard<std::recursive_mutex> guard(
_mutex);
93 std::lock_guard<std::recursive_mutex> guard(
_mutex);
98 mailbox->NodeWasDiscovered(node);
104 std::lock_guard<std::recursive_mutex> guard(
_mutex);
109 mailbox->NodeDidJoin(node);
115 std::lock_guard<std::recursive_mutex> guard(
_mutex);
120 mailbox->NodeWillLeave(node);
127 if (message.ParseFromArray(data, (
int)size))
133 LOG(WARNING) <<
"Message received from node [" << sender->GetUUID() <<
"] cannot be parsed";
144 reply.mutable_header()->set_reply_to(message->header().identifier());
145 reply.set_error(error);
146 Send(&reply, sender);
151 LOG(WARNING) <<
"An error has occurred while trying to reply to the message received from node [" << sender->
GetUUID() <<
"]: " << e.
what();
158 if (
message->header().identifier() == 0)
164 std::unique_lock<std::recursive_mutex> guard(
_mutex);
169 if (mailbox->ReceiveMessage(sender,
message))
175 if (
message->header().reliability() == data::Reliability::ACK_REQUESTED)
190 LOG(WARNING) <<
"An error has occurred while processing the message received from node [" << sender->GetUUID() <<
"]: " << e.
what();
193 if (
message->header().reliability() == data::Reliability::ACK_REQUESTED ||
194 message->header().reliability() == data::Reliability::NACK_REQUESTED)
208 if (
message->header().reliability() == data::Reliability::ACK_REQUESTED ||
209 message->header().reliability() == data::Reliability::NACK_REQUESTED)
221 message->mutable_header()->set_identifier(
_counter++);
224 auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
225 message->mutable_header()->set_timestamp(timestamp.count());
231 if (!message->has_header() || message->header().identifier() == 0)
237 std::vector<char> buffer(message->ByteSizeLong());
238 if (message->SerializeToArray(buffer.data(), (int)buffer.size()))
240 Send((
const void*)buffer.data(), buffer.size(), node);
244 throw Exception(
"Cannot serialize message");
std::set< Mailbox * > _mailboxes
Container for registered mailboxes.
virtual void NodeWillLeave(const Node *node) noexcept
Called when a Node signals that it will leave.
Exception class thrown by all library classes.
virtual void NodeWasDiscovered(const Node *node) noexcept
Called when a new Node has been discovered.
virtual void RegisterMailbox(Mailbox *mailbox) override
Register a Mailbox to receive messages.
std::recursive_mutex _mutex
Mutex used to synchronize access to the list of mailboxes.
virtual void MailboxWillBeDisconnected() noexcept
Called right before the mailbox is disconnected from its endpoint or if the attached endpoint is abou...
virtual void Stop() override
Send a termination signal and wait until the endpoint has finished processing messages.
virtual void MailboxWasConnected() noexcept
Called when the mailbox is attached to an already running endpoint or if the attached endpoint has ju...
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) noexcept
Called by implementations to deliver decoded messages.
std::atomic< uint64_t > _counter
Atomic counter for message identifiers.
virtual void Send(const void *data, size_t size, const Node *node)=0
Called by this class to send serialized messages. Called with node set to nullptr to send a message t...
virtual const std::string & GetUUID() const =0
Returns the unique identifier of the node.
virtual void Start() override
Start a background thread and begin processing messages on this endpoint.
virtual void ReplaceMailbox(Mailbox *oldMailbox, Mailbox *newMailbox) override
Relocate a mailbox to another in-memory location.
const char * what() const noexcept override
Get the error message.
bool _isRunning
True if the Endpoint has been started.
void ReplyWithError(const Node *sender, const data::Message *message, data::Error error)
Send a reply to a message that only contains a status code.
virtual void NodeDidJoin(const Node *node) noexcept
Called when a new Node has joined the group.
Represents a Node the Endpoint knows about and can send messages to.
virtual void UnregisterMailbox(Mailbox *mailbox) override
Unregister a Mailbox from receiving messages.
Abstract base class for Mailbox implementations.
virtual void Tag(data::Message *message) override
Set the message identifier for a message.