10 #include <mrpt/core/exceptions.h> 11 #include <mrpt/version.h> 14 #if MRPT_VERSION >= 0x204 15 #include <mrpt/system/thread_name.h> 18 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 21 #include "GenericAnswer.pb.h" 22 #include "GetServiceInfoAnswer.pb.h" 23 #include "ListNodesAnswer.pb.h" 24 #include "ListNodesRequest.pb.h" 25 #include "ListTopicsAnswer.pb.h" 26 #include "ListTopicsRequest.pb.h" 27 #include "RegisterNodeAnswer.pb.h" 28 #include "RegisterNodeRequest.pb.h" 29 #include "SubscribeAnswer.pb.h" 30 #include "SubscribeRequest.pb.h" 31 #include "UnregisterNodeRequest.pb.h" 35 using namespace mvsim;
45 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 49 #if MRPT_VERSION >= 0x204 50 mrpt::system::thread_name(
"serverMain",
mainThread_);
55 "MVSIM needs building with ZMQ and PROTOBUF to enable client/server");
70 catch (
const std::exception& e)
73 "shutdown: Exception: " << mrpt::exception_to_str(e));
79 using namespace std::string_literals;
81 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 86 zmq::context_t context(1);
89 zmq::socket_t mainRepSocket(context, ZMQ_REP);
94 zmq::message_t request;
97 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1) 98 std::optional<size_t> reqSize = mainRepSocket.recv(request);
101 mainRepSocket.recv(&request);
105 using client_requests_t = std::variant<
106 mvsim_msgs::RegisterNodeRequest,
107 mvsim_msgs::UnregisterNodeRequest, mvsim_msgs::SubscribeRequest,
108 mvsim_msgs::ListNodesRequest, mvsim_msgs::ListTopicsRequest,
109 mvsim_msgs::AdvertiseTopicRequest,
110 mvsim_msgs::AdvertiseServiceRequest,
111 mvsim_msgs::GetServiceInfoRequest>;
116 client_requests_t req =
117 mvsim::parseMessageVariant<client_requests_t>(request);
121 [&](
const auto&
m) { this->handle(
m, mainRepSocket); },
125 catch (
const UnexpectedMessageException& e)
131 catch (
const zmq::error_t& e)
133 if (e.num() == ETERM)
138 "Server thread about to exit for ZMQ term signal.");
143 "internalServerThread: ZMQ error: " << e.what());
146 catch (
const std::exception& e)
149 "internalServerThread: Exception: " << mrpt::exception_to_str(e));
159 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 163 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0) 167 zmq_ctx_shutdown(ctx->operator
void*());
180 for (
const std::string& topic : itNode->second.advertisedTopics)
202 const std::string& topicName,
const std::string& topicTypeName,
203 const std::string& publisherEndpoint,
const std::string& nodeName)
210 if (!dbTopic.topicTypeName.empty() &&
211 dbTopic.topicTypeName != topicTypeName)
214 "Trying to register topic `%s` [%s] but already known with type " 216 topicName.c_str(), topicTypeName.c_str(),
217 dbTopic.topicTypeName.c_str()));
219 dbTopic.topicName = topicName;
220 dbTopic.topicTypeName = topicTypeName;
222 dbTopic.publishers.try_emplace(
223 nodeName, topicName, nodeName, publisherEndpoint);
227 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 233 const std::string& topicName,
const std::string& updatesEndPoint)
239 dbTopic.subscribers.try_emplace(
240 updatesEndPoint, topicName, updatesEndPoint);
243 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 244 mvsim_msgs::TopicInfo tiMsg;
245 tiMsg.set_topicname(topicName);
246 tiMsg.set_topictype(dbTopic.topicTypeName);
248 for (
const auto& pub : dbTopic.publishers)
250 tiMsg.add_publishername(pub.second.publisherNodeName);
251 tiMsg.add_publisherendpoint(pub.second.publisherEndpoint);
256 s.connect(updatesEndPoint);
258 sendMessage(tiMsg, s);
260 mvsim_msgs::GenericAnswer ans;
261 const auto m = receiveMessage(s);
262 mvsim::parseMessage(
m, ans);
269 const std::string& serviceName,
const std::string& inputTypeName,
270 const std::string& outputTypeName,
const std::string& publisherEndpoint,
271 const std::string& nodeName)
278 if (!dbSrv.inputTypeName.empty() &&
279 (dbSrv.inputTypeName != inputTypeName ||
280 dbSrv.outputTypeName != outputTypeName))
283 "Trying to register service `%s` [%s->%s] but already known with " 286 serviceName.c_str(), inputTypeName.c_str(), outputTypeName.c_str(),
287 dbSrv.inputTypeName.c_str(), dbSrv.outputTypeName.c_str()));
289 dbSrv.serviceName = serviceName;
290 dbSrv.inputTypeName = inputTypeName;
291 dbSrv.outputTypeName = outputTypeName;
292 dbSrv.endpoint = publisherEndpoint;
293 dbSrv.nodeName = nodeName;
297 const std::string& serviceName, std::string& publisherEndpoint,
298 std::string& nodeName)
const 306 auto& dbSrv = itSrv->second;
308 publisherEndpoint = dbSrv.endpoint;
309 nodeName = dbSrv.nodeName;
// Handler for RegisterNodeRequest messages (only built when both ZMQ and
// PROTOBUF support are enabled). The visible lines emit a message naming the
// node taken from m.nodename(), then answer on socket `s`.
// NOTE(review): this listing is fragmentary; statements between the log
// message and the reply are not shown here.
314 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 317 void Server::handle(
const mvsim_msgs::RegisterNodeRequest&
m, zmq::socket_t&
s)
321 "Registering new node named '" <<
m.nodename() <<
"'");
// Reply to the requester with a RegisterNodeAnswer flagged as successful.
330 mvsim_msgs::RegisterNodeAnswer rna;
331 rna.set_success(
true);
332 mvsim::sendMessage(rna, s);
337 const mvsim_msgs::UnregisterNodeRequest&
m, zmq::socket_t&
s)
344 mvsim_msgs::GenericAnswer rna;
345 rna.set_success(
true);
346 mvsim::sendMessage(rna, s);
// Handler for SubscribeRequest messages. The visible lines emit a message
// naming the requested topic, then answer on socket `s` with a
// SubscribeAnswer that echoes the topic name and is flagged as successful.
// NOTE(review): this listing is fragmentary; statements between the log
// message and the reply are not shown here.
350 void Server::handle(
const mvsim_msgs::SubscribeRequest&
m, zmq::socket_t&
s)
// The topic name is wrapped in matching single quotes, as in the sibling
// handlers' messages (the opening quote was previously missing).
354 "Subscription request for topic '" <<
m.topic() <<
"'");
// Build and send the reply: same topic name, success=true.
361 mvsim_msgs::SubscribeAnswer ans;
362 ans.set_topic(
m.topic());
363 ans.set_success(
true);
364 mvsim::sendMessage(ans, s);
369 const mvsim_msgs::GetServiceInfoRequest&
m, zmq::socket_t&
s)
373 "GetServiceInfo request for service '" <<
m.servicename() <<
"'");
375 mvsim_msgs::GetServiceInfoAnswer ans;
376 std::string node, endpoint;
380 ans.set_success(
true);
381 ans.set_serviceendpoint(endpoint);
382 ans.set_servicenodename(node);
386 ans.set_success(
false);
388 "Could not find service `%s`",
m.servicename().c_str()));
391 mvsim::sendMessage(ans, s);
// Handler for ListTopicsRequest messages. For each topic entry `kv` that
// passes the optional name-prefix filter, one TopicInfo record is appended to
// the answer with the topic name, its type and every publisher's node name
// and endpoint. The answer is then sent back on socket `s`.
// NOTE(review): this listing is fragmentary; the loop header that introduces
// `kv` and any locking are not shown here.
395 void Server::handle(
const mvsim_msgs::ListTopicsRequest&
m, zmq::socket_t&
s)
400 mvsim_msgs::ListTopicsAnswer ans;
// Optional filter: only topics whose name starts with this prefix are listed.
403 const auto& queryPrefix =
m.topicstartswith();
409 const auto&
t = kv.second;
410 const auto&
name =
t.topicName;
// An empty prefix selects every topic; otherwise the name must start with
// the prefix. (The former `!queryPrefix.empty() || ...` test was always
// true, so the filter never excluded anything.)
412 if (queryPrefix.empty() ||
413 name.substr(0, queryPrefix.size()) == queryPrefix)
415 auto tInfo = ans.add_topics();
416 tInfo->set_topicname(
name);
417 tInfo->set_topictype(
t.topicTypeName);
// Publisher node names and endpoints are appended pairwise, in the same order.
419 for (
const auto& pubs :
t.publishers)
421 tInfo->add_publishername(pubs.second.publisherNodeName);
422 tInfo->add_publisherendpoint(pubs.second.publisherEndpoint);
426 mvsim::sendMessage(ans, s);
// Handler for ListNodesRequest messages. Node entries `n` whose name passes
// the optional name-prefix filter are selected for the ListNodesAnswer, which
// is then sent back on socket `s`.
// NOTE(review): this listing is fragmentary; the loop header that introduces
// `n`, the statement that adds a node to the answer and any locking are not
// shown here.
430 void Server::handle(
const mvsim_msgs::ListNodesRequest&
m, zmq::socket_t&
s)
// Optional filter: only nodes whose name starts with this prefix are listed.
436 const auto& queryPrefix =
m.nodestartswith();
438 mvsim_msgs::ListNodesAnswer ans;
441 const auto&
name =
n.second.nodeName;
// An empty prefix selects every node; otherwise the name must start with
// the prefix. (The former `!queryPrefix.empty() || ...` test was always
// true, so the filter never excluded anything.)
443 if (queryPrefix.empty() ||
444 name.substr(0, queryPrefix.size()) == queryPrefix)
449 mvsim::sendMessage(ans, s);
454 const mvsim_msgs::AdvertiseTopicRequest&
m, zmq::socket_t&
s)
458 "Received new topic advertiser: `%s` [%s] @ %s (%s)",
459 m.topicname().c_str(),
m.topictypename().c_str(),
m.endpoint().c_str(),
460 m.nodename().c_str());
462 mvsim_msgs::GenericAnswer ans;
466 m.topicname(),
m.topictypename(),
m.endpoint(),
m.nodename());
467 ans.set_success(
true);
469 catch (
const std::exception& e)
471 ans.set_success(
false);
472 ans.set_errormessage(mrpt::exception_to_str(e));
474 mvsim::sendMessage(ans, s);
479 const mvsim_msgs::AdvertiseServiceRequest&
m, zmq::socket_t&
s)
483 "Received new service offering: `%s` [%s->%s] @ %s (%s)",
484 m.servicename().c_str(),
m.inputtypename().c_str(),
485 m.outputtypename().c_str(),
m.endpoint().c_str(),
m.nodename().c_str());
487 mvsim_msgs::GenericAnswer ans;
491 m.servicename(),
m.inputtypename(),
m.outputtypename(),
492 m.endpoint(),
m.nodename());
493 ans.set_success(
true);
495 catch (
const std::exception& e)
497 ans.set_success(
false);
498 ans.set_errormessage(mrpt::exception_to_str(e));
500 mvsim::sendMessage(ans, s);
void db_advertise_topic(const std::string &topicName, const std::string &topicTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
std::map< node_name_t, InfoPerNode > connectedNodes_
void db_advertise_service(const std::string &serviceName, const std::string &inputTypeName, const std::string &outputTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
#define THROW_EXCEPTION(msg)
std::map< service_name_t, InfoPerService > knownServices_
void internalServerThread()
void db_remove_node(const std::string &nodeName)
#define MRPT_LOG_INFO_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG(_STRING)
std::shared_mutex dbMutex
MRPT_TODO("Modify ping to run on Windows + Test this")
#define MRPT_LOG_DEBUG_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG_FMT(_FMT_STRING,...)
std::atomic< zmq::context_t * > mainThreadZMQcontext_
std::string BASE_IMPEXP format(const char *fmt,...) MRPT_printf_format_check(1
GLuint const GLchar * name
std::map< topic_name_t, InfoPerTopic > knownTopics_
void requestMainThreadTermination()
bool db_get_service_info(const std::string &serviceName, std::string &publisherEndpoint, std::string &nodeName) const
unsigned int serverPortNo_
void db_add_topic_subscriber(const std::string &topicName, const std::string &updatesEndPoint)
void db_register_node(const std::string &nodeName)
#define MRPT_LOG_ERROR_STREAM(__CONTENTS)
#define ASSERTMSG_(f, __ERROR_MSG)