10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/version.h>
14 #if MRPT_VERSION >= 0x204
15 #include <mrpt/system/thread_name.h>
18 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
19 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h>
20 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h>
21 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h>
22 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h>
23 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h>
24 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h>
25 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h>
26 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h>
27 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h>
28 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h>
29 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h>
35 using namespace mvsim;
43 ASSERTMSG_(!
mainThread_.joinable(),
"Server is already running.");
45 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
49 #if MRPT_VERSION >= 0x204
50 mrpt::system::thread_name(
"serverMain",
mainThread_);
54 THROW_EXCEPTION(
"MVSIM needs building with ZMQ and PROTOBUF to enable client/server");
62 MRPT_LOG_DEBUG_STREAM(
"Waiting for the thread to quit.");
67 MRPT_LOG_DEBUG_STREAM(
"Joined thread.");
69 catch (
const std::exception& e)
71 MRPT_LOG_ERROR_STREAM(
"shutdown: Exception: " << mrpt::exception_to_str(e));
77 using namespace std::string_literals;
79 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
82 MRPT_LOG_INFO_STREAM(
"Server thread started.");
84 zmq::context_t context(3);
87 zmq::socket_t mainRepSocket(context, ZMQ_REP);
92 zmq::message_t request;
95 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1)
96 std::optional<size_t> reqSize = mainRepSocket.recv(request);
97 ASSERT_(reqSize.has_value());
99 mainRepSocket.recv(&request);
103 using client_requests_t = std::variant<
104 mvsim_msgs::RegisterNodeRequest, mvsim_msgs::UnregisterNodeRequest,
105 mvsim_msgs::SubscribeRequest, mvsim_msgs::ListNodesRequest,
106 mvsim_msgs::ListTopicsRequest, mvsim_msgs::AdvertiseTopicRequest,
107 mvsim_msgs::AdvertiseServiceRequest, mvsim_msgs::GetServiceInfoRequest>;
112 client_requests_t req = mvsim::parseMessageVariant<client_requests_t>(request);
116 [&](
const auto& m) { this->handle(m, mainRepSocket); },
120 catch (
const UnexpectedMessageException& e)
122 MRPT_LOG_ERROR_STREAM(e.what());
126 catch (
const zmq::error_t& e)
128 if (e.num() == ETERM)
132 MRPT_LOG_DEBUG_STREAM(
"Server thread about to exit for ZMQ term signal.");
136 MRPT_LOG_ERROR_STREAM(
"internalServerThread: ZMQ error: " << e.what());
139 catch (
const std::exception& e)
141 MRPT_LOG_ERROR_STREAM(
"internalServerThread: Exception: " << mrpt::exception_to_str(e));
143 MRPT_LOG_DEBUG_STREAM(
"Server thread quitted.");
151 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
155 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
159 zmq_ctx_shutdown(ctx->operator
void*());
172 for (
const std::string& topic : itNode->second.advertisedTopics)
192 const std::string& topicName,
const std::string& topicTypeName,
193 const std::string& publisherEndpoint,
const std::string& nodeName)
200 if (!dbTopic.topicTypeName.empty() && dbTopic.topicTypeName != topicTypeName)
202 throw std::runtime_error(mrpt::format(
203 "Trying to register topic `%s` [%s] but already known with type "
205 topicName.c_str(), topicTypeName.c_str(), dbTopic.topicTypeName.c_str()));
207 dbTopic.topicName = topicName;
208 dbTopic.topicTypeName = topicTypeName;
210 dbTopic.publishers.try_emplace(nodeName, topicName, nodeName, publisherEndpoint);
218 const std::string& topicName,
const std::string& updatesEndPoint)
224 dbTopic.subscribers.try_emplace(updatesEndPoint, topicName, updatesEndPoint);
231 const std::string& topicName,
const std::optional<std::string>& updatesEndPoint)
233 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
236 mvsim_msgs::TopicInfo tiMsg;
237 tiMsg.set_topicname(topicName);
238 tiMsg.set_topictype(dbTopic.topicTypeName);
240 for (
const auto& pub : dbTopic.publishers)
242 tiMsg.add_publishername(pub.second.publisherNodeName);
243 tiMsg.add_publisherendpoint(pub.second.publisherEndpoint);
248 auto lambdaSendToSub = [&](
const std::string& subUpdtEndPoint)
252 MRPT_LOG_DEBUG_STREAM(
253 "[send_topic_publishers_to_subscribed_clients] Letting "
254 << subUpdtEndPoint <<
" know about " << dbTopic.publishers.size()
255 <<
" publishers for topic '" << topicName <<
"'");
258 s.connect(subUpdtEndPoint);
259 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
262 ASSERT_(
s.connected());
264 sendMessage(tiMsg,
s);
266 mvsim_msgs::GenericAnswer ans;
267 const auto m = receiveMessage(
s);
268 mvsim::parseMessage(m, ans);
269 ASSERT_(ans.success());
271 catch (
const std::exception& e)
273 MRPT_LOG_ERROR_STREAM(
274 "Error sending topic updates to endpoint " << subUpdtEndPoint <<
":\n"
279 if (updatesEndPoint.has_value())
282 lambdaSendToSub(*updatesEndPoint);
287 for (
const auto& ips : dbTopic.subscribers)
289 lambdaSendToSub(ips.second.subscriberUpdatesEndpoint);
296 const std::string& serviceName,
const std::string& inputTypeName,
297 const std::string& outputTypeName,
const std::string& publisherEndpoint,
298 const std::string& nodeName)
305 if (!dbSrv.inputTypeName.empty() &&
306 (dbSrv.inputTypeName != inputTypeName || dbSrv.outputTypeName != outputTypeName))
308 throw std::runtime_error(mrpt::format(
309 "Trying to register service `%s` [%s->%s] but already known "
313 serviceName.c_str(), inputTypeName.c_str(), outputTypeName.c_str(),
314 dbSrv.inputTypeName.c_str(), dbSrv.outputTypeName.c_str()));
316 dbSrv.serviceName = serviceName;
317 dbSrv.inputTypeName = inputTypeName;
318 dbSrv.outputTypeName = outputTypeName;
319 dbSrv.endpoint = publisherEndpoint;
320 dbSrv.nodeName = nodeName;
324 const std::string& serviceName, std::string& publisherEndpoint, std::string& nodeName)
const
332 auto& dbSrv = itSrv->second;
334 publisherEndpoint = dbSrv.endpoint;
335 nodeName = dbSrv.nodeName;
340 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
// Handler for a RegisterNodeRequest: `m` is the parsed request and `s` is the
// socket the answer is written to.
// The visible steps log the requesting node name at debug level and answer
// with a RegisterNodeAnswer whose success flag is set to true.
// NOTE(review): presumably the node is also recorded in the server database
// between the log and the reply -- confirm against the full implementation.
343 void Server::handle(
const mvsim_msgs::RegisterNodeRequest& m, zmq::socket_t& s)
346 MRPT_LOG_DEBUG_STREAM(
"Registering new node named '" << m.nodename() <<
"'");
// Build the answer and send it back on the socket passed in by the caller.
355 mvsim_msgs::RegisterNodeAnswer rna;
356 rna.set_success(
true);
357 mvsim::sendMessage(rna, s);
// Handler for an UnregisterNodeRequest: `m` is the parsed request and `s` is
// the socket the answer is written to.
// The visible steps log the node name at debug level and answer with a
// GenericAnswer whose success flag is set to true.
// NOTE(review): presumably the node is removed from the server database
// between the log and the reply -- confirm against the full implementation.
361 void Server::handle(
const mvsim_msgs::UnregisterNodeRequest& m, zmq::socket_t& s)
364 MRPT_LOG_DEBUG_STREAM(
"Unregistering node named '" << m.nodename() <<
"'");
// Build the answer and send it back on the socket passed in by the caller.
368 mvsim_msgs::GenericAnswer rna;
369 rna.set_success(
true);
370 mvsim::sendMessage(rna, s);
// Handler for a SubscribeRequest: `m` is the parsed request and `s` is the
// socket the answer is written to.
// The visible steps log the requested topic at debug level and answer with a
// SubscribeAnswer that echoes the topic name and has its success flag set.
// NOTE(review): presumably the subscriber is stored in the topic database
// between the log and the reply -- confirm against the full implementation.
374 void Server::handle(
const mvsim_msgs::SubscribeRequest& m, zmq::socket_t& s)
// The topic name is wrapped in a matching pair of single quotes, like the
// messages of the other handlers (the opening quote used to be missing).
377 MRPT_LOG_DEBUG_STREAM(
"Subscription request for topic '" << m.topic() <<
"'");
// Build the answer and send it back on the socket passed in by the caller.
384 mvsim_msgs::SubscribeAnswer ans;
385 ans.set_topic(m.topic());
386 ans.set_success(
true);
387 mvsim::sendMessage(ans, s);
// Handler for a GetServiceInfoRequest: `m` is the parsed request and `s` is
// the socket the answer is written to.
// Two outcomes are visible: a successful answer carrying the service endpoint
// and the name of the node offering it, or a failed answer whose error message
// names the requested service.
// NOTE(review): the lookup that selects between the two outcomes (and that
// presumably fills `endpoint` and `node`) is not visible here -- confirm.
391 void Server::handle(
const mvsim_msgs::GetServiceInfoRequest& m, zmq::socket_t& s)
394 MRPT_LOG_DEBUG_STREAM(
"GetServiceInfo request for service '" << m.servicename() <<
"'");
396 mvsim_msgs::GetServiceInfoAnswer ans;
// Receive the lookup results; both start out empty.
397 std::string node, endpoint;
// Outcome 1: service found, report where and by whom it is offered.
401 ans.set_success(
true);
402 ans.set_serviceendpoint(endpoint);
403 ans.set_servicenodename(node);
// Outcome 2: service unknown, report the failure with a readable message.
407 ans.set_success(
false);
408 ans.set_errormessage(mrpt::format(
"Could not find service `%s`", m.servicename().c_str()));
// The answer is sent in either case.
411 mvsim::sendMessage(ans, s);
// Handler for a ListTopicsRequest: `m` is the parsed request and `s` is the
// socket the answer is written to.
// Builds a ListTopicsAnswer with one entry per known topic whose name starts
// with the prefix given in the request; an empty prefix selects every topic.
// Each entry carries the topic name, its type name and the node name and
// endpoint of every publisher.
415 void Server::handle(
const mvsim_msgs::ListTopicsRequest& m, zmq::socket_t& s)
418 MRPT_LOG_DEBUG(
"Listing topics request");
420 mvsim_msgs::ListTopicsAnswer ans;
// Optional name filter supplied by the client.
423 const auto& queryPrefix = m.topicstartswith();
429 const auto&
t = kv.second;
430 const auto& name =
t.topicName;
// Keep the topic when no filter was given or when its name begins with the
// filter. The test used to read `!queryPrefix.empty() || ...`, which is true
// for every non-empty prefix and therefore never filtered anything out.
// compare() checks the leading characters in place, without the temporary
// string that substr() would allocate.
432 if (queryPrefix.empty() || name.compare(0, queryPrefix.size(), queryPrefix) == 0)
434 auto tInfo = ans.add_topics();
435 tInfo->set_topicname(name);
436 tInfo->set_topictype(
t.topicTypeName);
// Publisher names and endpoints are appended pairwise, in the same order.
438 for (
const auto& pubs :
t.publishers)
440 tInfo->add_publishername(pubs.second.publisherNodeName);
441 tInfo->add_publisherendpoint(pubs.second.publisherEndpoint);
445 mvsim::sendMessage(ans, s);
// Handler for a ListNodesRequest: `m` is the parsed request and `s` is the
// socket the answer is written to.
// Builds a ListNodesAnswer from the known nodes whose name starts with the
// prefix given in the request; an empty prefix selects every node.
449 void Server::handle(
const mvsim_msgs::ListNodesRequest& m, zmq::socket_t& s)
452 MRPT_LOG_DEBUG(
"Listing nodes request");
// Optional name filter supplied by the client.
455 const auto& queryPrefix = m.nodestartswith();
457 mvsim_msgs::ListNodesAnswer ans;
460 const auto& name = n.second.nodeName;
// Keep the node when no filter was given or when its name begins with the
// filter. The test used to read `!queryPrefix.empty() || ...`, which is true
// for every non-empty prefix and therefore never filtered anything out.
// compare() checks the leading characters in place, without the temporary
// string that substr() would allocate.
462 if (queryPrefix.empty() || name.compare(0, queryPrefix.size(), queryPrefix) == 0)
467 mvsim::sendMessage(ans, s);
// Handler for an AdvertiseTopicRequest: `m` is the parsed request and `s` is
// the socket the answer is written to.
// The request's topic name, topic type name, endpoint and node name are
// logged, and a GenericAnswer is always sent back: success is true on the
// normal path, and false with the exception text as error message when a
// std::exception is caught.
// NOTE(review): the guarded call that may throw is not visible here;
// presumably it registers the publisher in the topic database -- confirm.
471 void Server::handle(
const mvsim_msgs::AdvertiseTopicRequest& m, zmq::socket_t& s)
475 "Received new topic advertiser: `%s` [%s] @ %s (%s)", m.topicname().c_str(),
476 m.topictypename().c_str(), m.endpoint().c_str(), m.nodename().c_str());
478 mvsim_msgs::GenericAnswer ans;
482 ans.set_success(
true);
// Failures are reported to the client instead of propagating to the caller.
484 catch (
const std::exception& e)
486 ans.set_success(
false);
487 ans.set_errormessage(mrpt::exception_to_str(e));
// The answer is sent on both the success and the failure path.
489 mvsim::sendMessage(ans, s);
// Handler for an AdvertiseServiceRequest: `m` is the parsed request and `s`
// is the socket the answer is written to.
// The request's service name, input/output type names, endpoint and node name
// are logged and then passed on to a call whose name is not visible here.
// A GenericAnswer is always sent back: success is true on the normal path,
// and false with the exception text as error message when a std::exception
// is caught.
// NOTE(review): presumably the elided call stores the service in the server
// database and throws on a type mismatch -- confirm.
493 void Server::handle(
const mvsim_msgs::AdvertiseServiceRequest& m, zmq::socket_t& s)
497 "Received new service offering: `%s` [%s->%s] @ %s (%s)", m.servicename().c_str(),
498 m.inputtypename().c_str(), m.outputtypename().c_str(), m.endpoint().c_str(),
499 m.nodename().c_str());
501 mvsim_msgs::GenericAnswer ans;
505 m.servicename(), m.inputtypename(), m.outputtypename(), m.endpoint(), m.nodename());
506 ans.set_success(
true);
// Failures are reported to the client instead of propagating to the caller.
508 catch (
const std::exception& e)
510 ans.set_success(
false);
511 ans.set_errormessage(mrpt::exception_to_str(e));
// The answer is sent on both the success and the failure path.
513 mvsim::sendMessage(ans, s);