10 #include <mrpt/core/exceptions.h> 11 #include <mrpt/version.h> 16 #if MRPT_VERSION >= 0x204 17 #include <mrpt/system/thread_name.h> 22 #include <shared_mutex> 24 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 26 #include <google/protobuf/text_format.h> 30 #include "AdvertiseServiceRequest.pb.h" 31 #include "AdvertiseTopicRequest.pb.h" 32 #include "CallService.pb.h" 33 #include "GenericAnswer.pb.h" 34 #include "GetServiceInfoAnswer.pb.h" 35 #include "GetServiceInfoRequest.pb.h" 36 #include "ListNodesAnswer.pb.h" 37 #include "ListNodesRequest.pb.h" 38 #include "ListTopicsAnswer.pb.h" 39 #include "ListTopicsRequest.pb.h" 40 #include "RegisterNodeAnswer.pb.h" 41 #include "RegisterNodeRequest.pb.h" 42 #include "SubscribeAnswer.pb.h" 43 #include "SubscribeRequest.pb.h" 44 #include "UnregisterNodeRequest.pb.h" 48 using namespace mvsim;
// Per-topic state for a topic advertised by this client: a reference to the
// ZMQ context, the topic name, a PUB socket created on that context, and the
// protobuf descriptor of the message type (null until assigned).
// NOTE(review): this listing has gaps (the embedded line numbers skip), so
// further members may exist; code later in the file also uses `.endpoint`.
50 #if defined(MVSIM_HAS_ZMQ) 53 struct InfoPerAdvertisedTopic
55 InfoPerAdvertisedTopic(zmq::context_t&
c) : context(c) {}
57 zmq::context_t& context;
59 std::string topicName;
// The default member initializer below reads `context`, which is declared
// before this member and is therefore already bound at that point.
60 zmq::socket_t pubSocket = zmq::socket_t(context, ZMQ_PUB);
62 const google::protobuf::Descriptor* descriptor =
nullptr;
// Bookkeeping for one service offered by this client: the service name and
// the protobuf descriptors of its input and output message types (both start
// as null pointers).
// NOTE(review): the struct header is not visible in this listing; the name is
// taken from the defaulted constructor. Code later in the file also assigns
// and invokes a `callback` member that is not shown here.
67 InfoPerService() =
default;
69 std::string serviceName;
70 const google::protobuf::Descriptor* descInput =
nullptr;
71 const google::protobuf::Descriptor* descOutput =
nullptr;
// Per-topic state for a subscription held by this client: a SUB socket
// created on the given context, the protobuf descriptor of the topic type,
// the list of user callbacks, and the std::thread associated with the topic.
74 struct InfoPerSubscribedTopic
76 InfoPerSubscribedTopic(zmq::context_t&
c) : context(c) {}
// Destructor: waits for the topic thread to finish if it is joinable, so the
// std::thread member is never destroyed while still joinable.
77 ~InfoPerSubscribedTopic()
79 if (topicThread.joinable()) topicThread.join();
82 zmq::context_t& context;
84 std::string topicName;
// Initialized from `context`, which is declared before this member.
85 zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
86 const google::protobuf::Descriptor* descriptor =
nullptr;
// Invoked with each received zmq::message_t (see topic_callback_t).
88 std::vector<Client::topic_callback_t> callbacks;
90 std::thread topicThread;
// ZMQ-related state of the client, reached through `zmq_` elsewhere in this
// file. NOTE(review): the enclosing struct header is not visible in this
// listing; the name ZMQImpl is presumed from the `zmq_` member type.
// The context is constructed with the arguments 1 and ZMQ_MAX_SOCKETS_DFLT.
97 #if defined(MVSIM_HAS_ZMQ) 98 zmq::context_t context{1, ZMQ_MAX_SOCKETS_DFLT};
// Main request socket; std::optional because it is only emplace()d during
// connection setup. Its monitor is what the connection-state query reads.
99 std::optional<zmq::socket_t> mainReqSocket;
100 mvsim::SocketMonitor mainReqSocketMonitor;
// Topics advertised by this client, keyed by topic name, with their lock.
102 std::map<std::string, internal::InfoPerAdvertisedTopic> advertisedTopics;
103 std::shared_mutex advertisedTopics_mtx;
// Socket on which incoming service calls are received, plus the services
// offered by this client keyed by service name, with their lock.
105 std::optional<zmq::socket_t> srvListenSocket;
106 std::map<std::string, internal::InfoPerService> offeredServices;
107 std::shared_mutex offeredServices_mtx;
// Topics this client is subscribed to, keyed by topic name, with their lock.
109 std::map<std::string, internal::InfoPerSubscribedTopic> subscribedTopics;
110 std::shared_mutex subscribedTopics_mtx;
// Socket on which topic update notifications arrive, and the endpoint string
// that is sent along with subscribe requests.
112 std::optional<zmq::socket_t> topicNotificationsSocket;
113 std::string topicNotificationsEndPoint;
// Constructor initializer list (fragment): initializes the COutputLogger base
// with the logger name "mvsim::Client". The constructor header and any
// further initializers are not visible in this listing.
119 :
mrpt::system::COutputLogger(
"mvsim::Client"),
// Connection-state query (fragment; function header not visible): when built
// with both ZMQ and PROTOBUF, returns what the monitor attached to the main
// request socket reports. The other preprocessor branch is not shown.
131 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 132 return zmq_->mainReqSocketMonitor.connected();
// Connection setup (function header not visible in this listing; presumably
// Client::connect -- TODO confirm). Visible steps: create the main REQ socket
// and attach the monitor to it, connect it, bind a REP socket for incoming
// service calls and a PAIR socket for topic notifications (both to the
// wildcard address "tcp://0.0.0.0:*"), and name the two worker threads.
140 using namespace std::string_literals;
// Precondition arguments (the macro name is on a missing line): the main
// socket must either not exist yet or not be connected.
142 !
zmq_->mainReqSocket || !
zmq_->mainReqSocket->connected(),
143 "Client is already running.");
// Main request socket: created on the shared context, then monitored.
145 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 147 zmq_->mainReqSocket.emplace(
zmq_->context, ZMQ_REQ);
150 zmq_->mainReqSocketMonitor.monitor(
zmq_->mainReqSocket.value());
// The connect() arguments are on lines missing from this listing.
152 zmq_->mainReqSocket->connect(
// Service-call listener: REP socket bound with a wildcard port.
160 zmq_->srvListenSocket.emplace(
zmq_->context, ZMQ_REP);
161 zmq_->srvListenSocket->bind(
"tcp://0.0.0.0:*"s);
// Bind failure check; the body of this `if` is not visible.
163 if (!
zmq_->srvListenSocket->connected())
168 "Client service thread is already running!");
// Thread naming is only compiled for MRPT_VERSION >= 0x204.
172 #if MRPT_VERSION >= 0x204 173 mrpt::system::thread_name(
"services_"s +
nodeName_, serviceInvokerThread_);
// Topic-notification channel: PAIR socket bound with a wildcard port.
177 zmq_->topicNotificationsSocket.emplace(
zmq_->context, ZMQ_PAIR);
178 zmq_->topicNotificationsSocket->bind(
"tcp://0.0.0.0:*"s);
180 if (!
zmq_->topicNotificationsSocket->connected())
// Store the endpoint of the bound socket; it is later sent to the server in
// subscribe requests.
183 zmq_->topicNotificationsEndPoint =
184 get_zmq_endpoint(*
zmq_->topicNotificationsSocket);
188 "Client topic updates thread is already running!");
192 #if MRPT_VERSION >= 0x204 193 mrpt::system::thread_name(
194 "topicUpdates_"s +
nodeName_, topicUpdatesThread_);
// NOTE(review): the first string literal on the next line (original line 199)
// appears to be the tail of the preceding function's non-ZMQ branch; the
// shutdown logic starts at the `#if` that follows it on the same line.
//
// Shutdown (function header not visible; name presumed from the log text
// "shutdown: Exception: "). Visible steps: return early if the main socket is
// not connected, log exceptions caught from a try block whose body is not
// shown, shut down the ZMQ context, then clear the subscribed-topic and
// offered-service maps.
// NOTE(review): `mainReqSocket` is a std::optional and is dereferenced here
// with no visible has-value check, unlike the setup code, which tests
// `!zmq_->mainReqSocket` first. If this runs before the socket was ever
// emplaced, that dereference is undefined behaviour -- confirm against the
// lines missing from this listing.
199 "MVSIM needs building with ZMQ and PROTOBUF to enable " 206 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 208 if (!
zmq_->mainReqSocket->connected())
return;
215 catch (
const std::exception& e)
218 "shutdown: Exception: " << mrpt::exception_to_str(e));
// Context shutdown: the member function is used when ZMQ_VERSION is at least
// 4.4.0; the other call passes the raw context handle to the C API
// (presumably the #else branch; the directive itself is not visible).
221 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0) 222 zmq_->context.shutdown();
225 zmq_ctx_shutdown(
zmq_->context.operator
void*());
// Clearing subscribedTopics destroys the InfoPerSubscribedTopic entries,
// whose destructor joins each topic thread.
230 zmq_->subscribedTopics.clear();
231 zmq_->offeredServices.clear();
// Node registration round trip (function header not visible; presumably
// doRegisterClient -- TODO confirm): sends a RegisterNodeRequest on the main
// request socket, waits for the reply and parses it as a RegisterNodeAnswer.
// The lines that fill in the request and test the answer are missing from
// this listing; the format string below is presumably the error raised when
// the server refuses the registration.
238 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 239 auto&
s = *
zmq_->mainReqSocket;
241 mvsim_msgs::RegisterNodeRequest rnq;
243 mvsim::sendMessage(rnq,
s);
// Wait for the server's reply before continuing.
246 const zmq::message_t reply = mvsim::receiveMessage(
s);
248 mvsim_msgs::RegisterNodeAnswer rna;
249 mvsim::parseMessage(reply, rna);
253 "Server did not allow registering node: %s",
254 rna.errormessage().c_str());
// Node unregistration round trip: sends an UnregisterNodeRequest on the main
// request socket, waits for the reply and parses it as a GenericAnswer.
// The lines that fill in the request and test the answer are missing from
// this listing; the format string below is presumably the error raised when
// the server reports a failure.
264 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 265 auto&
s = *
zmq_->mainReqSocket;
267 mvsim_msgs::UnregisterNodeRequest rnq;
269 mvsim::sendMessage(rnq,
s);
// Wait for the server's reply before continuing.
272 const zmq::message_t reply = mvsim::receiveMessage(
s);
274 mvsim_msgs::GenericAnswer rna;
275 mvsim::parseMessage(reply, rna);
279 "Server answered an error unregistering node: %s",
280 rna.errormessage().c_str());
// Queries the server for the list of nodes: sends a ListNodesRequest on the
// main request socket, parses the reply as a ListNodesAnswer, and copies each
// reported node name into a vector of Client::InfoPerNode (one entry per
// node, in the order given by the answer). The return statement is on a line
// missing from this listing.
290 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 291 auto&
s = *
zmq_->mainReqSocket;
293 mvsim_msgs::ListNodesRequest req;
294 mvsim::sendMessage(req,
s);
// Wait for the server's reply before continuing.
297 const zmq::message_t reply = mvsim::receiveMessage(
s);
299 mvsim_msgs::ListNodesAnswer lna;
300 mvsim::parseMessage(reply, lna);
// Size the output up front so entries can be filled by index.
302 std::vector<Client::InfoPerNode> nodes;
303 nodes.resize(lna.nodes_size());
305 for (
int i = 0; i < lna.nodes_size(); i++)
307 nodes[i].name = lna.nodes(i);
// Queries the server for the list of topics: sends a ListTopicsRequest on the
// main request socket, parses the reply as a ListTopicsAnswer, and converts
// each entry into a Client::InfoPerTopic carrying the topic name, its type,
// and parallel lists of publisher names and publisher endpoints. The return
// statement is on a line missing from this listing.
317 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 318 auto&
s = *
zmq_->mainReqSocket;
320 mvsim_msgs::ListTopicsRequest req;
321 mvsim::sendMessage(req,
s);
// Wait for the server's reply before continuing.
324 const zmq::message_t reply = mvsim::receiveMessage(
s);
326 mvsim_msgs::ListTopicsAnswer lta;
327 mvsim::parseMessage(reply, lta);
// Size the output up front so entries can be filled by index.
329 std::vector<Client::InfoPerTopic> topics;
330 topics.resize(lta.topics_size());
332 for (
int i = 0; i < lta.topics_size(); i++)
334 const auto&
t = lta.topics(i);
335 auto&
dst = topics[i];
337 dst.name =
t.topicname();
338 dst.type =
t.topictype();
// Both output lists are sized from publisherendpoint_size().
// NOTE(review): publishername(k) is indexed with the same bound, which
// assumes the answer carries exactly one name per endpoint -- confirm
// against the message definition.
341 dst.endpoints.resize(
t.publisherendpoint_size());
342 dst.publishers.resize(
t.publisherendpoint_size());
344 for (
int k = 0; k <
t.publisherendpoint_size(); k++)
346 dst.publishers[k] =
t.publishername(k);
347 dst.endpoints[k] =
t.publisherendpoint(k);
// Advertises a topic for publication by this client.
//   topicName  - key under which the topic is stored in advertisedTopics.
//   descriptor - protobuf descriptor of the message type; it is dereferenced
//                for full_name(), so it must not be null.
// Visible steps: take an exclusive lock on advertisedTopics_mtx, reject a
// topic name that is already present, create the map entry (which builds its
// PUB socket on the shared context), bind that socket to a wildcard TCP port,
// record endpoint/name/descriptor, then send an AdvertiseTopicRequest on the
// main request socket and parse the GenericAnswer that comes back.
// NOTE(review): braces are missing from this listing, so it cannot be told
// here whether the lock is still held during the server round trip.
357 const std::string& topicName,
358 const google::protobuf::Descriptor* descriptor)
360 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 362 auto& advTopics =
zmq_->advertisedTopics;
364 std::unique_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
// Duplicate check; the string after it is presumably the message of the error
// raised for a duplicate (the throwing macro is on a missing line).
366 if (advTopics.find(topicName) != advTopics.end())
368 "Topic `%s` already registered for publication in this same " 374 internal::InfoPerAdvertisedTopic& ipat =
375 advTopics.emplace_hint(advTopics.begin(), topicName,
zmq_->context)
// Bind the publisher socket; the body of the failure check is not visible.
381 ipat.pubSocket.bind(
"tcp://0.0.0.0:*");
382 if (!ipat.pubSocket.connected())
// Record the endpoint of the bound socket so it can be reported to the
// server.
386 ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
387 ipat.topicName = topicName;
388 ipat.descriptor = descriptor;
391 "Advertising topic `%s` [%s] on endpoint `%s`", topicName.c_str(),
392 descriptor->full_name().c_str(), ipat.endpoint.c_str());
// Tell the server about the new topic and its endpoint.
396 mvsim_msgs::AdvertiseTopicRequest req;
397 req.set_topicname(ipat.topicName);
398 req.set_endpoint(ipat.endpoint);
399 req.set_topictypename(ipat.descriptor->full_name());
402 mvsim::sendMessage(req, *
zmq_->mainReqSocket);
// Wait for the server's answer; the check on it is on missing lines.
405 const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
406 mvsim_msgs::GenericAnswer ans;
407 mvsim::parseMessage(reply, ans);
411 "Error registering topic `%s` in server: `%s`", topicName.c_str(),
412 ans.errormessage().c_str());
// Advertises a service offered by this client.
//   serviceName - key under which the service is stored in offeredServices.
//   descIn      - protobuf descriptor of the request type (dereferenced).
// The remaining parameters (`descOut`, `callback`) are used below, but their
// declarations are on lines missing from this listing.
// Visible steps: take an exclusive lock on offeredServices_mtx, reject a
// service name that is already present, create the map entry, read the
// endpoint of the service listening socket, store name/callback/descriptors,
// then send an AdvertiseServiceRequest on the main request socket and parse
// the GenericAnswer that comes back.
420 const std::string& serviceName,
const google::protobuf::Descriptor* descIn,
423 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 425 std::unique_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
427 auto& services =
zmq_->offeredServices;
// Duplicate check; the strings after it are presumably the arguments of the
// error raised for a duplicate (the throwing macro is on a missing line).
429 if (services.find(serviceName) != services.end())
431 "Service `%s` already registered in this same client!",
432 serviceName.c_str());
// operator[] default-constructs the entry for this new key.
434 internal::InfoPerService& ips = services[serviceName];
// Query the endpoint the listening socket was bound to.
// NOTE(review): assignedPortLen starts at sizeof(assignedPort) == 100 and is
// then used as an index for the terminator. If getsockopt leaves it at 100,
// the write lands one byte past the end of the array. Clamping the index to
// sizeof(assignedPort) - 1 would remove that possibility.
439 char assignedPort[100];
440 size_t assignedPortLen =
sizeof(assignedPort);
441 zmq_->srvListenSocket->getsockopt(
442 ZMQ_LAST_ENDPOINT, assignedPort, &assignedPortLen);
443 assignedPort[assignedPortLen] =
'\0';
445 ips.serviceName = serviceName;
446 ips.callback = callback;
447 ips.descInput = descIn;
448 ips.descOutput = descOut;
451 "Advertising service `%s` [%s->%s] on endpoint `%s`",
452 serviceName.c_str(), descIn->full_name().c_str(),
453 descOut->full_name().c_str(), assignedPort);
// Tell the server about the new service and where it can be reached.
455 mvsim_msgs::AdvertiseServiceRequest req;
456 req.set_servicename(ips.serviceName);
457 req.set_endpoint(assignedPort);
458 req.set_inputtypename(ips.descInput->full_name());
459 req.set_outputtypename(ips.descOutput->full_name());
462 mvsim::sendMessage(req, *
zmq_->mainReqSocket);
// Wait for the server's answer; the check on it is on missing lines.
465 const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
466 mvsim_msgs::GenericAnswer ans;
467 mvsim::parseMessage(reply, ans);
471 "Error registering service `%s` in server: `%s`",
472 serviceName.c_str(), ans.errormessage().c_str());
// Publishes a protobuf message on a topic previously advertised by this
// client.
//   topicName - name used to look the topic up in advertisedTopics.
//   msg       - message to send; its descriptor is compared against the one
//               stored for the topic.
// Visible checks, in order (the checking macros themselves are mostly on
// missing lines): the client has a connected main socket, the topic exists in
// advertisedTopics, the message descriptor is the same object as the stored
// one, and the topic's PUB socket is connected. The message is then sent on
// that socket.
480 const std::string& topicName,
const google::protobuf::Message& msg)
483 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 485 zmq_ &&
zmq_->mainReqSocket &&
zmq_->mainReqSocket->connected(),
486 "Client not connected to Server");
// Shared (reader) lock: the map is only looked up here, not modified.
488 std::shared_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
489 auto itIpat =
zmq_->advertisedTopics.find(topicName);
492 itIpat !=
zmq_->advertisedTopics.end(),
494 "Topic `%s` cannot been registered. Missing former call to " 500 auto& ipat = itIpat->second;
// Type check by descriptor pointer identity.
503 msg.GetDescriptor() == ipat.descriptor,
505 "Topic `%s` has type `%s`, but expected `%s` from former call to " 507 topicName.c_str(), msg.GetDescriptor()->name().c_str(),
508 ipat.descriptor->name().c_str()));
510 ASSERT_(ipat.pubSocket.connected());
512 mvsim::sendMessage(msg, ipat.pubSocket);
516 "Published on topic `%s`: %s", topicName.c_str(),
517 msg.DebugString().c_str());
// Worker that serves incoming service calls on the service listening socket.
// Visible flow: receive a message, parse it as a CallService, look the
// requested service up in offeredServices under a shared lock, and either
// answer with a failed GenericAnswer (unknown service) or invoke the stored
// callback with the serialized input and send back the message it returns.
// The loop header and the braces are on lines missing from this listing.
528 using namespace std::string_literals;
530 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 534 "[" <<
nodeName_ <<
"] Client service thread started.");
536 zmq::socket_t&
s = *
zmq_->srvListenSocket;
// Wait for the next request on the listening socket.
541 zmq::message_t
m = mvsim::receiveMessage(s);
544 mvsim_msgs::CallService csMsg;
545 mvsim::parseMessage(m, csMsg);
// Reader lock while the service table is consulted.
547 std::shared_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
548 const auto& srvName = csMsg.servicename();
550 auto itSrv =
zmq_->offeredServices.find(srvName);
// Unknown service: reply with success=false and an explanatory message.
551 if (itSrv ==
zmq_->offeredServices.end())
554 mvsim_msgs::GenericAnswer ans;
555 ans.set_success(
false);
557 "Requested unknown service `%s`", srvName.c_str()));
560 mvsim::sendMessage(ans, s);
564 internal::InfoPerService& ips = itSrv->second;
// Run the user callback on the serialized request.
// NOTE(review): the callback returns a shared_ptr (see service_callback_t)
// that is dereferenced below with no null check visible in this listing.
568 auto outMsgPtr = ips.callback(csMsg.serializedinput());
571 mvsim::sendMessage(*outMsgPtr, s);
// ZMQ errors: ETERM is handled separately from other ZMQ errors (per the log
// texts below, it is the case where the thread is about to exit).
574 catch (
const zmq::error_t& e)
576 if (e.num() == ETERM)
581 "internalServiceServingThread about to exit for ZMQ term " 587 "internalServiceServingThread: ZMQ error: " << e.what());
// Any other exception is logged with its description.
590 catch (
const std::exception& e)
593 "internalServiceServingThread: Exception: " 594 << mrpt::exception_to_str(e));
// Worker that handles topic update notifications arriving on the topic
// notifications socket. Visible flow: receive a message, parse it as a
// TopicInfo, immediately acknowledge with a successful GenericAnswer, then
// (under a shared lock on subscribedTopics_mtx) look the topic up and connect
// its SUB socket to every publisher endpoint listed in the message.
// The loop header and the braces are on lines missing from this listing.
603 using namespace std::string_literals;
605 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 609 "[" <<
nodeName_ <<
"] Client topic updates thread started.");
611 zmq::socket_t&
s = *
zmq_->topicNotificationsSocket;
// Wait for the next notification.
616 zmq::message_t
m = mvsim::receiveMessage(s);
619 mvsim_msgs::TopicInfo tiMsg;
620 mvsim::parseMessage(m, tiMsg);
// Acknowledge before processing the update.
623 mvsim_msgs::GenericAnswer ans;
624 ans.set_success(
true);
625 mvsim::sendMessage(ans, s);
// Reader lock on the subscription table for the lookup below.
// NOTE(review): the SUB socket of the entry is modified (connect) while only
// this shared lock is visible -- confirm no other thread touches the same
// socket concurrently.
633 std::shared_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
634 const auto& topicName = tiMsg.topicname();
636 auto itTopic =
zmq_->subscribedTopics.find(topicName);
// Update for a topic this node is not subscribed to: only a log message is
// visible for this case.
637 if (itTopic ==
zmq_->subscribedTopics.end())
643 <<
"` update message from server, but this node is not " 644 "subscribed to it (!).");
649 "[internalTopicUpdatesThread] Received: " 650 << tiMsg.DebugString());
652 internal::InfoPerSubscribedTopic& ipt = itTopic->second;
// Connect the subscriber socket to each publisher endpoint in the update.
654 for (
int i = 0; i < tiMsg.publisherendpoint_size(); i++)
656 ipt.subSocket.connect(tiMsg.publisherendpoint(i));
// ZMQ errors: ETERM is handled separately from other ZMQ errors (per the log
// texts below, it is the case where the thread is about to exit).
662 catch (
const zmq::error_t& e)
664 if (e.num() == ETERM)
669 "internalTopicUpdatesThread about to exit for ZMQ term " 675 "internalTopicUpdatesThread: ZMQ error: " << e.what());
// Any other exception is logged with its description.
678 catch (
const std::exception& e)
681 "internalTopicUpdatesThread: Exception: " 682 << mrpt::exception_to_str(e));
685 "[" <<
nodeName_ <<
"] Client topic updates thread quitted.");
// Service call taking and producing serialized messages (function header not
// visible; presumably a string-based callService overload -- TODO confirm).
//   serviceName        - name of the service to call.
//   inputSerializedMsg - serialized request.
// Declares two output strings (data and type name) and passes them, together
// with std::nullopt for the typed output message, to a call whose callee name
// is on a line missing from this listing (the argument list matches
// doCallService). The return statement is not visible either.
692 const std::string& serviceName,
const std::string& inputSerializedMsg)
695 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 697 std::string outMsgData, outMsgType;
699 serviceName, inputSerializedMsg, std::nullopt, outMsgData, outMsgType);
// Calls a service by name and delivers the answer through optional outputs.
//   serviceName         - service to look up and call.
//   inputSerializedMsg  - serialized request, forwarded unchanged.
//   outputMsg           - optional: receives the parsed answer message.
//   outputSerializedMsg - optional: receives the serialized answer data.
//   outputMsgTypeName   - optional: receives the answer type name; it is only
//                         written on the line that follows the
//                         outputSerializedMsg assignment.
// Two phases are visible: (1) ask the server, over the main request socket,
// for the endpoint of the service; (2) open a fresh REQ socket to that
// endpoint, send a CallService message and wait for the reply.
// NOTE(review): `m` is declared twice below, so the original must use nested
// scopes; the braces are on lines missing from this listing.
706 const std::string& serviceName,
const std::string& inputSerializedMsg,
707 mrpt::optional_ref<google::protobuf::Message> outputMsg,
708 mrpt::optional_ref<std::string> outputSerializedMsg,
709 mrpt::optional_ref<std::string> outputMsgTypeName)
712 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 716 std::string srvEndpoint;
// Phase 1: resolve the service endpoint through the server.
718 zmq::socket_t&
s = *
zmq_->mainReqSocket;
720 mvsim_msgs::GetServiceInfoRequest gsi;
721 gsi.set_servicename(serviceName);
722 mvsim::sendMessage(gsi, s);
724 auto m = mvsim::receiveMessage(s);
725 mvsim_msgs::GetServiceInfoAnswer gsia;
726 mvsim::parseMessage(
m, gsia);
// Presumably the arguments of the error raised when the lookup fails (the
// check and the throwing macro are on missing lines).
730 "Error requesting information about service `%s`: %s",
731 serviceName.c_str(), gsia.errormessage().c_str());
733 srvEndpoint = gsia.serviceendpoint();
// Phase 2: talk to the service provider directly over a temporary socket.
737 zmq::socket_t srvReqSock(
zmq_->context, ZMQ_REQ);
738 srvReqSock.connect(srvEndpoint);
740 mvsim_msgs::CallService csMsg;
741 csMsg.set_servicename(serviceName);
742 csMsg.set_serializedinput(inputSerializedMsg);
744 mvsim::sendMessage(csMsg, srvReqSock);
746 const auto m = mvsim::receiveMessage(srvReqSock);
// Typed output; presumably guarded by a check that outputMsg is set (the
// guard is on a missing line) -- TODO confirm.
749 mvsim::parseMessage(
m, outputMsg.value().get());
// Raw outputs: split the reply into type name and serialized payload.
751 if (outputSerializedMsg)
753 const auto [typeName, serializedData] =
754 internal::parseMessageToParts(
m);
756 outputSerializedMsg.value().get() = serializedData;
757 if (outputMsgTypeName) outputMsgTypeName.value().get() = typeName;
// Subscribes this client to a topic.
//   topicName  - topic to subscribe to; also the key in subscribedTopics.
//   descriptor - protobuf descriptor of the topic type (not used in the lines
//                visible here).
// The `callback` parameter is used below, but its declaration is on a line
// missing from this listing.
// Visible steps: take an exclusive lock on subscribedTopics_mtx, obtain the
// map entry for the topic, set the subscription filter on its SUB socket,
// store the callback and topic name, then send a SubscribeRequest (carrying
// this client's topic notifications endpoint) on the main request socket and
// parse the SubscribeAnswer.
764 const std::string& topicName,
765 const google::protobuf::Descriptor* descriptor,
769 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 771 std::unique_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
773 auto& topics =
zmq_->subscribedTopics;
// emplace_hint on a std::map returns the existing element when the key is
// already present, so a repeated subscription reuses the same entry.
// NOTE(review): lines are missing right after the assignments below; if a
// thread is started there for an entry that already has one, that needs a
// guard -- confirm against the full source.
776 internal::InfoPerSubscribedTopic& ipt =
777 topics.emplace_hint(topics.begin(), topicName,
zmq_->context)->second;
// Empty filter with length 0 passed to ZMQ_SUBSCRIBE.
780 ipt.subSocket.setsockopt(ZMQ_SUBSCRIBE,
"", 0);
782 ipt.callbacks.push_back(callback);
784 ipt.topicName = topicName;
// Ask the server to send publisher updates for this topic to our
// notifications endpoint.
792 mvsim_msgs::SubscribeRequest subReq;
793 subReq.set_topic(topicName);
794 subReq.set_updatesendpoint(
zmq_->topicNotificationsEndPoint);
796 mvsim::sendMessage(subReq, *
zmq_->mainReqSocket);
// Wait for the server's answer; any check on it is on missing lines.
798 const auto m = mvsim::receiveMessage(*
zmq_->mainReqSocket);
799 mvsim_msgs::SubscribeAnswer subAns;
800 mvsim::parseMessage(
m, subAns);
// Worker for one subscribed topic (`ipt`): receives messages on the topic's
// SUB socket and passes each one to every registered callback.
// Exceptions thrown by callbacks are caught and logged by the first handler
// below; ZMQ errors and other exceptions are handled by the later handlers.
// The loop header, the `try` keywords and the braces are on lines missing
// from this listing.
814 using namespace std::string_literals;
816 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 820 "[" <<
nodeName_ <<
"] Client topic subscribe thread for `" 821 << ipt.topicName <<
"` started.");
823 zmq::socket_t&
s = ipt.subSocket;
// Wait for the next message on this topic.
828 const zmq::message_t
m = mvsim::receiveMessage(s);
// NOTE(review): `auto callback` copies each std::function per iteration;
// `const auto&` would avoid the copy. No lock around `ipt.callbacks` is
// visible here, while the subscribe path appends to it under
// subscribedTopics_mtx -- confirm against the full source.
833 for (
auto callback : ipt.callbacks) callback(m);
// Callback failures are logged together with the topic they occurred on.
835 catch (
const std::exception& e)
838 "Exception in topic `" 840 <<
"` subscription callback:" << mrpt::exception_to_str(e));
// ZMQ errors: ETERM is handled separately from other ZMQ errors (per the log
// texts below, it is the case where the thread is about to exit).
844 catch (
const zmq::error_t& e)
846 if (e.num() == ETERM)
852 <<
"] Client topic subscribe thread about to exit for ZMQ " 858 "internalTopicSubscribeThread: ZMQ error: " << e.what());
// Any other exception is logged with its description.
861 catch (
const std::exception& e)
864 "internalTopicSubscribeThread: Exception: " 865 << mrpt::exception_to_str(e));
868 "[" <<
nodeName_ <<
"] Client topic subscribe thread quitted.");
#define ASSERT_EQUAL_(__A, __B)
void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic &ipt)
void internalTopicUpdatesThread()
std::thread topicUpdatesThread_
void setName(const std::string &nodeName)
void doAdvertiseService(const std::string &serviceName, const google::protobuf::Descriptor *descIn, const google::protobuf::Descriptor *descOut, service_callback_t callback)
std::unique_ptr< ZMQImpl > zmq_
#define THROW_EXCEPTION(msg)
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
std::function< void(const zmq::message_t &)> topic_callback_t
#define MRPT_LOG_WARN_STREAM(__CONTENTS)
std::vector< InfoPerNode > requestListOfNodes()
void publishTopic(const std::string &topicName, const google::protobuf::Message &msg)
#define MRPT_LOG_INFO_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG(_STRING)
#define MRPT_LOG_DEBUG_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG_FMT(_FMT_STRING,...)
std::string serverHostAddress_
void callService(const std::string &serviceName, const INPUT_MSG_T &input, OUTPUT_MSG_T &output)
std::string BASE_IMPEXP format(const char *fmt,...) MRPT_printf_format_check(1
std::vector< InfoPerTopic > requestListOfTopics()
void doAdvertiseTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor)
void internalServiceServingThread()
constexpr unsigned int MVSIM_PORTNO_MAIN_REP
std::thread serviceInvokerThread_
void doCallService(const std::string &serviceName, const std::string &inputSerializedMsg, mrpt::optional_ref< google::protobuf::Message > outputMsg, mrpt::optional_ref< std::string > outputSerializedMsg=std::nullopt, mrpt::optional_ref< std::string > outputMsgTypeName=std::nullopt)
#define MRPT_LOG_ERROR_STREAM(__CONTENTS)
void doUnregisterClient()
#define ASSERTMSG_(f, __ERROR_MSG)
void doSubscribeTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor, const topic_callback_t &callback)
std::function< std::shared_ptr< google::protobuf::Message >(const std::string &)> service_callback_t