10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/core/lock_helper.h>
12 #include <mrpt/version.h>
17 #if MRPT_VERSION >= 0x204
18 #include <mrpt/system/thread_name.h>
23 #include <shared_mutex>
25 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
27 #include <google/protobuf/text_format.h>
28 #include <mvsim/mvsim-msgs/AdvertiseServiceRequest.pb.h>
29 #include <mvsim/mvsim-msgs/AdvertiseTopicRequest.pb.h>
30 #include <mvsim/mvsim-msgs/CallService.pb.h>
31 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h>
32 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h>
33 #include <mvsim/mvsim-msgs/GetServiceInfoRequest.pb.h>
34 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h>
35 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h>
36 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h>
37 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h>
38 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h>
39 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h>
40 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h>
41 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h>
42 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h>
48 using namespace mvsim;
50 #if defined(MVSIM_HAS_ZMQ)
53 struct InfoPerAdvertisedTopic
55 InfoPerAdvertisedTopic(zmq::context_t& c) : context(c) {}
57 zmq::context_t& context;
59 std::string topicName;
60 zmq::socket_t pubSocket = zmq::socket_t(context, ZMQ_PUB);
62 const google::protobuf::Descriptor* descriptor =
nullptr;
67 InfoPerService() =
default;
69 std::string serviceName;
70 const google::protobuf::Descriptor* descInput =
nullptr;
71 const google::protobuf::Descriptor* descOutput =
nullptr;
74 struct InfoPerSubscribedTopic
76 InfoPerSubscribedTopic(zmq::context_t& c) : context(c) {}
77 ~InfoPerSubscribedTopic()
79 if (topicThread.joinable()) topicThread.join();
82 zmq::context_t& context;
84 std::string topicName;
85 zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
86 const google::protobuf::Descriptor* descriptor =
nullptr;
88 std::vector<Client::topic_callback_t> callbacks;
90 std::thread topicThread;
97 #if defined(MVSIM_HAS_ZMQ)
98 zmq::context_t context{2 , ZMQ_MAX_SOCKETS_DFLT};
99 std::optional<zmq::socket_t> mainReqSocket;
100 std::recursive_mutex mainReqSocketMtx;
101 mvsim::SocketMonitor mainReqSocketMonitor;
103 std::map<std::string, internal::InfoPerAdvertisedTopic> advertisedTopics;
104 std::shared_mutex advertisedTopics_mtx;
106 std::optional<zmq::socket_t> srvListenSocket;
107 std::map<std::string, internal::InfoPerService> offeredServices;
108 std::shared_mutex offeredServices_mtx;
110 std::map<std::string, internal::InfoPerSubscribedTopic> subscribedTopics;
111 std::shared_mutex subscribedTopics_mtx;
113 std::optional<zmq::socket_t> topicNotificationsSocket;
114 std::string topicNotificationsEndPoint;
131 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
132 return zmq_->mainReqSocketMonitor.connected();
140 using namespace std::string_literals;
141 ASSERTMSG_(!
zmq_->mainReqSocket || !
zmq_->mainReqSocket,
"Client is already running.");
143 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
145 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"connect");
147 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
149 zmq_->mainReqSocket.emplace(
zmq_->context, ZMQ_REQ);
152 zmq_->mainReqSocketMonitor.monitor(
zmq_->mainReqSocket.value());
154 zmq_->mainReqSocket->connect(
161 zmq_->srvListenSocket.emplace(
zmq_->context, ZMQ_REP);
162 zmq_->srvListenSocket->bind(
"tcp://0.0.0.0:*"s);
164 if (!
zmq_->srvListenSocket) THROW_EXCEPTION(
"Error binding service listening socket.");
169 #if MRPT_VERSION >= 0x204
174 zmq_->topicNotificationsSocket.emplace(
zmq_->context, ZMQ_PAIR);
175 zmq_->topicNotificationsSocket->bind(
"tcp://0.0.0.0:*"s);
177 if (!
zmq_->topicNotificationsSocket)
178 THROW_EXCEPTION(
"Error binding topic updates listening socket.");
180 zmq_->topicNotificationsEndPoint = get_zmq_endpoint(*
zmq_->topicNotificationsSocket);
182 ASSERTMSG_(!
topicUpdatesThread_.joinable(),
"Client topic updates thread is already running!");
185 #if MRPT_VERSION >= 0x204
191 "MVSIM needs building with ZMQ and PROTOBUF to enable "
198 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
199 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"shutdown");
201 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
202 if (!
zmq_->mainReqSocket)
return;
206 MRPT_LOG_DEBUG_STREAM(
"Unregistering from server.");
209 catch (
const std::exception& e)
211 MRPT_LOG_ERROR_STREAM(
"shutdown: Exception: " << mrpt::exception_to_str(e));
214 #if CPPZMQ_VERSIONZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
215 zmq_->context.shutdown();
218 zmq_ctx_shutdown(
zmq_->context.operator
void*());
223 zmq_->subscribedTopics.clear();
224 zmq_->offeredServices.clear();
231 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
232 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doRegisterClient");
234 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
235 auto&
s = *
zmq_->mainReqSocket;
237 mvsim_msgs::RegisterNodeRequest rnq;
239 mvsim::sendMessage(rnq,
s);
242 const zmq::message_t reply = mvsim::receiveMessage(
s);
244 mvsim_msgs::RegisterNodeAnswer rna;
245 mvsim::parseMessage(reply, rna);
249 "Server did not allow registering node: %s", rna.errormessage().c_str());
251 MRPT_LOG_DEBUG(
"Successfully registered in the server.");
253 THROW_EXCEPTION(
"MVSIM built without ZMQ");
259 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
260 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doUnregisterClient");
262 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
263 if (!
zmq_->mainReqSocket)
return;
264 auto&
s = *
zmq_->mainReqSocket;
266 mvsim_msgs::UnregisterNodeRequest rnq;
268 mvsim::sendMessage(rnq,
s);
271 const zmq::message_t reply = mvsim::receiveMessage(
s);
273 mvsim_msgs::GenericAnswer rna;
274 mvsim::parseMessage(reply, rna);
278 "Server answered an error unregistering node: %s", rna.errormessage().c_str());
280 MRPT_LOG_DEBUG(
"Successfully unregistered in the server.");
282 THROW_EXCEPTION(
"MVSIM built without ZMQ");
288 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
289 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"requestListOfNodes");
291 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
292 auto&
s = *
zmq_->mainReqSocket;
294 mvsim_msgs::ListNodesRequest req;
295 mvsim::sendMessage(req,
s);
298 const zmq::message_t reply = mvsim::receiveMessage(
s);
300 mvsim_msgs::ListNodesAnswer lna;
301 mvsim::parseMessage(reply, lna);
303 std::vector<Client::InfoPerNode> nodes;
304 nodes.resize(lna.nodes_size());
306 for (
int i = 0; i < lna.nodes_size(); i++)
308 nodes[i].name = lna.nodes(i);
312 THROW_EXCEPTION(
"MVSIM built without ZMQ");
318 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
319 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"requestListOfTopics");
320 auto lck = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
321 auto&
s = *
zmq_->mainReqSocket;
323 mvsim_msgs::ListTopicsRequest req;
324 mvsim::sendMessage(req,
s);
327 const zmq::message_t reply = mvsim::receiveMessage(
s);
329 mvsim_msgs::ListTopicsAnswer lta;
330 mvsim::parseMessage(reply, lta);
332 std::vector<Client::InfoPerTopic> topics;
333 topics.resize(lta.topics_size());
335 for (
int i = 0; i < lta.topics_size(); i++)
337 const auto&
t = lta.topics(i);
338 auto& dst = topics[i];
340 dst.name =
t.topicname();
341 dst.type =
t.topictype();
343 ASSERT_EQUAL_(
t.publisherendpoint_size(),
t.publishername_size());
344 dst.endpoints.resize(
t.publisherendpoint_size());
345 dst.publishers.resize(
t.publisherendpoint_size());
347 for (
int k = 0; k <
t.publisherendpoint_size(); k++)
349 dst.publishers[k] =
t.publishername(k);
350 dst.endpoints[k] =
t.publisherendpoint(k);
355 THROW_EXCEPTION(
"MVSIM built without ZMQ");
360 const std::string& topicName,
const google::protobuf::Descriptor* descriptor)
362 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
363 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doAdvertiseTopic");
364 auto& advTopics =
zmq_->advertisedTopics;
366 std::unique_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
368 if (advTopics.find(topicName) != advTopics.end())
370 "Topic `%s` already registered for publication in this same "
376 internal::InfoPerAdvertisedTopic& ipat =
377 advTopics.emplace_hint(advTopics.begin(), topicName,
zmq_->context)->second;
382 ipat.pubSocket.bind(
"tcp://0.0.0.0:*");
384 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
387 if (!ipat.pubSocket.connected())
390 THROW_EXCEPTION(
"Could not bind publisher socket");
394 ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
395 ipat.topicName = topicName;
396 ipat.descriptor = descriptor;
399 "Advertising topic `%s` [%s] on endpoint `%s`", topicName.c_str(),
400 descriptor->full_name().c_str(), ipat.endpoint.c_str());
404 mvsim_msgs::AdvertiseTopicRequest req;
405 req.set_topicname(ipat.topicName);
406 req.set_endpoint(ipat.endpoint);
407 req.set_topictypename(ipat.descriptor->full_name());
410 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
411 mvsim::sendMessage(req, *
zmq_->mainReqSocket);
414 const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
417 mvsim_msgs::GenericAnswer ans;
418 mvsim::parseMessage(reply, ans);
422 "Error registering topic `%s` in server: `%s`", topicName.c_str(),
423 ans.errormessage().c_str());
426 THROW_EXCEPTION(
"MVSIM built without ZMQ & PROTOBUF");
431 const std::string& serviceName,
const google::protobuf::Descriptor* descIn,
434 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
435 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doAdvertiseService");
436 std::unique_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
438 auto& services =
zmq_->offeredServices;
440 if (services.find(serviceName) != services.end())
442 "Service `%s` already registered in this same client!", serviceName.c_str());
449 const auto assignedPort = mvsim::get_zmq_endpoint(*
zmq_->srvListenSocket);
451 ips.serviceName = serviceName;
452 ips.callback = callback;
453 ips.descInput = descIn;
454 ips.descOutput = descOut;
457 "Advertising service `%s` [%s->%s] on endpoint `%s`", serviceName.c_str(),
458 descIn->full_name().c_str(), descOut->full_name().c_str(), assignedPort.c_str());
460 mvsim_msgs::AdvertiseServiceRequest req;
461 req.set_servicename(ips.serviceName);
462 req.set_endpoint(assignedPort);
463 req.set_inputtypename(ips.descInput->full_name());
464 req.set_outputtypename(ips.descOutput->full_name());
467 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
468 mvsim::sendMessage(req, *
zmq_->mainReqSocket);
471 const zmq::message_t reply = mvsim::receiveMessage(*
zmq_->mainReqSocket);
473 mvsim_msgs::GenericAnswer ans;
474 mvsim::parseMessage(reply, ans);
478 "Error registering service `%s` in server: `%s`", serviceName.c_str(),
479 ans.errormessage().c_str());
482 THROW_EXCEPTION(
"MVSIM built without ZMQ & PROTOBUF");
489 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
491 zmq_ &&
zmq_->mainReqSocket &&
zmq_->mainReqSocket,
"Client not connected to Server");
492 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"publishTopic");
494 std::shared_lock<std::shared_mutex> lck(
zmq_->advertisedTopics_mtx);
495 auto itIpat =
zmq_->advertisedTopics.find(topicName);
498 itIpat !=
zmq_->advertisedTopics.end(),
500 "Topic `%s` has not been registered. Missing former call to "
506 auto& ipat = itIpat->second;
509 msg.GetDescriptor() == ipat.descriptor,
511 "Topic `%s` has type `%s`, but expected `%s` from former call to "
513 topicName.c_str(), msg.GetDescriptor()->name().c_str(),
514 ipat.descriptor->name().c_str()));
516 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
517 ASSERT_(ipat.pubSocket);
519 ASSERT_(ipat.pubSocket.connected());
522 mvsim::sendMessage(msg, ipat.pubSocket);
525 THROW_EXCEPTION(
"MVSIM built without ZMQ & PROTOBUF");
532 using namespace std::string_literals;
534 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
537 MRPT_LOG_INFO_STREAM(
"[" <<
nodeName_ <<
"] Client service thread started.");
539 zmq::socket_t&
s = *
zmq_->srvListenSocket;
544 zmq::message_t m = mvsim::receiveMessage(
s);
547 mvsim_msgs::CallService csMsg;
548 mvsim::parseMessage(m, csMsg);
550 std::shared_lock<std::shared_mutex> lck(
zmq_->offeredServices_mtx);
551 const auto& srvName = csMsg.servicename();
553 auto itSrv =
zmq_->offeredServices.find(srvName);
554 if (itSrv ==
zmq_->offeredServices.end())
557 mvsim_msgs::GenericAnswer ans;
558 ans.set_success(
false);
559 ans.set_errormessage(
560 mrpt::format(
"Requested unknown service `%s`", srvName.c_str()));
561 MRPT_LOG_ERROR_STREAM(ans.errormessage());
563 mvsim::sendMessage(ans,
s);
567 internal::InfoPerService& ips = itSrv->second;
571 auto outMsgPtr = ips.callback(csMsg.serializedinput());
574 mvsim::sendMessage(*outMsgPtr,
s);
577 catch (
const zmq::error_t& e)
579 if (e.num() == ETERM)
583 MRPT_LOG_DEBUG_STREAM(
584 "internalServiceServingThread about to exit for ZMQ term "
589 MRPT_LOG_ERROR_STREAM(
"internalServiceServingThread: ZMQ error: " << e.what());
592 catch (
const std::exception& e)
594 MRPT_LOG_ERROR_STREAM(
595 "internalServiceServingThread: Exception: " << mrpt::exception_to_str(e));
597 MRPT_LOG_DEBUG_STREAM(
"internalServiceServingThread quitted.");
604 using namespace std::string_literals;
606 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
609 MRPT_LOG_DEBUG_STREAM(
"[" <<
nodeName_ <<
"] Client topic updates thread started.");
611 zmq::socket_t&
s = *
zmq_->topicNotificationsSocket;
616 zmq::message_t m = mvsim::receiveMessage(
s);
619 mvsim_msgs::TopicInfo tiMsg;
620 mvsim::parseMessage(m, tiMsg);
623 mvsim_msgs::GenericAnswer ans;
624 ans.set_success(
true);
625 mvsim::sendMessage(ans,
s);
633 std::unique_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
634 const auto& topicName = tiMsg.topicname();
636 auto itTopic =
zmq_->subscribedTopics.find(topicName);
637 if (itTopic ==
zmq_->subscribedTopics.end())
640 MRPT_LOG_WARN_STREAM(
641 "Received a topic `" << topicName
642 <<
"` update message from server, but this node is not "
643 "subscribed to it (!).");
647 MRPT_LOG_DEBUG_STREAM(
"[internalTopicUpdatesThread] Received: " << tiMsg.DebugString());
649 internal::InfoPerSubscribedTopic& ipt = itTopic->second;
651 for (
int i = 0; i < tiMsg.publisherendpoint_size(); i++)
653 ipt.subSocket.connect(tiMsg.publisherendpoint(i));
659 catch (
const zmq::error_t& e)
661 if (e.num() == ETERM)
665 MRPT_LOG_DEBUG_STREAM(
666 "internalTopicUpdatesThread about to exit for ZMQ term "
671 MRPT_LOG_ERROR_STREAM(
"internalTopicUpdatesThread: ZMQ error: " << e.what());
674 catch (
const std::exception& e)
676 MRPT_LOG_ERROR_STREAM(
677 "internalTopicUpdatesThread: Exception: " << mrpt::exception_to_str(e));
679 MRPT_LOG_DEBUG_STREAM(
"[" <<
nodeName_ <<
"] Client topic updates thread quitted.");
686 const std::string& serviceName,
const std::string& inputSerializedMsg)
689 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
690 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"callService");
692 std::string outMsgData, outMsgType;
693 doCallService(serviceName, inputSerializedMsg, std::nullopt, outMsgData, outMsgType);
700 const std::string& topicName,
701 const std::function<
void(
702 const std::string& ,
const std::vector<uint8_t>& )>& callback)
705 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
709 [callback](
const zmq::message_t& m)
711 const auto [sType, sData] = mvsim::internal::parseMessageToParts(m);
712 std::vector<uint8_t>
d(sData.size());
713 ::memcpy(
d.data(), sData.data(), sData.size());
721 const std::string& serviceName,
const std::string& inputSerializedMsg,
722 mrpt::optional_ref<google::protobuf::Message> outputMsg,
723 mrpt::optional_ref<std::string> outputSerializedMsg,
724 mrpt::optional_ref<std::string> outputMsgTypeName)
727 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
728 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doCallService");
731 std::string srvEndpoint;
736 srvEndpoint = it->second;
740 mrpt::system::CTimeLoggerEntry tle2(
profiler_,
"doCallService.getinfo");
742 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
743 zmq::socket_t&
s = *
zmq_->mainReqSocket;
745 mvsim_msgs::GetServiceInfoRequest gsi;
746 gsi.set_servicename(serviceName);
747 mvsim::sendMessage(gsi,
s);
749 auto m = mvsim::receiveMessage(
s);
750 mvsim_msgs::GetServiceInfoAnswer gsia;
751 mvsim::parseMessage(m, gsia);
755 "Error requesting information about service `%s`: %s", serviceName.c_str(),
756 gsia.errormessage().c_str());
758 srvEndpoint = gsia.serviceendpoint();
765 zmq::socket_t srvReqSock(
zmq_->context, ZMQ_REQ);
766 srvReqSock.connect(srvEndpoint);
768 mvsim_msgs::CallService csMsg;
769 csMsg.set_servicename(serviceName);
770 csMsg.set_serializedinput(inputSerializedMsg);
772 mvsim::sendMessage(csMsg, srvReqSock);
774 const auto m = mvsim::receiveMessage(srvReqSock);
777 mvsim::parseMessage(m, outputMsg.value().get());
779 if (outputSerializedMsg)
781 const auto [typeName, serializedData] = internal::parseMessageToParts(m);
783 outputSerializedMsg.value().get() = serializedData;
784 if (outputMsgTypeName) outputMsgTypeName.value().get() = typeName;
796 const std::string& topicName, [[maybe_unused]]
const google::protobuf::Descriptor* descriptor,
800 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
801 mrpt::system::CTimeLoggerEntry tle(
profiler_,
"doSubscribeTopic");
803 std::unique_lock<std::shared_mutex> lck(
zmq_->subscribedTopics_mtx);
805 auto& topics =
zmq_->subscribedTopics;
809 topics.emplace_hint(topics.begin(), topicName,
zmq_->context)->second;
812 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
813 ipt.subSocket.set(zmq::sockopt::subscribe,
"");
815 ipt.subSocket.setsockopt(ZMQ_SUBSCRIBE,
"", 0);
818 ipt.callbacks.push_back(callback);
820 ipt.topicName = topicName;
827 mvsim_msgs::SubscribeRequest subReq;
828 subReq.set_topic(topicName);
829 subReq.set_updatesendpoint(
zmq_->topicNotificationsEndPoint);
831 auto lckMain = mrpt::lockHelper(
zmq_->mainReqSocketMtx);
832 mvsim::sendMessage(subReq, *
zmq_->mainReqSocket);
834 const auto m = mvsim::receiveMessage(*
zmq_->mainReqSocket);
836 mvsim_msgs::SubscribeAnswer subAns;
837 mvsim::parseMessage(m, subAns);
839 ASSERT_EQUAL_(subAns.topic(), topicName);
840 ASSERT_(subAns.success());
851 using namespace std::string_literals;
853 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
856 MRPT_LOG_DEBUG_STREAM(
857 "[" <<
nodeName_ <<
"] Client topic subscribe thread for `" << ipt.topicName
860 zmq::socket_t&
s = ipt.subSocket;
865 const zmq::message_t m = mvsim::receiveMessage(
s);
870 for (
const auto& callback : ipt.callbacks) callback(m);
872 catch (
const std::exception& e)
874 MRPT_LOG_ERROR_STREAM(
875 "Exception in topic `"
876 << ipt.topicName <<
"` subscription callback:" << mrpt::exception_to_str(e));
880 catch (
const zmq::error_t& e)
882 if (e.num() == ETERM)
886 MRPT_LOG_DEBUG_STREAM(
888 <<
"] Client topic subscribe thread about to exit for ZMQ "
893 MRPT_LOG_ERROR_STREAM(
"internalTopicSubscribeThread: ZMQ error: " << e.what());
896 catch (
const std::exception& e)
898 MRPT_LOG_ERROR_STREAM(
899 "internalTopicSubscribeThread: Exception: " << mrpt::exception_to_str(e));
901 MRPT_LOG_DEBUG_STREAM(
"[" <<
nodeName_ <<
"] Client topic subscribe thread quitted.");