10 #include <mrpt/core/exceptions.h>    11 #include <mrpt/version.h>    16 #if MRPT_VERSION >= 0x204    17 #include <mrpt/system/thread_name.h>    22 #include <shared_mutex>    24 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)    26 #include <google/protobuf/text_format.h>    30 #include "AdvertiseServiceRequest.pb.h"    31 #include "AdvertiseTopicRequest.pb.h"    32 #include "CallService.pb.h"    33 #include "GenericAnswer.pb.h"    34 #include "GetServiceInfoAnswer.pb.h"    35 #include "GetServiceInfoRequest.pb.h"    36 #include "ListNodesAnswer.pb.h"    37 #include "ListNodesRequest.pb.h"    38 #include "ListTopicsAnswer.pb.h"    39 #include "ListTopicsRequest.pb.h"    40 #include "RegisterNodeAnswer.pb.h"    41 #include "RegisterNodeRequest.pb.h"    42 #include "SubscribeAnswer.pb.h"    43 #include "SubscribeRequest.pb.h"    44 #include "UnregisterNodeRequest.pb.h"    48 using namespace mvsim;
    50 #if defined(MVSIM_HAS_ZMQ)    53 struct InfoPerAdvertisedTopic
    55         InfoPerAdvertisedTopic(zmq::context_t& 
c) : context(c) {}
    57         zmq::context_t& context;
    59         std::string topicName;
    60         zmq::socket_t pubSocket = zmq::socket_t(context, ZMQ_PUB);
    62         const google::protobuf::Descriptor* descriptor = 
nullptr;
    67         InfoPerService() = 
default;
    69         std::string serviceName;
    70         const google::protobuf::Descriptor* descInput = 
nullptr;
    71         const google::protobuf::Descriptor* descOutput = 
nullptr;
    74 struct InfoPerSubscribedTopic
    76         InfoPerSubscribedTopic(zmq::context_t& 
c) : context(c) {}
    77         ~InfoPerSubscribedTopic()
    79                 if (topicThread.joinable()) topicThread.join();
    82         zmq::context_t& context;
    84         std::string topicName;
    85         zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
    86         const google::protobuf::Descriptor* descriptor = 
nullptr;
    88         std::vector<Client::topic_callback_t> callbacks;
    90         std::thread topicThread;
    97 #if defined(MVSIM_HAS_ZMQ)    98         zmq::context_t context{1, ZMQ_MAX_SOCKETS_DFLT};
    99         std::optional<zmq::socket_t> mainReqSocket;
   100         mvsim::SocketMonitor mainReqSocketMonitor;
   102         std::map<std::string, internal::InfoPerAdvertisedTopic> advertisedTopics;
   103         std::shared_mutex advertisedTopics_mtx;
   105         std::optional<zmq::socket_t> srvListenSocket;
   106         std::map<std::string, internal::InfoPerService> offeredServices;
   107         std::shared_mutex offeredServices_mtx;
   109         std::map<std::string, internal::InfoPerSubscribedTopic> subscribedTopics;
   110         std::shared_mutex subscribedTopics_mtx;
   112         std::optional<zmq::socket_t> topicNotificationsSocket;
   113         std::string topicNotificationsEndPoint;
   119         : 
mrpt::system::COutputLogger(
"mvsim::Client"),
   131 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   132         return zmq_->mainReqSocketMonitor.connected();
   140         using namespace std::string_literals;
   142                 !
zmq_->mainReqSocket || !
zmq_->mainReqSocket->connected(),
   143                 "Client is already running.");
   145 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   147         zmq_->mainReqSocket.emplace(
zmq_->context, ZMQ_REQ);
   150         zmq_->mainReqSocketMonitor.monitor(
zmq_->mainReqSocket.value());
   152         zmq_->mainReqSocket->connect(
   160         zmq_->srvListenSocket.emplace(
zmq_->context, ZMQ_REP);
   161         zmq_->srvListenSocket->bind(
"tcp://0.0.0.0:*"s);
   163         if (!
zmq_->srvListenSocket->connected())
   168                 "Client service thread is already running!");
   172 #if MRPT_VERSION >= 0x204   173         mrpt::system::thread_name(
"services_"s + 
nodeName_, serviceInvokerThread_);
   177         zmq_->topicNotificationsSocket.emplace(
zmq_->context, ZMQ_PAIR);
   178         zmq_->topicNotificationsSocket->bind(
"tcp://0.0.0.0:*"s);
   180         if (!
zmq_->topicNotificationsSocket->connected())
   183         zmq_->topicNotificationsEndPoint =
   184                 get_zmq_endpoint(*
zmq_->topicNotificationsSocket);
   188                 "Client topic updates thread is already running!");
   192 #if MRPT_VERSION >= 0x204   193         mrpt::system::thread_name(
   194                 "topicUpdates_"s + 
nodeName_, topicUpdatesThread_);
   199                 "MVSIM needs building with ZMQ and PROTOBUF to enable "   206 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   208         if (!
zmq_->mainReqSocket->connected()) 
return;
   215         catch (
const std::exception& e)
   218                         "shutdown: Exception: " << mrpt::exception_to_str(e));
   221 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)   222         zmq_->context.shutdown();
   225         zmq_ctx_shutdown(
zmq_->context.operator 
void*());
   230         zmq_->subscribedTopics.clear();
   231         zmq_->offeredServices.clear();
   238 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   239         auto& 
s = *
zmq_->mainReqSocket;
   241         mvsim_msgs::RegisterNodeRequest rnq;
   243         mvsim::sendMessage(rnq, 
s);
   246         const zmq::message_t reply = mvsim::receiveMessage(
s);
   248         mvsim_msgs::RegisterNodeAnswer rna;
   249         mvsim::parseMessage(reply, rna);
   253                         "Server did not allow registering node: %s",
   254                         rna.errormessage().c_str());
   264 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   265         auto& 
s = *
zmq_->mainReqSocket;
   267         mvsim_msgs::UnregisterNodeRequest rnq;
   269         mvsim::sendMessage(rnq, 
s);
   272         const zmq::message_t reply = mvsim::receiveMessage(
s);
   274         mvsim_msgs::GenericAnswer rna;
   275         mvsim::parseMessage(reply, rna);
   279                         "Server answered an error unregistering node: %s",
   280                         rna.errormessage().c_str());
   290 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   291         auto& 
s = *
zmq_->mainReqSocket;
   293         mvsim_msgs::ListNodesRequest req;
   294         mvsim::sendMessage(req, 
s);
   297         const zmq::message_t reply = mvsim::receiveMessage(
s);
   299         mvsim_msgs::ListNodesAnswer lna;
   300         mvsim::parseMessage(reply, lna);
   302         std::vector<Client::InfoPerNode> nodes;
   303         nodes.resize(lna.nodes_size());
   305         for (
int i = 0; i < lna.nodes_size(); i++)
   307                 nodes[i].name = lna.nodes(i);
   317 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   318         auto& 
s = *
zmq_->mainReqSocket;
   320         mvsim_msgs::ListTopicsRequest req;
   321         mvsim::sendMessage(req, 
s);
   324         const zmq::message_t reply = mvsim::receiveMessage(
s);
   326         mvsim_msgs::ListTopicsAnswer lta;
   327         mvsim::parseMessage(reply, lta);
   329         std::vector<Client::InfoPerTopic> topics;
   330         topics.resize(lta.topics_size());
   332         for (
int i = 0; i < lta.topics_size(); i++)
   334                 const auto& 
t = lta.topics(i);
   335                 auto& 
dst = topics[i];
   337                 dst.name = 
t.topicname();
   338                 dst.type = 
t.topictype();
   341                 dst.endpoints.resize(
t.publisherendpoint_size());
   342                 dst.publishers.resize(
t.publisherendpoint_size());
   344                 for (
int k = 0; k < 
t.publisherendpoint_size(); k++)
   346                         dst.publishers[k] = 
t.publishername(k);
   347                         dst.endpoints[k] = 
t.publisherendpoint(k);
   357         const std::string& topicName,
   358         const google::protobuf::Descriptor* descriptor)
   360 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   362         auto& advTopics = 
zmq_->advertisedTopics;
   364         std::unique_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
   366         if (advTopics.find(topicName) != advTopics.end())
   368                         "Topic `%s` already registered for publication in this same "   374         internal::InfoPerAdvertisedTopic& ipat =
   375                 advTopics.emplace_hint(advTopics.begin(), topicName, 
zmq_->context)
   381         ipat.pubSocket.bind(
"tcp://0.0.0.0:*");
   382         if (!ipat.pubSocket.connected())
   386         ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
   387         ipat.topicName = topicName;  
   388         ipat.descriptor = descriptor;
   391                 "Advertising topic `%s` [%s] on endpoint `%s`", topicName.c_str(),
   392                 descriptor->full_name().c_str(), ipat.endpoint.c_str());
   396         mvsim_msgs::AdvertiseTopicRequest req;
   397         req.set_topicname(ipat.topicName);
   398         req.set_endpoint(ipat.endpoint);
   399         req.set_topictypename(ipat.descriptor->full_name());
   402         mvsim::sendMessage(req, *
zmq_->mainReqSocket);
   405         const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
   406         mvsim_msgs::GenericAnswer ans;
   407         mvsim::parseMessage(reply, ans);
   411                         "Error registering topic `%s` in server: `%s`", topicName.c_str(),
   412                         ans.errormessage().c_str());
   420         const std::string& serviceName, 
const google::protobuf::Descriptor* descIn,
   423 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   425         std::unique_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
   427         auto& services = 
zmq_->offeredServices;
   429         if (services.find(serviceName) != services.end())
   431                         "Service `%s` already registered in this same client!",
   432                         serviceName.c_str());
   434         internal::InfoPerService& ips = services[serviceName];
   439         char assignedPort[100];
   440         size_t assignedPortLen = 
sizeof(assignedPort);
   441         zmq_->srvListenSocket->getsockopt(
   442                 ZMQ_LAST_ENDPOINT, assignedPort, &assignedPortLen);
   443         assignedPort[assignedPortLen] = 
'\0';
   445         ips.serviceName = serviceName;  
   446         ips.callback = callback;
   447         ips.descInput = descIn;
   448         ips.descOutput = descOut;
   451                 "Advertising service `%s` [%s->%s] on endpoint `%s`",
   452                 serviceName.c_str(), descIn->full_name().c_str(),
   453                 descOut->full_name().c_str(), assignedPort);
   455         mvsim_msgs::AdvertiseServiceRequest req;
   456         req.set_servicename(ips.serviceName);
   457         req.set_endpoint(assignedPort);
   458         req.set_inputtypename(ips.descInput->full_name());
   459         req.set_outputtypename(ips.descOutput->full_name());
   462         mvsim::sendMessage(req, *
zmq_->mainReqSocket);
   465         const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
   466         mvsim_msgs::GenericAnswer ans;
   467         mvsim::parseMessage(reply, ans);
   471                         "Error registering service `%s` in server: `%s`",
   472                         serviceName.c_str(), ans.errormessage().c_str());
   480         const std::string& topicName, 
const google::protobuf::Message& msg)
   483 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   485                 zmq_ && 
zmq_->mainReqSocket && 
zmq_->mainReqSocket->connected(),
   486                 "Client not connected to Server");
   488         std::shared_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
   489         auto itIpat = 
zmq_->advertisedTopics.find(topicName);
   492                 itIpat != 
zmq_->advertisedTopics.end(),
   494                         "Topic `%s` cannot been registered. Missing former call to "   500         auto& ipat = itIpat->second;
   503                 msg.GetDescriptor() == ipat.descriptor,
   505                         "Topic `%s` has type `%s`, but expected `%s` from former call to "   507                         topicName.c_str(), msg.GetDescriptor()->name().c_str(),
   508                         ipat.descriptor->name().c_str()));
   510         ASSERT_(ipat.pubSocket.connected());
   512         mvsim::sendMessage(msg, ipat.pubSocket);
   516                 "Published on topic `%s`: %s", topicName.c_str(),
   517                 msg.DebugString().c_str());
   528         using namespace std::string_literals;
   530 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   534                         "[" << 
nodeName_ << 
"] Client service thread started.");
   536                 zmq::socket_t& 
s = *
zmq_->srvListenSocket;
   541                         zmq::message_t 
m = mvsim::receiveMessage(s);
   544                         mvsim_msgs::CallService csMsg;
   545                         mvsim::parseMessage(m, csMsg);
   547                         std::shared_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
   548                         const auto& srvName = csMsg.servicename();
   550                         auto itSrv = 
zmq_->offeredServices.find(srvName);
   551                         if (itSrv == 
zmq_->offeredServices.end())
   554                                 mvsim_msgs::GenericAnswer ans;
   555                                 ans.set_success(
false);
   557                                         "Requested unknown service `%s`", srvName.c_str()));
   560                                 mvsim::sendMessage(ans, s);
   564                         internal::InfoPerService& ips = itSrv->second;
   568                         auto outMsgPtr = ips.callback(csMsg.serializedinput());
   571                         mvsim::sendMessage(*outMsgPtr, s);
   574         catch (
const zmq::error_t& e)
   576                 if (e.num() == ETERM)
   581                                 "internalServiceServingThread about to exit for ZMQ term "   587                                 "internalServiceServingThread: ZMQ error: " << e.what());
   590         catch (
const std::exception& e)
   593                         "internalServiceServingThread: Exception: "   594                         << mrpt::exception_to_str(e));
   603         using namespace std::string_literals;
   605 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   609                         "[" << 
nodeName_ << 
"] Client topic updates thread started.");
   611                 zmq::socket_t& 
s = *
zmq_->topicNotificationsSocket;
   616                         zmq::message_t 
m = mvsim::receiveMessage(s);
   619                         mvsim_msgs::TopicInfo tiMsg;
   620                         mvsim::parseMessage(m, tiMsg);
   623                         mvsim_msgs::GenericAnswer ans;
   624                         ans.set_success(
true);
   625                         mvsim::sendMessage(ans, s);
   633                         std::shared_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
   634                         const auto& topicName = tiMsg.topicname();
   636                         auto itTopic = 
zmq_->subscribedTopics.find(topicName);
   637                         if (itTopic == 
zmq_->subscribedTopics.end())
   643                                         << 
"` update message from server, but this node is not "   644                                            "subscribed to it (!).");
   649                                 "[internalTopicUpdatesThread] Received: "   650                                 << tiMsg.DebugString());
   652                         internal::InfoPerSubscribedTopic& ipt = itTopic->second;
   654                         for (
int i = 0; i < tiMsg.publisherendpoint_size(); i++)
   656                                 ipt.subSocket.connect(tiMsg.publisherendpoint(i));
   662         catch (
const zmq::error_t& e)
   664                 if (e.num() == ETERM)
   669                                 "internalTopicUpdatesThread about to exit for ZMQ term "   675                                 "internalTopicUpdatesThread: ZMQ error: " << e.what());
   678         catch (
const std::exception& e)
   681                         "internalTopicUpdatesThread: Exception: "   682                         << mrpt::exception_to_str(e));
   685                 "[" << 
nodeName_ << 
"] Client topic updates thread quitted.");
   692         const std::string& serviceName, 
const std::string& inputSerializedMsg)
   695 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   697         std::string outMsgData, outMsgType;
   699                 serviceName, inputSerializedMsg, std::nullopt, outMsgData, outMsgType);
   706         const std::string& serviceName, 
const std::string& inputSerializedMsg,
   707         mrpt::optional_ref<google::protobuf::Message> outputMsg,
   708         mrpt::optional_ref<std::string> outputSerializedMsg,
   709         mrpt::optional_ref<std::string> outputMsgTypeName)
   712 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   716         std::string srvEndpoint;
   718                 zmq::socket_t& 
s = *
zmq_->mainReqSocket;
   720                 mvsim_msgs::GetServiceInfoRequest gsi;
   721                 gsi.set_servicename(serviceName);
   722                 mvsim::sendMessage(gsi, s);
   724                 auto m = mvsim::receiveMessage(s);
   725                 mvsim_msgs::GetServiceInfoAnswer gsia;
   726                 mvsim::parseMessage(
m, gsia);
   730                                 "Error requesting information about service `%s`: %s",
   731                                 serviceName.c_str(), gsia.errormessage().c_str());
   733                 srvEndpoint = gsia.serviceendpoint();
   737         zmq::socket_t srvReqSock(
zmq_->context, ZMQ_REQ);
   738         srvReqSock.connect(srvEndpoint);
   740         mvsim_msgs::CallService csMsg;
   741         csMsg.set_servicename(serviceName);
   742         csMsg.set_serializedinput(inputSerializedMsg);
   744         mvsim::sendMessage(csMsg, srvReqSock);
   746         const auto m = mvsim::receiveMessage(srvReqSock);
   749                 mvsim::parseMessage(
m, outputMsg.value().get());
   751         if (outputSerializedMsg)
   753                 const auto [typeName, serializedData] =
   754                         internal::parseMessageToParts(
m);
   756                 outputSerializedMsg.value().get() = serializedData;
   757                 if (outputMsgTypeName) outputMsgTypeName.value().get() = typeName;
   764         const std::string& topicName,
   765         const google::protobuf::Descriptor* descriptor,
   769 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   771         std::unique_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
   773         auto& topics = 
zmq_->subscribedTopics;
   776         internal::InfoPerSubscribedTopic& ipt =
   777                 topics.emplace_hint(topics.begin(), topicName, 
zmq_->context)->second;
   780         ipt.subSocket.setsockopt(ZMQ_SUBSCRIBE, 
"", 0);
   782         ipt.callbacks.push_back(callback);
   784         ipt.topicName = topicName;
   792         mvsim_msgs::SubscribeRequest subReq;
   793         subReq.set_topic(topicName);
   794         subReq.set_updatesendpoint(
zmq_->topicNotificationsEndPoint);
   796         mvsim::sendMessage(subReq, *
zmq_->mainReqSocket);
   798         const auto m = mvsim::receiveMessage(*
zmq_->mainReqSocket);
   799         mvsim_msgs::SubscribeAnswer subAns;
   800         mvsim::parseMessage(
m, subAns);
   814         using namespace std::string_literals;
   816 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   820                         "[" << 
nodeName_ << 
"] Client topic subscribe thread for `"   821                                 << ipt.topicName << 
"` started.");
   823                 zmq::socket_t& 
s = ipt.subSocket;
   828                         const zmq::message_t 
m = mvsim::receiveMessage(s);
   833                                 for (
auto callback : ipt.callbacks) callback(m);
   835                         catch (
const std::exception& e)
   838                                         "Exception in topic `"   840                                         << 
"` subscription callback:" << mrpt::exception_to_str(e));
   844         catch (
const zmq::error_t& e)
   846                 if (e.num() == ETERM)
   852                                         << 
"] Client topic subscribe thread about to exit for ZMQ "   858                                 "internalTopicSubscribeThread: ZMQ error: " << e.what());
   861         catch (
const std::exception& e)
   864                         "internalTopicSubscribeThread: Exception: "   865                         << mrpt::exception_to_str(e));
   868                 "[" << 
nodeName_ << 
"] Client topic subscribe thread quitted.");
 #define ASSERT_EQUAL_(__A, __B)
void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic &ipt)
void internalTopicUpdatesThread()
std::thread topicUpdatesThread_
void setName(const std::string &nodeName)
void doAdvertiseService(const std::string &serviceName, const google::protobuf::Descriptor *descIn, const google::protobuf::Descriptor *descOut, service_callback_t callback)
std::unique_ptr< ZMQImpl > zmq_
#define THROW_EXCEPTION(msg)
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
std::function< void(const zmq::message_t &)> topic_callback_t
#define MRPT_LOG_WARN_STREAM(__CONTENTS)
std::vector< InfoPerNode > requestListOfNodes()
void publishTopic(const std::string &topicName, const google::protobuf::Message &msg)
#define MRPT_LOG_INFO_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG(_STRING)
#define MRPT_LOG_DEBUG_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG_FMT(_FMT_STRING,...)
std::string serverHostAddress_
void callService(const std::string &serviceName, const INPUT_MSG_T &input, OUTPUT_MSG_T &output)
std::string BASE_IMPEXP format(const char *fmt,...) MRPT_printf_format_check(1
std::vector< InfoPerTopic > requestListOfTopics()
void doAdvertiseTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor)
void internalServiceServingThread()
constexpr unsigned int MVSIM_PORTNO_MAIN_REP
std::thread serviceInvokerThread_
void doCallService(const std::string &serviceName, const std::string &inputSerializedMsg, mrpt::optional_ref< google::protobuf::Message > outputMsg, mrpt::optional_ref< std::string > outputSerializedMsg=std::nullopt, mrpt::optional_ref< std::string > outputMsgTypeName=std::nullopt)
#define MRPT_LOG_ERROR_STREAM(__CONTENTS)
void doUnregisterClient()
#define ASSERTMSG_(f, __ERROR_MSG)
void doSubscribeTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor, const topic_callback_t &callback)
std::function< std::shared_ptr< google::protobuf::Message >(const std::string &)> service_callback_t