9 #include <libconfig.h++> 15 #define SWARMIO_ZYRE_GROUP_MESSAGES "messages" 16 #define SWARMIO_ZYRE_HEADER_DEVICE_CLASS "X-DeviceClass" 22 _zyre = zyre_new(name);
29 if (deviceClass !=
nullptr)
45 std::map<std::string, std::string> map;
48 ziflist_t *ifaces = ziflist_new();
49 if (ifaces !=
nullptr)
52 for (
const char *current = ziflist_first(ifaces); current !=
nullptr; current = ziflist_next(ifaces))
54 map[current] = ziflist_address(ifaces);
58 ziflist_destroy(&ifaces);
69 throw Exception(
"Node is already running");
73 zyre_set_port(
_zyre, port);
81 throw Exception(
"Node is already running");
85 zyre_set_interface(
_zyre, ifname);
98 throw Exception(
"Node is already running");
100 #ifdef SWARMROS_CONFIG_PATH 105 libconfig::Config config;
106 std::unique_ptr<swarmio::Endpoint> endpoint;
109 config.readFile(configFilePath.c_str());
110 std::string type = (
const char *)config.lookup(
"endpoint.type");
114 auto zyreEndpoint = std::make_unique<swarmio::transport::zyre::ZyreEndpoint>(config.lookup(
"endpoint.name"), config.lookup(
"endpoint.deviceClass"));
117 if (config.exists(
"endpoint.parameters.port"))
119 unsigned port = config.lookup(
"endpoint.parameters.port");
120 if (port <= UINT16_MAX)
122 zyreEndpoint->SetPort((uint16_t)port);
126 LOG(FATAL) <<
"Port is out of range.";
129 if (config.exists(
"endpoint.parameters.ifname"))
132 const char *name = config.lookup(
"endpoint.parameters.ifname");
136 if (map.find(name) != map.end())
138 zyreEndpoint->SetInterface(name);
142 LOG(DBUG) <<
"Available interfaces:";
143 for (
auto &pair : map)
145 LOG(DBUG) <<
" - " << pair.first <<
" (" << pair.second <<
")";
147 LOG(FATAL) <<
"An invalid interface name has been specified.";
150 if (config.exists(
"endpoint.parameters.security"))
154 if (config.exists(
"endpoint.parameters.privateKey"))
156 std::string decoded =
base64_decode(config.lookup(
"endpoint.parameters.privateKey"));
157 memcpy(
my_secretkey, reinterpret_cast<unsigned char *>(const_cast<char *>((decoded.c_str()))), crypto_box_SECRETKEYBYTES);
160 LOG(WARNING) <<
"Unable to convert private key";
164 if (config.exists(
"endpoint.parameters.publicKey"))
166 std::string decoded =
base64_decode(config.lookup(
"endpoint.parameters.publicKey"));
167 memcpy(
my_publickey, reinterpret_cast<unsigned char *>(const_cast<char *>(decoded.c_str())), crypto_box_PUBLICKEYBYTES);
173 if (config.exists(
"endpoint.parameters.signature"))
175 std::string decoded =
base64_decode(config.lookup(
"endpoint.parameters.signature"));
176 memcpy(
signature, reinterpret_cast<unsigned char *>(const_cast<char *>(decoded.c_str())), crypto_sign_SECRETKEYBYTES);
182 if (config.exists(
"endpoint.parameters.ca"))
184 std::string decoded =
base64_decode(config.lookup(
"endpoint.parameters.ca"));
185 memcpy(
server_public, reinterpret_cast<unsigned char *>(const_cast<char *>(decoded.c_str())), crypto_sign_PUBLICKEYBYTES);
195 catch (
const libconfig::SettingTypeException &e)
197 LOG(FATAL) <<
"Invalid type for setting at " << e.getPath() <<
".";
199 catch (
const libconfig::SettingNotFoundException &e)
201 LOG(FATAL) <<
"Missing setting at " << e.getPath() <<
".";
203 catch (
const libconfig::FileIOException &e)
205 LOG(DBUG) << e.what();
206 LOG(DBUG) <<
"An exception has occurred while trying to read the configuration file.";
208 catch (
const libconfig::ParseException &e)
210 LOG(DBUG) << e.getError();
211 LOG(FATAL) <<
"An exception has occurred while trying to parse the configuration file (line " << e.getLine() <<
").";
215 LOG(DBUG) << e.
what();
216 LOG(FATAL) <<
"An exception has occurred while trying to initialize the endpoint.";
222 LOG(WARNING) <<
"Configuring secure mode.";
223 if (sodium_init() < 0)
225 throw Exception(
"Unable to init sodium");
234 std::piecewise_construct,
235 std::forward_as_tuple(
"0"),
236 std::forward_as_tuple(
"0",
"broadcast",
"broadcast",
"0"));
237 unsigned char k[crypto_box_BEFORENMBYTES];
239 throw Exception(
"Cannot create broadcast keys");
243 unsigned char n[crypto_box_NONCEBYTES];
244 randombytes_buf(n, crypto_box_NONCEBYTES);
246 _nodes.at(
"0").SetVerified();
251 throw Exception(
"Unable to verify own certificate");
259 throw Exception(
"Unable to convert public key");
265 if (zyre_start(
_zyre) != 0)
292 zsock_send(client,
"p",
"SHUTDOWN");
295 if (zsock_wait(client) != 0)
310 if (node !=
nullptr && zyreNode ==
nullptr)
312 throw Exception(
"Invalid target node type");
319 size_t c_len = crypto_box_MACBYTES + (size_t)size;
320 unsigned char *ciphertext =
new unsigned char[c_len + crypto_box_NONCEBYTES];
326 if (crypto_box_easy_afternm(ciphertext, (
const unsigned char *)data, (
size_t)size, zyreNode->
GetCtr(), zyreNode->
GetKey()) != 0)
328 throw Exception(
"Could not encrypt message");
330 memcpy(&ciphertext[c_len], zyreNode->
GetCtr(), crypto_box_NONCEBYTES);
331 zmsg_addmem(message, ciphertext, c_len + crypto_box_NONCEBYTES);
340 zmsg_addmem(message, data, (
size_t)size);
347 if (zsock_send(client,
"ppp",
"SEND", zyreNode, message) != 0)
349 throw Exception(
"Cannot send control message");
351 if (zsock_wait(client) != 0)
368 zyre_destroy(&
_zyre);
375 moodycamel::BlockingReaderWriterQueue<zyre_event_t *> deliveryQueue;
382 zsock_t *zyreSocket = zyre_socket(
_zyre);
384 zpoller_t *poller = zpoller_new(controlSocket, zyreSocket,
nullptr);
390 void *
result = zpoller_wait(poller, -1);
393 if (result == zyreSocket)
396 zyre_event_t *
event = zyre_event_new(
_zyre);
400 deliveryQueue.enqueue(event);
403 else if (result == controlSocket)
409 if (zsock_recv(
_control,
"ppp", &command, &target, &message) == 0)
411 if (strcmp(command,
"SHUTDOWN") == 0)
416 else if (strcmp(command,
"SEND") == 0 && message !=
nullptr)
419 if (target ==
nullptr)
431 else if (command == NULL)
441 deliveryQueue.enqueue(
nullptr);
442 deliveryThread.join();
456 zyre_event_t *
event =
nullptr;
460 queue->wait_dequeue(event);
463 if (event ==
nullptr)
466 std::shared_lock<std::shared_timed_mutex> guard(
_mutex);
467 for (
auto &element :
_nodes)
469 if (element.second.IsOnline())
471 element.second.SetOnline(
false);
481 const char *uuid = zyre_event_peer_uuid(event);
485 const char *type = zyre_event_type(event);
486 if (strcmp(type,
"ENTER") == 0)
489 std::unique_lock<std::shared_timed_mutex> guard(
_mutex);
497 if (deviceClass ==
nullptr)
499 deviceClass =
"unknown";
506 std::piecewise_construct,
507 std::forward_as_tuple(uuid),
508 std::forward_as_tuple(uuid, zyre_event_peer_name(event), deviceClass, zyre_event_peer_addr(event)));
523 std::shared_lock<std::shared_timed_mutex> guard(
_mutex);
535 if (!
result->second.IsOnline())
537 result->second.SetOnline(
true);
542 LOG(WARNING) <<
"Sending certificate to join";
551 if (
result->second.IsOnline())
553 result->second.SetOnline(
false);
557 else if (strcmp(type,
"EXIT") == 0)
560 if (
result->second.IsOnline())
562 result->second.SetOnline(
false);
566 else if (strcmp(type,
"SHOUT") == 0 ||
567 strcmp(type,
"WHISPER") == 0)
570 zmsg_t *raw = zyre_event_msg(event);
574 while (zmsg_size(raw) > 0)
577 zframe_t *frame = zmsg_pop(raw);
581 if (crypto_sign_verify_detached(&zframe_data(frame)[crypto_sign_PUBLICKEYBYTES], zframe_data(frame), crypto_sign_PUBLICKEYBYTES,
server_public) != 0)
583 LOG(WARNING) <<
"Unable to verify joining node certificate";
588 unsigned char k[crypto_box_BEFORENMBYTES];
589 unsigned char p2p_publickey[crypto_box_PUBLICKEYBYTES];
590 if (crypto_sign_ed25519_pk_to_curve25519(p2p_publickey, zframe_data(frame)) != 0)
592 LOG(WARNING) <<
"Unable to convert public key";
594 if (crypto_box_beforenm(k, p2p_publickey,
my_secretkey) != 0)
596 LOG(WARNING) <<
"Cannot create shared p2p keys";
602 LOG(WARNING) <<
"Adding joining node certificate";
605 LOG(WARNING) <<
"Sending back own certificate";
615 size_t d_size = zframe_size(frame) - crypto_box_MACBYTES - crypto_box_NONCEBYTES;
616 unsigned char *decrypted =
new unsigned char[d_size];
617 unsigned char *nonce = &zframe_data(frame)[zframe_size(frame) - crypto_box_NONCEBYTES];
620 if (memcmp(nonce, zyreNode->
GetCtr(), crypto_box_NONCEBYTES) < 0)
622 LOG(WARNING) <<
"Nonce was smaller than expected";
627 if (strcmp(type,
"SHOUT") == 0)
631 LOG(WARNING) <<
"Bcast Message cannot be decrypted";
635 LOG(WARNING) <<
"Bcast Message successfully decrypted";
642 if (crypto_box_open_easy_afternm(decrypted, zframe_data(frame), d_size + crypto_box_MACBYTES, nonce, zyreNode->
GetKey()) != 0)
644 LOG(WARNING) <<
"Message cannot be decrypted";
648 LOG(WARNING) <<
"Message successfully decrypted";
661 zframe_destroy(&frame);
670 zyre_event_destroy(&event);
677 std::shared_lock<std::shared_timed_mutex> guard(
_mutex);
680 std::list<const ZyreNode *> nodes;
681 transform(
_nodes.begin(),
_nodes.end(), back_inserter(nodes), [](
const std::map<std::string, ZyreNode>::value_type &value) {
return &value.second; });
690 std::shared_lock<std::shared_timed_mutex> guard(
_mutex);
693 auto it =
_nodes.find(uuid);
void SetPort(uint16_t port)
Set the port used by the endpoint.
unsigned char certificate[crypto_box_PUBLICKEYBYTES+crypto_sign_SECRETKEYBYTES]
#define SWARMIO_ZYRE_GROUP_MESSAGES
A Node as discovered by the Zyre protocol.
virtual void Stop() override
Send a termination signal and wait until the endpoint finished processing messages.
virtual void Send(const void *data, size_t size, const Node *node) override
Called by BasicEndpoint to send serialized messages. Called with node set to nullptr to send a messag...
void SetConfig(std::string cfg)
Set the config file path.
unsigned char bcast_publickey[crypto_box_PUBLICKEYBYTES]
std::thread * _worker
Worker thread.
void IncrementCtr() const
Increment the nonce counter of the node.
zsock_t * GetSocket()
Get the pointer to the socket.
virtual void NodeWillLeave(const Node *node) noexcept
Called when a Node signals that it will leave.
Exception class thrown by all library classes.
virtual void NodeWasDiscovered(const Node *node) noexcept
Called when a new Node has been discovered.
#define SWARMIO_ZYRE_HEADER_DEVICE_CLASS
void SetCtr(unsigned char *c) const
Set the nonce (counter).
void Deliver(moodycamel::BlockingReaderWriterQueue< zyre_event_t * > *queue)
Entry point for the delivery thread.
ZyreControlSocket _control
Control pipe to shut down event processing.
virtual void Stop() override
Send a termination signal and wait until the endpoint finished processing messages.
std::string configFilePath
void Process()
Entry point for the worker thread.
virtual bool ReceiveMessage(const Node *sender, const data::Message *message) noexcept
Called by implementations to deliver decoded messages.
ROSLIB_DECL std::string command(const std::string &cmd)
std::map< std::string, ZyreNode > _nodes
Node registry.
std::shared_timed_mutex _mutex
Mutex protecting the Nodes registry.
void SetInterface(const char *ifname)
Set the network interface to bind to.
zyre_t * _zyre
Reference to the Zyre structure.
unsigned char my_secretkey[crypto_box_SECRETKEYBYTES]
unsigned char my_publickey[crypto_box_PUBLICKEYBYTES]
Public key.
std::string _uuid
Local UUID.
unsigned char bcast_secretkey[crypto_box_SECRETKEYBYTES]
virtual void Start() override
Start a background thread and begin processing messages on this endpoint.
const unsigned char * GetKey() const
Get the shared cryptographic key of the node.
virtual ~ZyreEndpoint() override
Destroy the ZyreEndpoint object.
An inproc socket bound to a special name generated from an object pointer.
virtual const std::string & GetUUID() const override
Get the unique identifier of the node.
const char * what() const noexcept override
Get the error message.
ZyreEndpoint(const char *name, const char *deviceClass)
Construct a new ZyreEndpoint object.
virtual void Start() override
Start the background thread, announce the Zyre node and start processing messages.
unsigned char signature[crypto_sign_SECRETKEYBYTES]
virtual void NodeDidJoin(const Node *node) noexcept
Called when a new Node has joined the group.
Represents a Node the Endpoint knows about and can send messages to.
virtual const Node * NodeForUUID(const std::string &uuid) override
Retrieve a node by its UUID.
std::string base64_decode(std::string const &s)
bool security_enabled
Security enabled bit (if true, communication is secure)
std::list< const ZyreNode * > GetNodes()
Get a list of known Nodes.
unsigned char server_public[crypto_sign_PUBLICKEYBYTES]
const unsigned char * GetCtr() const
Get the nonce counter of the node.
static std::map< std::string, std::string > GetInterfaceMap()
Get a map of interface names and addresses.