10 #include <mrpt/core/exceptions.h> 11 #include <mrpt/core/lock_helper.h> 12 #include <mrpt/version.h> 17 #if MRPT_VERSION >= 0x204 18 #include <mrpt/system/thread_name.h> 23 #include <shared_mutex> 25 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 27 #include <google/protobuf/text_format.h> 28 #include <mvsim/mvsim-msgs/AdvertiseServiceRequest.pb.h> 29 #include <mvsim/mvsim-msgs/AdvertiseTopicRequest.pb.h> 30 #include <mvsim/mvsim-msgs/CallService.pb.h> 31 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h> 32 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h> 33 #include <mvsim/mvsim-msgs/GetServiceInfoRequest.pb.h> 34 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h> 35 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h> 36 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h> 37 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h> 38 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h> 39 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h> 40 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h> 41 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h> 42 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h> 48 using namespace mvsim;
50 #if defined(MVSIM_HAS_ZMQ) 53 struct InfoPerAdvertisedTopic
55 InfoPerAdvertisedTopic(zmq::context_t& c) : context(c) {}
57 zmq::context_t& context;
59 std::string topicName;
60 zmq::socket_t pubSocket = zmq::socket_t(context, ZMQ_PUB);
62 const google::protobuf::Descriptor* descriptor =
nullptr;
67 InfoPerService() =
default;
69 std::string serviceName;
70 const google::protobuf::Descriptor* descInput =
nullptr;
71 const google::protobuf::Descriptor* descOutput =
nullptr;
74 struct InfoPerSubscribedTopic
76 InfoPerSubscribedTopic(zmq::context_t& c) : context(c) {}
77 ~InfoPerSubscribedTopic()
79 if (topicThread.joinable()) topicThread.join();
82 zmq::context_t& context;
84 std::string topicName;
85 zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
86 const google::protobuf::Descriptor* descriptor =
nullptr;
88 std::vector<Client::topic_callback_t> callbacks;
90 std::thread topicThread;
97 #if defined(MVSIM_HAS_ZMQ) 98 zmq::context_t context{2 , ZMQ_MAX_SOCKETS_DFLT};
99 std::optional<zmq::socket_t> mainReqSocket;
100 std::recursive_mutex mainReqSocketMtx;
101 mvsim::SocketMonitor mainReqSocketMonitor;
103 std::map<std::string, internal::InfoPerAdvertisedTopic> advertisedTopics;
104 std::shared_mutex advertisedTopics_mtx;
106 std::optional<zmq::socket_t> srvListenSocket;
107 std::map<std::string, internal::InfoPerService> offeredServices;
108 std::shared_mutex offeredServices_mtx;
110 std::map<std::string, internal::InfoPerSubscribedTopic> subscribedTopics;
111 std::shared_mutex subscribedTopics_mtx;
113 std::optional<zmq::socket_t> topicNotificationsSocket;
114 std::string topicNotificationsEndPoint;
120 :
mrpt::system::COutputLogger(
"mvsim::Client"),
132 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 133 return zmq_->mainReqSocketMonitor.connected();
141 using namespace std::string_literals;
143 !
zmq_->mainReqSocket || !
zmq_->mainReqSocket,
144 "Client is already running.");
146 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 148 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"connect");
150 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
152 zmq_->mainReqSocket.emplace(
zmq_->context, ZMQ_REQ);
155 zmq_->mainReqSocketMonitor.monitor(
zmq_->mainReqSocket.value());
157 zmq_->mainReqSocket->connect(
165 zmq_->srvListenSocket.emplace(
zmq_->context, ZMQ_REP);
166 zmq_->srvListenSocket->bind(
"tcp://0.0.0.0:*"s);
168 if (!
zmq_->srvListenSocket)
169 THROW_EXCEPTION(
"Error binding service listening socket.");
173 "Client service thread is already running!");
177 #if MRPT_VERSION >= 0x204 178 mrpt::system::thread_name(
"services_"s +
nodeName_, serviceInvokerThread_);
182 zmq_->topicNotificationsSocket.emplace(
zmq_->context, ZMQ_PAIR);
183 zmq_->topicNotificationsSocket->bind(
"tcp://0.0.0.0:*"s);
185 if (!
zmq_->topicNotificationsSocket)
186 THROW_EXCEPTION(
"Error binding topic updates listening socket.");
188 zmq_->topicNotificationsEndPoint =
189 get_zmq_endpoint(*
zmq_->topicNotificationsSocket);
193 "Client topic updates thread is already running!");
197 #if MRPT_VERSION >= 0x204 198 mrpt::system::thread_name(
199 "topicUpdates_"s +
nodeName_, topicUpdatesThread_);
204 "MVSIM needs building with ZMQ and PROTOBUF to enable " 211 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 212 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"shutdown");
214 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
215 if (!
zmq_->mainReqSocket)
return;
219 MRPT_LOG_DEBUG_STREAM(
"Unregistering from server.");
222 catch (
const std::exception& e)
224 MRPT_LOG_ERROR_STREAM(
225 "shutdown: Exception: " << mrpt::exception_to_str(e));
228 #if CPPZMQ_VERSIONZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0) 229 zmq_->context.shutdown();
232 zmq_ctx_shutdown(
zmq_->context.operator
void*());
237 zmq_->subscribedTopics.clear();
238 zmq_->offeredServices.clear();
245 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 246 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doRegisterClient");
248 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
249 auto&
s = *
zmq_->mainReqSocket;
251 mvsim_msgs::RegisterNodeRequest rnq;
253 mvsim::sendMessage(rnq,
s);
256 const zmq::message_t reply = mvsim::receiveMessage(
s);
258 mvsim_msgs::RegisterNodeAnswer rna;
259 mvsim::parseMessage(reply, rna);
263 "Server did not allow registering node: %s",
264 rna.errormessage().c_str());
266 MRPT_LOG_DEBUG(
"Successfully registered in the server.");
268 THROW_EXCEPTION(
"MVSIM built without ZMQ");
274 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 275 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doUnregisterClient");
277 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
278 if (!
zmq_->mainReqSocket)
return;
279 auto&
s = *
zmq_->mainReqSocket;
281 mvsim_msgs::UnregisterNodeRequest rnq;
283 mvsim::sendMessage(rnq,
s);
286 const zmq::message_t reply = mvsim::receiveMessage(
s);
288 mvsim_msgs::GenericAnswer rna;
289 mvsim::parseMessage(reply, rna);
293 "Server answered an error unregistering node: %s",
294 rna.errormessage().c_str());
296 MRPT_LOG_DEBUG(
"Successfully unregistered in the server.");
298 THROW_EXCEPTION(
"MVSIM built without ZMQ");
304 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 305 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"requestListOfNodes");
307 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
308 auto&
s = *
zmq_->mainReqSocket;
310 mvsim_msgs::ListNodesRequest req;
311 mvsim::sendMessage(req,
s);
314 const zmq::message_t reply = mvsim::receiveMessage(
s);
316 mvsim_msgs::ListNodesAnswer lna;
317 mvsim::parseMessage(reply, lna);
319 std::vector<Client::InfoPerNode> nodes;
320 nodes.resize(lna.nodes_size());
322 for (
int i = 0; i < lna.nodes_size(); i++)
324 nodes[i].name = lna.nodes(i);
328 THROW_EXCEPTION(
"MVSIM built without ZMQ");
334 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 335 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"requestListOfTopics");
336 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
337 auto&
s = *
zmq_->mainReqSocket;
339 mvsim_msgs::ListTopicsRequest req;
340 mvsim::sendMessage(req,
s);
343 const zmq::message_t reply = mvsim::receiveMessage(
s);
345 mvsim_msgs::ListTopicsAnswer lta;
346 mvsim::parseMessage(reply, lta);
348 std::vector<Client::InfoPerTopic> topics;
349 topics.resize(lta.topics_size());
351 for (
int i = 0; i < lta.topics_size(); i++)
353 const auto&
t = lta.topics(i);
354 auto& dst = topics[i];
356 dst.name =
t.topicname();
357 dst.type =
t.topictype();
359 ASSERT_EQUAL_(
t.publisherendpoint_size(),
t.publishername_size());
360 dst.endpoints.resize(
t.publisherendpoint_size());
361 dst.publishers.resize(
t.publisherendpoint_size());
363 for (
int k = 0; k <
t.publisherendpoint_size(); k++)
365 dst.publishers[k] =
t.publishername(k);
366 dst.endpoints[k] =
t.publisherendpoint(k);
371 THROW_EXCEPTION(
"MVSIM built without ZMQ");
376 const std::string& topicName,
377 const google::protobuf::Descriptor* descriptor)
379 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 380 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doAdvertiseTopic");
381 auto& advTopics =
zmq_->advertisedTopics;
383 std::unique_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
385 if (advTopics.find(topicName) != advTopics.end())
387 "Topic `%s` already registered for publication in this same " 393 internal::InfoPerAdvertisedTopic& ipat =
394 advTopics.emplace_hint(advTopics.begin(), topicName,
zmq_->context)
400 ipat.pubSocket.bind(
"tcp://0.0.0.0:*");
402 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1) 405 if (!ipat.pubSocket.connected())
408 THROW_EXCEPTION(
"Could not bind publisher socket");
412 ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
413 ipat.topicName = topicName;
414 ipat.descriptor = descriptor;
417 "Advertising topic `%s` [%s] on endpoint `%s`", topicName.c_str(),
418 descriptor->full_name().c_str(), ipat.endpoint.c_str());
422 mvsim_msgs::AdvertiseTopicRequest req;
423 req.set_topicname(ipat.topicName);
424 req.set_endpoint(ipat.endpoint);
425 req.set_topictypename(ipat.descriptor->full_name());
428 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
429 mvsim::sendMessage(req, *
zmq_->mainReqSocket);
432 const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
435 mvsim_msgs::GenericAnswer ans;
436 mvsim::parseMessage(reply, ans);
440 "Error registering topic `%s` in server: `%s`", topicName.c_str(),
441 ans.errormessage().c_str());
444 THROW_EXCEPTION(
"MVSIM built without ZMQ & PROTOBUF");
449 const std::string& serviceName,
const google::protobuf::Descriptor* descIn,
452 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 453 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doAdvertiseService");
454 std::unique_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
456 auto& services =
zmq_->offeredServices;
458 if (services.find(serviceName) != services.end())
460 "Service `%s` already registered in this same client!",
461 serviceName.c_str());
463 internal::InfoPerService& ips = services[serviceName];
468 const auto assignedPort = mvsim::get_zmq_endpoint(*
zmq_->srvListenSocket);
470 ips.serviceName = serviceName;
471 ips.callback = callback;
472 ips.descInput = descIn;
473 ips.descOutput = descOut;
476 "Advertising service `%s` [%s->%s] on endpoint `%s`",
477 serviceName.c_str(), descIn->full_name().c_str(),
478 descOut->full_name().c_str(), assignedPort.c_str());
480 mvsim_msgs::AdvertiseServiceRequest req;
481 req.set_servicename(ips.serviceName);
482 req.set_endpoint(assignedPort);
483 req.set_inputtypename(ips.descInput->full_name());
484 req.set_outputtypename(ips.descOutput->full_name());
487 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
488 mvsim::sendMessage(req, *
zmq_->mainReqSocket);
491 const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
493 mvsim_msgs::GenericAnswer ans;
494 mvsim::parseMessage(reply, ans);
498 "Error registering service `%s` in server: `%s`",
499 serviceName.c_str(), ans.errormessage().c_str());
502 THROW_EXCEPTION(
"MVSIM built without ZMQ & PROTOBUF");
507 const std::string& topicName,
const google::protobuf::Message& msg)
510 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 513 "Client not connected to Server");
514 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"publishTopic");
516 std::shared_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
517 auto itIpat =
zmq_->advertisedTopics.find(topicName);
520 itIpat !=
zmq_->advertisedTopics.end(),
522 "Topic `%s` has not been registered. Missing former call to " 528 auto& ipat = itIpat->second;
531 msg.GetDescriptor() == ipat.descriptor,
533 "Topic `%s` has type `%s`, but expected `%s` from former call to " 535 topicName.c_str(), msg.GetDescriptor()->name().c_str(),
536 ipat.descriptor->name().c_str()));
538 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1) 539 ASSERT_(ipat.pubSocket);
541 ASSERT_(ipat.pubSocket.connected());
544 mvsim::sendMessage(msg, ipat.pubSocket);
547 std::cout <<
"Published on topic " << topicName << std::endl;
551 THROW_EXCEPTION(
"MVSIM built without ZMQ & PROTOBUF");
558 using namespace std::string_literals;
560 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 563 MRPT_LOG_INFO_STREAM(
564 "[" <<
nodeName_ <<
"] Client service thread started.");
566 zmq::socket_t&
s = *
zmq_->srvListenSocket;
571 zmq::message_t m = mvsim::receiveMessage(s);
574 mvsim_msgs::CallService csMsg;
575 mvsim::parseMessage(m, csMsg);
577 std::shared_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
578 const auto& srvName = csMsg.servicename();
580 auto itSrv =
zmq_->offeredServices.find(srvName);
581 if (itSrv ==
zmq_->offeredServices.end())
584 mvsim_msgs::GenericAnswer ans;
585 ans.set_success(
false);
586 ans.set_errormessage(mrpt::format(
587 "Requested unknown service `%s`", srvName.c_str()));
588 MRPT_LOG_ERROR_STREAM(ans.errormessage());
590 mvsim::sendMessage(ans, s);
594 internal::InfoPerService& ips = itSrv->second;
598 auto outMsgPtr = ips.callback(csMsg.serializedinput());
601 mvsim::sendMessage(*outMsgPtr, s);
604 catch (
const zmq::error_t& e)
606 if (e.num() == ETERM)
610 MRPT_LOG_DEBUG_STREAM(
611 "internalServiceServingThread about to exit for ZMQ term " 616 MRPT_LOG_ERROR_STREAM(
617 "internalServiceServingThread: ZMQ error: " << e.what());
620 catch (
const std::exception& e)
622 MRPT_LOG_ERROR_STREAM(
623 "internalServiceServingThread: Exception: " 624 << mrpt::exception_to_str(e));
626 MRPT_LOG_DEBUG_STREAM(
"internalServiceServingThread quitted.");
633 using namespace std::string_literals;
635 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 638 MRPT_LOG_DEBUG_STREAM(
639 "[" <<
nodeName_ <<
"] Client topic updates thread started.");
641 zmq::socket_t&
s = *
zmq_->topicNotificationsSocket;
646 zmq::message_t m = mvsim::receiveMessage(s);
649 mvsim_msgs::TopicInfo tiMsg;
650 mvsim::parseMessage(m, tiMsg);
653 mvsim_msgs::GenericAnswer ans;
654 ans.set_success(
true);
655 mvsim::sendMessage(ans, s);
663 std::unique_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
664 const auto& topicName = tiMsg.topicname();
666 auto itTopic =
zmq_->subscribedTopics.find(topicName);
667 if (itTopic ==
zmq_->subscribedTopics.end())
670 MRPT_LOG_WARN_STREAM(
673 <<
"` update message from server, but this node is not " 674 "subscribed to it (!).");
678 MRPT_LOG_DEBUG_STREAM(
679 "[internalTopicUpdatesThread] Received: " 680 << tiMsg.DebugString());
682 internal::InfoPerSubscribedTopic& ipt = itTopic->second;
684 for (
int i = 0; i < tiMsg.publisherendpoint_size(); i++)
686 ipt.subSocket.connect(tiMsg.publisherendpoint(i));
692 catch (
const zmq::error_t& e)
694 if (e.num() == ETERM)
698 MRPT_LOG_DEBUG_STREAM(
699 "internalTopicUpdatesThread about to exit for ZMQ term " 704 MRPT_LOG_ERROR_STREAM(
705 "internalTopicUpdatesThread: ZMQ error: " << e.what());
708 catch (
const std::exception& e)
710 MRPT_LOG_ERROR_STREAM(
711 "internalTopicUpdatesThread: Exception: " 712 << mrpt::exception_to_str(e));
714 MRPT_LOG_DEBUG_STREAM(
715 "[" <<
nodeName_ <<
"] Client topic updates thread quitted.");
722 const std::string& serviceName,
const std::string& inputSerializedMsg)
725 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 726 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"callService");
728 std::string outMsgData, outMsgType;
730 serviceName, inputSerializedMsg, std::nullopt, outMsgData, outMsgType);
737 const std::string& topicName,
738 const std::function<
void(
740 const std::vector<uint8_t>& )>& callback)
743 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 746 const auto [sType, sData] = mvsim::internal::parseMessageToParts(m);
747 std::vector<uint8_t>
d(sData.size());
748 ::memcpy(
d.data(), sData.data(), sData.size());
756 const std::string& serviceName,
const std::string& inputSerializedMsg,
757 mrpt::optional_ref<google::protobuf::Message> outputMsg,
758 mrpt::optional_ref<std::string> outputSerializedMsg,
759 mrpt::optional_ref<std::string> outputMsgTypeName)
762 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 763 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doCallService");
766 std::string srvEndpoint;
772 srvEndpoint = it->second;
776 mrpt::system::CTimeLoggerEntry tle2(
profiler_,
"doCallService.getinfo");
778 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
779 zmq::socket_t&
s = *
zmq_->mainReqSocket;
781 mvsim_msgs::GetServiceInfoRequest gsi;
782 gsi.set_servicename(serviceName);
783 mvsim::sendMessage(gsi, s);
785 auto m = mvsim::receiveMessage(s);
786 mvsim_msgs::GetServiceInfoAnswer gsia;
787 mvsim::parseMessage(m, gsia);
791 "Error requesting information about service `%s`: %s",
792 serviceName.c_str(), gsia.errormessage().c_str());
794 srvEndpoint = gsia.serviceendpoint();
801 zmq::socket_t srvReqSock(
zmq_->context, ZMQ_REQ);
802 srvReqSock.connect(srvEndpoint);
804 mvsim_msgs::CallService csMsg;
805 csMsg.set_servicename(serviceName);
806 csMsg.set_serializedinput(inputSerializedMsg);
808 mvsim::sendMessage(csMsg, srvReqSock);
810 const auto m = mvsim::receiveMessage(srvReqSock);
813 mvsim::parseMessage(m, outputMsg.value().get());
815 if (outputSerializedMsg)
817 const auto [typeName, serializedData] =
818 internal::parseMessageToParts(m);
820 outputSerializedMsg.value().get() = serializedData;
821 if (outputMsgTypeName) outputMsgTypeName.value().get() = typeName;
834 const std::string& topicName,
835 [[maybe_unused]]
const google::protobuf::Descriptor* descriptor,
839 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 840 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doSubscribeTopic");
842 std::unique_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
844 auto& topics =
zmq_->subscribedTopics;
847 internal::InfoPerSubscribedTopic& ipt =
848 topics.emplace_hint(topics.begin(), topicName,
zmq_->context)->second;
851 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1) 852 ipt.subSocket.set(zmq::sockopt::subscribe,
"");
854 ipt.subSocket.setsockopt(ZMQ_SUBSCRIBE,
"", 0);
857 ipt.callbacks.push_back(callback);
859 ipt.topicName = topicName;
867 mvsim_msgs::SubscribeRequest subReq;
868 subReq.set_topic(topicName);
869 subReq.set_updatesendpoint(
zmq_->topicNotificationsEndPoint);
871 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
872 mvsim::sendMessage(subReq, *
zmq_->mainReqSocket);
874 const auto m = mvsim::receiveMessage(*
zmq_->mainReqSocket);
876 mvsim_msgs::SubscribeAnswer subAns;
877 mvsim::parseMessage(m, subAns);
879 ASSERT_EQUAL_(subAns.topic(), topicName);
880 ASSERT_(subAns.success());
891 using namespace std::string_literals;
893 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 896 MRPT_LOG_DEBUG_STREAM(
897 "[" <<
nodeName_ <<
"] Client topic subscribe thread for `" 898 << ipt.topicName <<
"` started.");
900 zmq::socket_t&
s = ipt.subSocket;
905 const zmq::message_t m = mvsim::receiveMessage(s);
910 for (
const auto& callback : ipt.callbacks) callback(m);
912 catch (
const std::exception& e)
914 MRPT_LOG_ERROR_STREAM(
915 "Exception in topic `" 917 <<
"` subscription callback:" << mrpt::exception_to_str(e));
921 catch (
const zmq::error_t& e)
923 if (e.num() == ETERM)
927 MRPT_LOG_DEBUG_STREAM(
929 <<
"] Client topic subscribe thread about to exit for ZMQ " 934 MRPT_LOG_ERROR_STREAM(
935 "internalTopicSubscribeThread: ZMQ error: " << e.what());
938 catch (
const std::exception& e)
940 MRPT_LOG_ERROR_STREAM(
941 "internalTopicSubscribeThread: Exception: " 942 << mrpt::exception_to_str(e));
944 MRPT_LOG_DEBUG_STREAM(
945 "[" <<
nodeName_ <<
"] Client topic subscribe thread quitted.");
mrpt::system::CTimeLogger profiler_
void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic &ipt)
void internalTopicUpdatesThread()
std::thread topicUpdatesThread_
void setName(const std::string &nodeName)
void doAdvertiseService(const std::string &serviceName, const google::protobuf::Descriptor *descIn, const google::protobuf::Descriptor *descOut, service_callback_t callback)
std::unique_ptr< ZMQImpl > zmq_
std::map< std::string, std::string > serviceToEndPointCache_
void subscribeTopic(const std::string &topicName, const std::function< void(const MSG_T &)> &callback)
std::function< void(const zmq::message_t &)> topic_callback_t
std::vector< InfoPerNode > requestListOfNodes()
void publishTopic(const std::string &topicName, const google::protobuf::Message &msg)
geometry_msgs::TransformStamped t
std::string serverHostAddress_
void callService(const std::string &serviceName, const INPUT_MSG_T &input, OUTPUT_MSG_T &output)
std::vector< InfoPerTopic > requestListOfTopics()
void doAdvertiseTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor)
void internalServiceServingThread()
constexpr unsigned int MVSIM_PORTNO_MAIN_REP
std::thread serviceInvokerThread_
void doCallService(const std::string &serviceName, const std::string &inputSerializedMsg, mrpt::optional_ref< google::protobuf::Message > outputMsg, mrpt::optional_ref< std::string > outputSerializedMsg=std::nullopt, mrpt::optional_ref< std::string > outputMsgTypeName=std::nullopt)
void doUnregisterClient()
void doSubscribeTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor, const topic_callback_t &callback)
void subscribe_topic_raw(const std::string &topicName, const topic_callback_t &callback)
std::function< std::shared_ptr< google::protobuf::Message >(const std::string &)> service_callback_t
std::mutex serviceToEndPointCacheMtx_