10 #include <mrpt/core/exceptions.h>    11 #include <mrpt/version.h>    14 #if MRPT_VERSION >= 0x204    15 #include <mrpt/system/thread_name.h>    18 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)    21 #include "GenericAnswer.pb.h"    22 #include "GetServiceInfoAnswer.pb.h"    23 #include "ListNodesAnswer.pb.h"    24 #include "ListNodesRequest.pb.h"    25 #include "ListTopicsAnswer.pb.h"    26 #include "ListTopicsRequest.pb.h"    27 #include "RegisterNodeAnswer.pb.h"    28 #include "RegisterNodeRequest.pb.h"    29 #include "SubscribeAnswer.pb.h"    30 #include "SubscribeRequest.pb.h"    31 #include "UnregisterNodeRequest.pb.h"    35 using namespace mvsim;
    45 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)    49 #if MRPT_VERSION >= 0x204    50         mrpt::system::thread_name(
"serverMain", 
mainThread_);
    55                 "MVSIM needs building with ZMQ and PROTOBUF to enable client/server");
    70         catch (
const std::exception& e)
    73                         "shutdown: Exception: " << mrpt::exception_to_str(e));
    79         using namespace std::string_literals;
    81 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)    86                 zmq::context_t context(1);
    89                 zmq::socket_t mainRepSocket(context, ZMQ_REP);
    94                         zmq::message_t request;
    97 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1)    98                         std::optional<size_t> reqSize = mainRepSocket.recv(request);
   101                         mainRepSocket.recv(&request);
   105                         using client_requests_t = std::variant<
   106                                 mvsim_msgs::RegisterNodeRequest,
   107                                 mvsim_msgs::UnregisterNodeRequest, mvsim_msgs::SubscribeRequest,
   108                                 mvsim_msgs::ListNodesRequest, mvsim_msgs::ListTopicsRequest,
   109                                 mvsim_msgs::AdvertiseTopicRequest,
   110                                 mvsim_msgs::AdvertiseServiceRequest,
   111                                 mvsim_msgs::GetServiceInfoRequest>;
   116                                 client_requests_t req =
   117                                         mvsim::parseMessageVariant<client_requests_t>(request);
   121                                                 [&](
const auto& 
m) { this->handle(
m, mainRepSocket); },
   125                         catch (
const UnexpectedMessageException& e)
   131         catch (
const zmq::error_t& e)
   133                 if (e.num() == ETERM)
   138                                 "Server thread about to exit for ZMQ term signal.");
   143                                 "internalServerThread: ZMQ error: " << e.what());
   146         catch (
const std::exception& e)
   149                         "internalServerThread: Exception: " << mrpt::exception_to_str(e));
   159 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   163 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)   167                 zmq_ctx_shutdown(ctx->operator 
void*());
   180         for (
const std::string& topic : itNode->second.advertisedTopics)
   202         const std::string& topicName, 
const std::string& topicTypeName,
   203         const std::string& publisherEndpoint, 
const std::string& nodeName)
   210         if (!dbTopic.topicTypeName.empty() &&
   211                 dbTopic.topicTypeName != topicTypeName)
   214                         "Trying to register topic `%s` [%s] but already known with type "   216                         topicName.c_str(), topicTypeName.c_str(),
   217                         dbTopic.topicTypeName.c_str()));
   219         dbTopic.topicName = topicName;
   220         dbTopic.topicTypeName = topicTypeName;
   222         dbTopic.publishers.try_emplace(
   223                 nodeName, topicName, nodeName, publisherEndpoint);
   227 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   233         const std::string& topicName, 
const std::string& updatesEndPoint)
   239         dbTopic.subscribers.try_emplace(
   240                 updatesEndPoint, topicName, updatesEndPoint);
   243 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   244         mvsim_msgs::TopicInfo tiMsg;
   245         tiMsg.set_topicname(topicName);
   246         tiMsg.set_topictype(dbTopic.topicTypeName);
   248         for (
const auto& pub : dbTopic.publishers)
   250                 tiMsg.add_publishername(pub.second.publisherNodeName);
   251                 tiMsg.add_publisherendpoint(pub.second.publisherEndpoint);
   256         s.connect(updatesEndPoint);
   258         sendMessage(tiMsg, s);
   260         mvsim_msgs::GenericAnswer ans;
   261         const auto m = receiveMessage(s);
   262         mvsim::parseMessage(
m, ans);
   269         const std::string& serviceName, 
const std::string& inputTypeName,
   270         const std::string& outputTypeName, 
const std::string& publisherEndpoint,
   271         const std::string& nodeName)
   278         if (!dbSrv.inputTypeName.empty() &&
   279                 (dbSrv.inputTypeName != inputTypeName ||
   280                  dbSrv.outputTypeName != outputTypeName))
   283                         "Trying to register service `%s` [%s->%s] but already known with "   286                         serviceName.c_str(), inputTypeName.c_str(), outputTypeName.c_str(),
   287                         dbSrv.inputTypeName.c_str(), dbSrv.outputTypeName.c_str()));
   289         dbSrv.serviceName = serviceName;
   290         dbSrv.inputTypeName = inputTypeName;
   291         dbSrv.outputTypeName = outputTypeName;
   292         dbSrv.endpoint = publisherEndpoint;
   293         dbSrv.nodeName = nodeName;
   297         const std::string& serviceName, std::string& publisherEndpoint,
   298         std::string& nodeName)
 const   306         auto& dbSrv = itSrv->second;
   308         publisherEndpoint = dbSrv.endpoint;
   309         nodeName = dbSrv.nodeName;
   314 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)   317 void Server::handle(
const mvsim_msgs::RegisterNodeRequest& 
m, zmq::socket_t& 
s)
   321                 "Registering new node named '" << 
m.nodename() << 
"'");
   330         mvsim_msgs::RegisterNodeAnswer rna;
   331         rna.set_success(
true);
   332         mvsim::sendMessage(rna, s);
   337         const mvsim_msgs::UnregisterNodeRequest& 
m, zmq::socket_t& 
s)
   344         mvsim_msgs::GenericAnswer rna;
   345         rna.set_success(
true);
   346         mvsim::sendMessage(rna, s);
   350 void Server::handle(
const mvsim_msgs::SubscribeRequest& 
m, zmq::socket_t& 
s)
   354                 "Subscription request for topic " << 
m.topic() << 
"'");
   361         mvsim_msgs::SubscribeAnswer ans;
   362         ans.set_topic(
m.topic());
   363         ans.set_success(
true);
   364         mvsim::sendMessage(ans, s);
   369         const mvsim_msgs::GetServiceInfoRequest& 
m, zmq::socket_t& 
s)
   373                 "GetServiceInfo request for service '" << 
m.servicename() << 
"'");
   375         mvsim_msgs::GetServiceInfoAnswer ans;
   376         std::string node, endpoint;
   380                 ans.set_success(
true);
   381                 ans.set_serviceendpoint(endpoint);
   382                 ans.set_servicenodename(node);
   386                 ans.set_success(
false);
   388                         "Could not find service `%s`", 
m.servicename().c_str()));
   391         mvsim::sendMessage(ans, s);
   395 void Server::handle(
const mvsim_msgs::ListTopicsRequest& 
m, zmq::socket_t& 
s)
   400         mvsim_msgs::ListTopicsAnswer ans;
   403         const auto& queryPrefix = 
m.topicstartswith();
   409                 const auto& 
t = kv.second;
   410                 const auto& 
name = 
t.topicName;
   412                 if (!queryPrefix.empty() ||
   413                         name.substr(0, queryPrefix.size()) == queryPrefix)
   415                         auto tInfo = ans.add_topics();
   416                         tInfo->set_topicname(
name);
   417                         tInfo->set_topictype(
t.topicTypeName);
   419                         for (
const auto& pubs : 
t.publishers)
   421                                 tInfo->add_publishername(pubs.second.publisherNodeName);
   422                                 tInfo->add_publisherendpoint(pubs.second.publisherEndpoint);
   426         mvsim::sendMessage(ans, s);
   430 void Server::handle(
const mvsim_msgs::ListNodesRequest& 
m, zmq::socket_t& 
s)
   436         const auto& queryPrefix = 
m.nodestartswith();
   438         mvsim_msgs::ListNodesAnswer ans;
   441                 const auto& 
name = 
n.second.nodeName;
   443                 if (!queryPrefix.empty() ||
   444                         name.substr(0, queryPrefix.size()) == queryPrefix)
   449         mvsim::sendMessage(ans, s);
   454         const mvsim_msgs::AdvertiseTopicRequest& 
m, zmq::socket_t& 
s)
   458                 "Received new topic advertiser: `%s` [%s] @ %s (%s)",
   459                 m.topicname().c_str(), 
m.topictypename().c_str(), 
m.endpoint().c_str(),
   460                 m.nodename().c_str());
   462         mvsim_msgs::GenericAnswer ans;
   466                         m.topicname(), 
m.topictypename(), 
m.endpoint(), 
m.nodename());
   467                 ans.set_success(
true);
   469         catch (
const std::exception& e)
   471                 ans.set_success(
false);
   472                 ans.set_errormessage(mrpt::exception_to_str(e));
   474         mvsim::sendMessage(ans, s);
   479         const mvsim_msgs::AdvertiseServiceRequest& 
m, zmq::socket_t& 
s)
   483                 "Received new service offering: `%s` [%s->%s] @ %s (%s)",
   484                 m.servicename().c_str(), 
m.inputtypename().c_str(),
   485                 m.outputtypename().c_str(), 
m.endpoint().c_str(), 
m.nodename().c_str());
   487         mvsim_msgs::GenericAnswer ans;
   491                         m.servicename(), 
m.inputtypename(), 
m.outputtypename(),
   492                         m.endpoint(), 
m.nodename());
   493                 ans.set_success(
true);
   495         catch (
const std::exception& e)
   497                 ans.set_success(
false);
   498                 ans.set_errormessage(mrpt::exception_to_str(e));
   500         mvsim::sendMessage(ans, s);
 void db_advertise_topic(const std::string &topicName, const std::string &topicTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
std::map< node_name_t, InfoPerNode > connectedNodes_
void db_advertise_service(const std::string &serviceName, const std::string &inputTypeName, const std::string &outputTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
#define THROW_EXCEPTION(msg)
std::map< service_name_t, InfoPerService > knownServices_
void internalServerThread()
void db_remove_node(const std::string &nodeName)
#define MRPT_LOG_INFO_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG(_STRING)
std::shared_mutex dbMutex
MRPT_TODO("Modify ping to run on Windows + Test this")
#define MRPT_LOG_DEBUG_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG_FMT(_FMT_STRING,...)
std::atomic< zmq::context_t * > mainThreadZMQcontext_
std::string BASE_IMPEXP format(const char *fmt,...) MRPT_printf_format_check(1
GLuint const GLchar * name
std::map< topic_name_t, InfoPerTopic > knownTopics_
void requestMainThreadTermination()
bool db_get_service_info(const std::string &serviceName, std::string &publisherEndpoint, std::string &nodeName) const 
unsigned int serverPortNo_
void db_add_topic_subscriber(const std::string &topicName, const std::string &updatesEndPoint)
void db_register_node(const std::string &nodeName)
#define MRPT_LOG_ERROR_STREAM(__CONTENTS)
#define ASSERTMSG_(f, __ERROR_MSG)