src/Comms/Client.cpp
Go to the documentation of this file.
1 /*+-------------------------------------------------------------------------+
2  | MultiVehicle simulator (libmvsim) |
3  | |
4  | Copyright (C) 2014-2024 Jose Luis Blanco Claraco |
5  | Copyright (C) 2017 Borys Tymchenko (Odessa Polytechnic University) |
6  | Distributed under 3-clause BSD License |
7  | See COPYING |
8  +-------------------------------------------------------------------------+ */
9 
10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/core/lock_helper.h>
12 #include <mrpt/version.h>
13 #include <mvsim/Comms/Client.h>
14 #include <mvsim/Comms/common.h>
15 #include <mvsim/Comms/ports.h>
17 #if MRPT_VERSION >= 0x204
18 #include <mrpt/system/thread_name.h>
19 #endif
20 
21 #include <iostream>
22 #include <mutex>
23 #include <shared_mutex>
24 
25 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
26 
27 #include <google/protobuf/text_format.h>
28 #include <mvsim/mvsim-msgs/AdvertiseServiceRequest.pb.h>
29 #include <mvsim/mvsim-msgs/AdvertiseTopicRequest.pb.h>
30 #include <mvsim/mvsim-msgs/CallService.pb.h>
31 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h>
32 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h>
33 #include <mvsim/mvsim-msgs/GetServiceInfoRequest.pb.h>
34 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h>
35 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h>
36 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h>
37 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h>
38 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h>
39 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h>
40 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h>
41 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h>
42 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h>
43 
44 #include <zmq.hpp>
45 
46 #endif
47 
48 using namespace mvsim;
49 
50 #if defined(MVSIM_HAS_ZMQ)
51 namespace mvsim::internal
52 {
53 struct InfoPerAdvertisedTopic
54 {
55  InfoPerAdvertisedTopic(zmq::context_t& c) : context(c) {}
56 
57  zmq::context_t& context;
58 
59  std::string topicName;
60  zmq::socket_t pubSocket = zmq::socket_t(context, ZMQ_PUB);
61  std::string endpoint;
62  const google::protobuf::Descriptor* descriptor = nullptr;
63 };
64 
65 struct InfoPerService
66 {
67  InfoPerService() = default;
68 
69  std::string serviceName;
70  const google::protobuf::Descriptor* descInput = nullptr;
71  const google::protobuf::Descriptor* descOutput = nullptr;
73 };
74 struct InfoPerSubscribedTopic
75 {
76  InfoPerSubscribedTopic(zmq::context_t& c) : context(c) {}
77  ~InfoPerSubscribedTopic()
78  {
79  if (topicThread.joinable()) topicThread.join();
80  }
81 
82  zmq::context_t& context;
83 
84  std::string topicName;
85  zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
86  const google::protobuf::Descriptor* descriptor = nullptr;
87 
88  std::vector<Client::topic_callback_t> callbacks;
89 
90  std::thread topicThread;
91 };
92 } // namespace mvsim::internal
93 #endif
94 
96 {
97 #if defined(MVSIM_HAS_ZMQ)
98  zmq::context_t context{2 /* io sockets */, ZMQ_MAX_SOCKETS_DFLT};
99  std::optional<zmq::socket_t> mainReqSocket;
100  std::recursive_mutex mainReqSocketMtx;
101  mvsim::SocketMonitor mainReqSocketMonitor;
102 
103  std::map<std::string, internal::InfoPerAdvertisedTopic> advertisedTopics;
104  std::shared_mutex advertisedTopics_mtx;
105 
106  std::optional<zmq::socket_t> srvListenSocket;
107  std::map<std::string, internal::InfoPerService> offeredServices;
108  std::shared_mutex offeredServices_mtx;
109 
110  std::map<std::string, internal::InfoPerSubscribedTopic> subscribedTopics;
111  std::shared_mutex subscribedTopics_mtx;
112 
113  std::optional<zmq::socket_t> topicNotificationsSocket;
114  std::string topicNotificationsEndPoint;
115 
116 #endif
117 };
118 
120  : mrpt::system::COutputLogger("mvsim::Client"), zmq_(std::make_unique<Client::ZMQImpl>())
121 {
122 }
123 Client::Client(const std::string& nodeName) : Client() { setName(nodeName); }
124 
126 
127 void Client::setName(const std::string& nodeName) { nodeName_ = nodeName; }
128 
129 bool Client::connected() const
130 {
131 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
132  return zmq_->mainReqSocketMonitor.connected();
133 #else
134  return false;
135 #endif
136 }
137 
139 {
140  using namespace std::string_literals;
141  ASSERTMSG_(!zmq_->mainReqSocket || !zmq_->mainReqSocket, "Client is already running.");
142 
143 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
144 
145  mrpt::system::CTimeLoggerEntry tle(profiler_, "connect");
146 
147  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
148 
149  zmq_->mainReqSocket.emplace(zmq_->context, ZMQ_REQ);
150 
151  // Monitor to listen on ZMQ socket events:
152  zmq_->mainReqSocketMonitor.monitor(zmq_->mainReqSocket.value());
153 
154  zmq_->mainReqSocket->connect(
155  "tcp://"s + serverHostAddress_ + ":"s + std::to_string(MVSIM_PORTNO_MAIN_REP));
156 
157  // Let the server know about this new node:
159 
160  // Create listening socket for services:
161  zmq_->srvListenSocket.emplace(zmq_->context, ZMQ_REP);
162  zmq_->srvListenSocket->bind("tcp://0.0.0.0:*"s);
163 
164  if (!zmq_->srvListenSocket) THROW_EXCEPTION("Error binding service listening socket.");
165 
166  ASSERTMSG_(!serviceInvokerThread_.joinable(), "Client service thread is already running!");
167 
169 #if MRPT_VERSION >= 0x204
170  mrpt::system::thread_name("services_"s + nodeName_, serviceInvokerThread_);
171 #endif
172 
173  // Create listening socket for subscription updates:
174  zmq_->topicNotificationsSocket.emplace(zmq_->context, ZMQ_PAIR);
175  zmq_->topicNotificationsSocket->bind("tcp://0.0.0.0:*"s);
176 
177  if (!zmq_->topicNotificationsSocket)
178  THROW_EXCEPTION("Error binding topic updates listening socket.");
179 
180  zmq_->topicNotificationsEndPoint = get_zmq_endpoint(*zmq_->topicNotificationsSocket);
181 
182  ASSERTMSG_(!topicUpdatesThread_.joinable(), "Client topic updates thread is already running!");
183 
185 #if MRPT_VERSION >= 0x204
186  mrpt::system::thread_name("topicUpdates_"s + nodeName_, topicUpdatesThread_);
187 #endif
188 
189 #else
190  THROW_EXCEPTION(
191  "MVSIM needs building with ZMQ and PROTOBUF to enable "
192  "client/server");
193 #endif
194 }
195 
196 void Client::shutdown() noexcept
197 {
198 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
199  mrpt::system::CTimeLoggerEntry tle(profiler_, "shutdown");
200 
201  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
202  if (!zmq_->mainReqSocket) return;
203 
204  try
205  {
206  MRPT_LOG_DEBUG_STREAM("Unregistering from server.");
208  }
209  catch (const std::exception& e)
210  {
211  MRPT_LOG_ERROR_STREAM("shutdown: Exception: " << mrpt::exception_to_str(e));
212  }
213 
214 #if CPPZMQ_VERSIONZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
215  zmq_->context.shutdown();
216 #else
217  // Missing shutdown() in older versions:
218  zmq_ctx_shutdown(zmq_->context.operator void*());
219 #endif
220 
221  if (serviceInvokerThread_.joinable()) serviceInvokerThread_.join();
222  if (topicUpdatesThread_.joinable()) topicUpdatesThread_.join();
223  zmq_->subscribedTopics.clear();
224  zmq_->offeredServices.clear();
225 
226 #endif
227 }
228 
230 {
231 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
232  mrpt::system::CTimeLoggerEntry tle(profiler_, "doRegisterClient");
233 
234  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
235  auto& s = *zmq_->mainReqSocket;
236 
237  mvsim_msgs::RegisterNodeRequest rnq;
238  rnq.set_nodename(nodeName_);
239  mvsim::sendMessage(rnq, s);
240 
241  // Get the reply.
242  const zmq::message_t reply = mvsim::receiveMessage(s);
243 
244  mvsim_msgs::RegisterNodeAnswer rna;
245  mvsim::parseMessage(reply, rna);
246  if (!rna.success())
247  {
248  THROW_EXCEPTION_FMT(
249  "Server did not allow registering node: %s", rna.errormessage().c_str());
250  }
251  MRPT_LOG_DEBUG("Successfully registered in the server.");
252 #else
253  THROW_EXCEPTION("MVSIM built without ZMQ");
254 #endif
255 }
256 
258 {
259 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
260  mrpt::system::CTimeLoggerEntry tle(profiler_, "doUnregisterClient");
261 
262  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
263  if (!zmq_->mainReqSocket) return;
264  auto& s = *zmq_->mainReqSocket;
265 
266  mvsim_msgs::UnregisterNodeRequest rnq;
267  rnq.set_nodename(nodeName_);
268  mvsim::sendMessage(rnq, s);
269 
270  // Get the reply.
271  const zmq::message_t reply = mvsim::receiveMessage(s);
272 
273  mvsim_msgs::GenericAnswer rna;
274  mvsim::parseMessage(reply, rna);
275  if (!rna.success())
276  {
277  THROW_EXCEPTION_FMT(
278  "Server answered an error unregistering node: %s", rna.errormessage().c_str());
279  }
280  MRPT_LOG_DEBUG("Successfully unregistered in the server.");
281 #else
282  THROW_EXCEPTION("MVSIM built without ZMQ");
283 #endif
284 }
285 
286 std::vector<Client::InfoPerNode> Client::requestListOfNodes()
287 {
288 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
289  mrpt::system::CTimeLoggerEntry tle(profiler_, "requestListOfNodes");
290 
291  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
292  auto& s = *zmq_->mainReqSocket;
293 
294  mvsim_msgs::ListNodesRequest req;
295  mvsim::sendMessage(req, s);
296 
297  // Get the reply.
298  const zmq::message_t reply = mvsim::receiveMessage(s);
299 
300  mvsim_msgs::ListNodesAnswer lna;
301  mvsim::parseMessage(reply, lna);
302 
303  std::vector<Client::InfoPerNode> nodes;
304  nodes.resize(lna.nodes_size());
305 
306  for (int i = 0; i < lna.nodes_size(); i++)
307  {
308  nodes[i].name = lna.nodes(i);
309  }
310  return nodes;
311 #else
312  THROW_EXCEPTION("MVSIM built without ZMQ");
313 #endif
314 }
315 
316 std::vector<Client::InfoPerTopic> Client::requestListOfTopics()
317 {
318 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
319  mrpt::system::CTimeLoggerEntry tle(profiler_, "requestListOfTopics");
320  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
321  auto& s = *zmq_->mainReqSocket;
322 
323  mvsim_msgs::ListTopicsRequest req;
324  mvsim::sendMessage(req, s);
325 
326  // Get the reply.
327  const zmq::message_t reply = mvsim::receiveMessage(s);
328 
329  mvsim_msgs::ListTopicsAnswer lta;
330  mvsim::parseMessage(reply, lta);
331 
332  std::vector<Client::InfoPerTopic> topics;
333  topics.resize(lta.topics_size());
334 
335  for (int i = 0; i < lta.topics_size(); i++)
336  {
337  const auto& t = lta.topics(i);
338  auto& dst = topics[i];
339 
340  dst.name = t.topicname();
341  dst.type = t.topictype();
342 
343  ASSERT_EQUAL_(t.publisherendpoint_size(), t.publishername_size());
344  dst.endpoints.resize(t.publisherendpoint_size());
345  dst.publishers.resize(t.publisherendpoint_size());
346 
347  for (int k = 0; k < t.publisherendpoint_size(); k++)
348  {
349  dst.publishers[k] = t.publishername(k);
350  dst.endpoints[k] = t.publisherendpoint(k);
351  }
352  }
353  return topics;
354 #else
355  THROW_EXCEPTION("MVSIM built without ZMQ");
356 #endif
357 }
358 
360  const std::string& topicName, const google::protobuf::Descriptor* descriptor)
361 {
362 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
363  mrpt::system::CTimeLoggerEntry tle(profiler_, "doAdvertiseTopic");
364  auto& advTopics = zmq_->advertisedTopics;
365 
366  std::unique_lock<std::shared_mutex> lck(zmq_->advertisedTopics_mtx);
367 
368  if (advTopics.find(topicName) != advTopics.end())
369  THROW_EXCEPTION_FMT(
370  "Topic `%s` already registered for publication in this same "
371  "client (!)",
372  topicName.c_str());
373 
374  // the ctor of InfoPerAdvertisedTopic automatically creates a ZMQ_PUB
375  // socket in pubSocket
376  internal::InfoPerAdvertisedTopic& ipat =
377  advTopics.emplace_hint(advTopics.begin(), topicName, zmq_->context)->second;
378 
379  lck.unlock();
380 
381  // Bind the PUBLISH socket:
382  ipat.pubSocket.bind("tcp://0.0.0.0:*");
383 
384 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
385  if (!ipat.pubSocket)
386 #else
387  if (!ipat.pubSocket.connected())
388 #endif
389  {
390  THROW_EXCEPTION("Could not bind publisher socket");
391  }
392 
393  // Retrieve assigned TCP port:
394  ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
395  ipat.topicName = topicName; // redundant in container, but handy.
396  ipat.descriptor = descriptor;
397 
398  MRPT_LOG_DEBUG_FMT(
399  "Advertising topic `%s` [%s] on endpoint `%s`", topicName.c_str(),
400  descriptor->full_name().c_str(), ipat.endpoint.c_str());
401 
402  // MRPT_LOG_INFO_STREAM("Type: " << descriptor->DebugString());
403 
404  mvsim_msgs::AdvertiseTopicRequest req;
405  req.set_topicname(ipat.topicName);
406  req.set_endpoint(ipat.endpoint);
407  req.set_topictypename(ipat.descriptor->full_name());
408  req.set_nodename(nodeName_);
409 
410  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
411  mvsim::sendMessage(req, *zmq_->mainReqSocket);
412 
413  // Get the reply.
414  const zmq::message_t reply = mvsim::receiveMessage(*zmq_->mainReqSocket);
415  lckMain.unlock();
416 
417  mvsim_msgs::GenericAnswer ans;
418  mvsim::parseMessage(reply, ans);
419 
420  if (!ans.success())
421  THROW_EXCEPTION_FMT(
422  "Error registering topic `%s` in server: `%s`", topicName.c_str(),
423  ans.errormessage().c_str());
424 
425 #else
426  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
427 #endif
428 }
429 
431  const std::string& serviceName, const google::protobuf::Descriptor* descIn,
432  const google::protobuf::Descriptor* descOut, service_callback_t callback)
433 {
434 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
435  mrpt::system::CTimeLoggerEntry tle(profiler_, "doAdvertiseService");
436  std::unique_lock<std::shared_mutex> lck(zmq_->offeredServices_mtx);
437 
438  auto& services = zmq_->offeredServices;
439 
440  if (services.find(serviceName) != services.end())
441  THROW_EXCEPTION_FMT(
442  "Service `%s` already registered in this same client!", serviceName.c_str());
443 
444  internal::InfoPerService& ips = services[serviceName];
445 
446  lck.unlock();
447 
448  // Retrieve assigned TCP port:
449  const auto assignedPort = mvsim::get_zmq_endpoint(*zmq_->srvListenSocket);
450 
451  ips.serviceName = serviceName; // redundant in container, but handy.
452  ips.callback = callback;
453  ips.descInput = descIn;
454  ips.descOutput = descOut;
455 
456  MRPT_LOG_DEBUG_FMT(
457  "Advertising service `%s` [%s->%s] on endpoint `%s`", serviceName.c_str(),
458  descIn->full_name().c_str(), descOut->full_name().c_str(), assignedPort.c_str());
459 
460  mvsim_msgs::AdvertiseServiceRequest req;
461  req.set_servicename(ips.serviceName);
462  req.set_endpoint(assignedPort);
463  req.set_inputtypename(ips.descInput->full_name());
464  req.set_outputtypename(ips.descOutput->full_name());
465  req.set_nodename(nodeName_);
466 
467  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
468  mvsim::sendMessage(req, *zmq_->mainReqSocket);
469 
470  // Get the reply.
471  const zmq::message_t reply = mvsim::receiveMessage(*zmq_->mainReqSocket);
472  lckMain.unlock();
473  mvsim_msgs::GenericAnswer ans;
474  mvsim::parseMessage(reply, ans);
475 
476  if (!ans.success())
477  THROW_EXCEPTION_FMT(
478  "Error registering service `%s` in server: `%s`", serviceName.c_str(),
479  ans.errormessage().c_str());
480 
481 #else
482  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
483 #endif
484 }
485 
486 void Client::publishTopic(const std::string& topicName, const google::protobuf::Message& msg)
487 {
488  MRPT_START
489 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
490  ASSERTMSG_(
491  zmq_ && zmq_->mainReqSocket && zmq_->mainReqSocket, "Client not connected to Server");
492  mrpt::system::CTimeLoggerEntry tle(profiler_, "publishTopic");
493 
494  std::shared_lock<std::shared_mutex> lck(zmq_->advertisedTopics_mtx);
495  auto itIpat = zmq_->advertisedTopics.find(topicName);
496 
497  ASSERTMSG_(
498  itIpat != zmq_->advertisedTopics.end(),
499  mrpt::format(
500  "Topic `%s` has not been registered. Missing former call to "
501  "advertiseTopic()?",
502  topicName.c_str()));
503 
504  lck.unlock();
505 
506  auto& ipat = itIpat->second;
507 
508  ASSERTMSG_(
509  msg.GetDescriptor() == ipat.descriptor,
510  mrpt::format(
511  "Topic `%s` has type `%s`, but expected `%s` from former call to "
512  "advertiseTopic()?",
513  topicName.c_str(), msg.GetDescriptor()->name().c_str(),
514  ipat.descriptor->name().c_str()));
515 
516 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
517  ASSERT_(ipat.pubSocket);
518 #else
519  ASSERT_(ipat.pubSocket.connected());
520 #endif
521 
522  mvsim::sendMessage(msg, ipat.pubSocket);
523 
524 #else
525  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
526 #endif
527  MRPT_END
528 }
529 
531 {
532  using namespace std::string_literals;
533 
534 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
535  try
536  {
537  MRPT_LOG_INFO_STREAM("[" << nodeName_ << "] Client service thread started.");
538 
539  zmq::socket_t& s = *zmq_->srvListenSocket;
540 
541  for (;;)
542  {
543  // Wait for next request from client:
544  zmq::message_t m = mvsim::receiveMessage(s);
545 
546  // parse it:
547  mvsim_msgs::CallService csMsg;
548  mvsim::parseMessage(m, csMsg);
549 
550  std::shared_lock<std::shared_mutex> lck(zmq_->offeredServices_mtx);
551  const auto& srvName = csMsg.servicename();
552 
553  auto itSrv = zmq_->offeredServices.find(srvName);
554  if (itSrv == zmq_->offeredServices.end())
555  {
556  // Error: unknown service:
557  mvsim_msgs::GenericAnswer ans;
558  ans.set_success(false);
559  ans.set_errormessage(
560  mrpt::format("Requested unknown service `%s`", srvName.c_str()));
561  MRPT_LOG_ERROR_STREAM(ans.errormessage());
562 
563  mvsim::sendMessage(ans, s);
564  continue;
565  }
566 
567  internal::InfoPerService& ips = itSrv->second;
568 
569  // MRPT_TODO("Check input descriptor?");
570 
571  auto outMsgPtr = ips.callback(csMsg.serializedinput());
572 
573  // Send response:
574  mvsim::sendMessage(*outMsgPtr, s);
575  }
576  }
577  catch (const zmq::error_t& e)
578  {
579  if (e.num() == ETERM)
580  {
581  // This simply means someone called
582  // requestMainThreadTermination(). Just exit silently.
583  MRPT_LOG_DEBUG_STREAM(
584  "internalServiceServingThread about to exit for ZMQ term "
585  "signal.");
586  }
587  else
588  {
589  MRPT_LOG_ERROR_STREAM("internalServiceServingThread: ZMQ error: " << e.what());
590  }
591  }
592  catch (const std::exception& e)
593  {
594  MRPT_LOG_ERROR_STREAM(
595  "internalServiceServingThread: Exception: " << mrpt::exception_to_str(e));
596  }
597  MRPT_LOG_DEBUG_STREAM("internalServiceServingThread quitted.");
598 
599 #endif
600 }
601 
603 {
604  using namespace std::string_literals;
605 
606 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
607  try
608  {
609  MRPT_LOG_DEBUG_STREAM("[" << nodeName_ << "] Client topic updates thread started.");
610 
611  zmq::socket_t& s = *zmq_->topicNotificationsSocket;
612 
613  for (;;)
614  {
615  // Wait for next update from server:
616  zmq::message_t m = mvsim::receiveMessage(s);
617 
618  // parse it:
619  mvsim_msgs::TopicInfo tiMsg;
620  mvsim::parseMessage(m, tiMsg);
621 
622  // Let the server know that we received this:
623  mvsim_msgs::GenericAnswer ans;
624  ans.set_success(true);
625  mvsim::sendMessage(ans, s);
626 
627  // We got a message. This means we have to new endpoints to
628  // subscribe, either because we have just subscribed to a new topic,
629  // or a new node has advertised a topic we already subscribed in the
630  // past.
631 
632  // Look for the entry in the list of subscribed topics:
633  std::unique_lock<std::shared_mutex> lck(zmq_->subscribedTopics_mtx);
634  const auto& topicName = tiMsg.topicname();
635 
636  auto itTopic = zmq_->subscribedTopics.find(topicName);
637  if (itTopic == zmq_->subscribedTopics.end())
638  {
639  // This shouldn't happen (?).
640  MRPT_LOG_WARN_STREAM(
641  "Received a topic `" << topicName
642  << "` update message from server, but this node is not "
643  "subscribed to it (!).");
644  continue;
645  }
646 
647  MRPT_LOG_DEBUG_STREAM("[internalTopicUpdatesThread] Received: " << tiMsg.DebugString());
648 
649  internal::InfoPerSubscribedTopic& ipt = itTopic->second;
650 
651  for (int i = 0; i < tiMsg.publisherendpoint_size(); i++)
652  {
653  ipt.subSocket.connect(tiMsg.publisherendpoint(i));
654  }
655 
656  // No need to send response back to server.
657  }
658  }
659  catch (const zmq::error_t& e)
660  {
661  if (e.num() == ETERM)
662  {
663  // This simply means someone called
664  // requestMainThreadTermination(). Just exit silently.
665  MRPT_LOG_DEBUG_STREAM(
666  "internalTopicUpdatesThread about to exit for ZMQ term "
667  "signal.");
668  }
669  else
670  {
671  MRPT_LOG_ERROR_STREAM("internalTopicUpdatesThread: ZMQ error: " << e.what());
672  }
673  }
674  catch (const std::exception& e)
675  {
676  MRPT_LOG_ERROR_STREAM(
677  "internalTopicUpdatesThread: Exception: " << mrpt::exception_to_str(e));
678  }
679  MRPT_LOG_DEBUG_STREAM("[" << nodeName_ << "] Client topic updates thread quitted.");
680 
681 #endif
682 }
683 
684 // Overload for python wrapper
686  const std::string& serviceName, const std::string& inputSerializedMsg)
687 {
688  MRPT_START
689 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
690  mrpt::system::CTimeLoggerEntry tle(profiler_, "callService");
691 
692  std::string outMsgData, outMsgType;
693  doCallService(serviceName, inputSerializedMsg, std::nullopt, outMsgData, outMsgType);
694  return outMsgData;
695 #endif
696  MRPT_END
697 }
700  const std::string& topicName,
701  const std::function<void(
702  const std::string& /*msgType*/, const std::vector<uint8_t>& /*serializedMsg*/)>& callback)
703 {
704  MRPT_START
705 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
706 
708  topicName,
709  [callback](const zmq::message_t& m)
710  {
711  const auto [sType, sData] = mvsim::internal::parseMessageToParts(m);
712  std::vector<uint8_t> d(sData.size());
713  ::memcpy(d.data(), sData.data(), sData.size());
714  callback(sType, d);
715  });
716 #endif
717  MRPT_END
718 }
719 
721  const std::string& serviceName, const std::string& inputSerializedMsg,
722  mrpt::optional_ref<google::protobuf::Message> outputMsg,
723  mrpt::optional_ref<std::string> outputSerializedMsg,
724  mrpt::optional_ref<std::string> outputMsgTypeName)
725 {
726  MRPT_START
727 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
728  mrpt::system::CTimeLoggerEntry tle(profiler_, "doCallService");
729 
730  // 1) Request to the server who is serving this service:
731  std::string srvEndpoint;
732 
733  auto lckCache = mrpt::lockHelper(serviceToEndPointCacheMtx_);
734  if (auto it = serviceToEndPointCache_.find(serviceName); it != serviceToEndPointCache_.end())
735  {
736  srvEndpoint = it->second;
737  }
738  else
739  {
740  mrpt::system::CTimeLoggerEntry tle2(profiler_, "doCallService.getinfo");
741 
742  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
743  zmq::socket_t& s = *zmq_->mainReqSocket;
744 
745  mvsim_msgs::GetServiceInfoRequest gsi;
746  gsi.set_servicename(serviceName);
747  mvsim::sendMessage(gsi, s);
748 
749  auto m = mvsim::receiveMessage(s);
750  mvsim_msgs::GetServiceInfoAnswer gsia;
751  mvsim::parseMessage(m, gsia);
752 
753  if (!gsia.success())
754  THROW_EXCEPTION_FMT(
755  "Error requesting information about service `%s`: %s", serviceName.c_str(),
756  gsia.errormessage().c_str());
757 
758  srvEndpoint = gsia.serviceendpoint();
759 
760  serviceToEndPointCache_[serviceName] = srvEndpoint;
761  }
762  lckCache.unlock();
763 
764  // 2) Connect to the service offerrer and request the execution:
765  zmq::socket_t srvReqSock(zmq_->context, ZMQ_REQ);
766  srvReqSock.connect(srvEndpoint);
767 
768  mvsim_msgs::CallService csMsg;
769  csMsg.set_servicename(serviceName);
770  csMsg.set_serializedinput(inputSerializedMsg);
771 
772  mvsim::sendMessage(csMsg, srvReqSock);
773 
774  const auto m = mvsim::receiveMessage(srvReqSock);
775  if (outputMsg)
776  {
777  mvsim::parseMessage(m, outputMsg.value().get());
778  }
779  if (outputSerializedMsg)
780  {
781  const auto [typeName, serializedData] = internal::parseMessageToParts(m);
782 
783  outputSerializedMsg.value().get() = serializedData;
784  if (outputMsgTypeName) outputMsgTypeName.value().get() = typeName;
785  }
786 #endif
787  MRPT_END
788 }
789 
790 void Client::subscribe_topic_raw(const std::string& topicName, const topic_callback_t& callback)
791 {
792  doSubscribeTopic(topicName, nullptr, callback);
793 }
794 
796  const std::string& topicName, [[maybe_unused]] const google::protobuf::Descriptor* descriptor,
797  const topic_callback_t& callback)
798 {
799  MRPT_START
800 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
801  mrpt::system::CTimeLoggerEntry tle(profiler_, "doSubscribeTopic");
802  // Register in my internal DB:
803  std::unique_lock<std::shared_mutex> lck(zmq_->subscribedTopics_mtx);
804 
805  auto& topics = zmq_->subscribedTopics;
806 
807  // It's ok to subscribe more than once:
809  topics.emplace_hint(topics.begin(), topicName, zmq_->context)->second;
810 
811  // subscribe to .recv() any message:
812 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
813  ipt.subSocket.set(zmq::sockopt::subscribe, "");
814 #else
815  ipt.subSocket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
816 #endif
817 
818  ipt.callbacks.push_back(callback);
819 
820  ipt.topicName = topicName;
821 
822  lck.unlock();
823 
824  ipt.topicThread = std::thread([&]() { this->internalTopicSubscribeThread(ipt); });
825 
826  // Let the server know about our interest in the topic:
827  mvsim_msgs::SubscribeRequest subReq;
828  subReq.set_topic(topicName);
829  subReq.set_updatesendpoint(zmq_->topicNotificationsEndPoint);
830 
831  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
832  mvsim::sendMessage(subReq, *zmq_->mainReqSocket);
833 
834  const auto m = mvsim::receiveMessage(*zmq_->mainReqSocket);
835  lckMain.unlock();
836  mvsim_msgs::SubscribeAnswer subAns;
837  mvsim::parseMessage(m, subAns);
838 
839  ASSERT_EQUAL_(subAns.topic(), topicName);
840  ASSERT_(subAns.success());
841 
842  // That is... the rest will be done upon reception of messages from the
843  // server on potential clients we should subscribe to.
844 
845 #endif
846  MRPT_END
847 }
848 
849 void Client::internalTopicSubscribeThread(internal::InfoPerSubscribedTopic& ipt)
850 {
851  using namespace std::string_literals;
852 
853 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
854  try
855  {
856  MRPT_LOG_DEBUG_STREAM(
857  "[" << nodeName_ << "] Client topic subscribe thread for `" << ipt.topicName
858  << "` started.");
859 
860  zmq::socket_t& s = ipt.subSocket;
861 
862  for (;;)
863  {
864  // Wait for next update from server:
865  const zmq::message_t m = mvsim::receiveMessage(s);
866 
867  // Send to subscriber callbacks:
868  try
869  {
870  for (const auto& callback : ipt.callbacks) callback(m);
871  }
872  catch (const std::exception& e)
873  {
874  MRPT_LOG_ERROR_STREAM(
875  "Exception in topic `"
876  << ipt.topicName << "` subscription callback:" << mrpt::exception_to_str(e));
877  }
878  }
879  }
880  catch (const zmq::error_t& e)
881  {
882  if (e.num() == ETERM)
883  {
884  // This simply means someone called
885  // requestMainThreadTermination(). Just exit silently.
886  MRPT_LOG_DEBUG_STREAM(
887  "[" << nodeName_
888  << "] Client topic subscribe thread about to exit for ZMQ "
889  "term signal.");
890  }
891  else
892  {
893  MRPT_LOG_ERROR_STREAM("internalTopicSubscribeThread: ZMQ error: " << e.what());
894  }
895  }
896  catch (const std::exception& e)
897  {
898  MRPT_LOG_ERROR_STREAM(
899  "internalTopicSubscribeThread: Exception: " << mrpt::exception_to_str(e));
900  }
901  MRPT_LOG_DEBUG_STREAM("[" << nodeName_ << "] Client topic subscribe thread quitted.");
902 
903 #endif
904 }
Client.h
mvsim::Client::internalTopicUpdatesThread
void internalTopicUpdatesThread()
Definition: src/Comms/Client.cpp:602
mvsim::Client::publishTopic
void publishTopic(const std::string &topicName, const google::protobuf::Message &msg)
Definition: src/Comms/Client.cpp:486
mvsim::Client::~Client
~Client()
Definition: src/Comms/Client.cpp:125
mvsim
Definition: Client.h:21
mvsim::Client::connected
bool connected() const
Definition: src/Comms/Client.cpp:129
mvsim::Client::subscribeTopic
void subscribeTopic(const std::string &topicName, const std::function< void(const MSG_T &)> &callback)
Definition: Client.h:198
mvsim::Client::doCallService
void doCallService(const std::string &serviceName, const std::string &inputSerializedMsg, mrpt::optional_ref< google::protobuf::Message > outputMsg, mrpt::optional_ref< std::string > outputSerializedMsg=std::nullopt, mrpt::optional_ref< std::string > outputMsgTypeName=std::nullopt)
Definition: src/Comms/Client.cpp:720
s
XmlRpcServer s
zmq_monitor.h
mvsim::Client
Definition: Client.h:48
mvsim::Client::internalServiceServingThread
void internalServiceServingThread()
Definition: src/Comms/Client.cpp:530
mvsim::Client::ZMQImpl
Definition: src/Comms/Client.cpp:95
mrpt
Definition: basic_types.h:36
mvsim::Client::InfoPerSubscribedTopic
friend struct internal::InfoPerSubscribedTopic
Definition: Client.h:173
mvsim::Client::InfoPerService
friend struct internal::InfoPerService
Definition: Client.h:172
mvsim::Client::serviceToEndPointCache_
std::map< std::string, std::string > serviceToEndPointCache_
Definition: Client.h:142
mvsim::Client::serverHostAddress_
std::string serverHostAddress_
Definition: Client.h:136
mvsim::Client::doAdvertiseTopic
void doAdvertiseTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor)
Definition: src/Comms/Client.cpp:359
mvsim::Client::topicUpdatesThread_
std::thread topicUpdatesThread_
Definition: Client.h:140
mvsim::Client::connect
void connect()
Definition: src/Comms/Client.cpp:138
mvsim::Client::nodeName_
std::string nodeName_
Definition: Client.h:137
mvsim::Client::setName
void setName(const std::string &nodeName)
Definition: src/Comms/Client.cpp:127
mvsim::Client::requestListOfTopics
std::vector< InfoPerTopic > requestListOfTopics()
Definition: src/Comms/Client.cpp:316
mvsim::Client::serviceToEndPointCacheMtx_
std::mutex serviceToEndPointCacheMtx_
Definition: Client.h:143
mvsim::Client::profiler_
mrpt::system::CTimeLogger profiler_
Definition: Client.h:145
mvsim::Client::internalTopicSubscribeThread
void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic &ipt)
Definition: src/Comms/Client.cpp:849
d
d
mvsim::Client::doRegisterClient
void doRegisterClient()
Definition: src/Comms/Client.cpp:229
mvsim::MVSIM_PORTNO_MAIN_REP
constexpr unsigned int MVSIM_PORTNO_MAIN_REP
Definition: ports.h:17
mvsim::Client::doAdvertiseService
void doAdvertiseService(const std::string &serviceName, const google::protobuf::Descriptor *descIn, const google::protobuf::Descriptor *descOut, service_callback_t callback)
Definition: src/Comms/Client.cpp:430
mvsim::Client::zmq_
std::unique_ptr< ZMQImpl > zmq_
Definition: Client.h:133
common.h
mvsim::Client::subscribe_topic_raw
void subscribe_topic_raw(const std::string &topicName, const topic_callback_t &callback)
Definition: src/Comms/Client.cpp:790
mvsim::Client::shutdown
void shutdown() noexcept
Definition: src/Comms/Client.cpp:196
ports.h
std
mvsim::Client::doUnregisterClient
void doUnregisterClient()
Definition: src/Comms/Client.cpp:257
mvsim::Client::topic_callback_t
std::function< void(const zmq::message_t &)> topic_callback_t
Definition: Client.h:123
mvsim::Client::serviceInvokerThread_
std::thread serviceInvokerThread_
Definition: Client.h:139
mvsim::Client::requestListOfNodes
std::vector< InfoPerNode > requestListOfNodes()
Definition: src/Comms/Client.cpp:286
mvsim::internal
Definition: Client.h:21
mvsim::Client::service_callback_t
std::function< std::shared_ptr< google::protobuf::Message >(const std::string &)> service_callback_t
Definition: Client.h:155
t
geometry_msgs::TransformStamped t
mvsim::Client::Client
Client()
Definition: src/Comms/Client.cpp:119
mvsim::Client::callService
void callService(const std::string &serviceName, const INPUT_MSG_T &input, OUTPUT_MSG_T &output)
Definition: Client.h:213
mvsim::Client::doSubscribeTopic
void doSubscribeTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor, const topic_callback_t &callback)
Definition: src/Comms/Client.cpp:795


mvsim
Author(s):
autogenerated on Wed May 28 2025 02:13:07