src/Comms/Client.cpp
Go to the documentation of this file.
1 /*+-------------------------------------------------------------------------+
2  | MultiVehicle simulator (libmvsim) |
3  | |
4  | Copyright (C) 2014-2023 Jose Luis Blanco Claraco |
5  | Copyright (C) 2017 Borys Tymchenko (Odessa Polytechnic University) |
6  | Distributed under 3-clause BSD License |
7  | See COPYING |
8  +-------------------------------------------------------------------------+ */
9 
10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/core/lock_helper.h>
12 #include <mrpt/version.h>
13 #include <mvsim/Comms/Client.h>
14 #include <mvsim/Comms/common.h>
15 #include <mvsim/Comms/ports.h>
17 #if MRPT_VERSION >= 0x204
18 #include <mrpt/system/thread_name.h>
19 #endif
20 
21 #include <iostream>
22 #include <mutex>
23 #include <shared_mutex>
24 
25 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
26 
27 #include <google/protobuf/text_format.h>
28 #include <mvsim/mvsim-msgs/AdvertiseServiceRequest.pb.h>
29 #include <mvsim/mvsim-msgs/AdvertiseTopicRequest.pb.h>
30 #include <mvsim/mvsim-msgs/CallService.pb.h>
31 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h>
32 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h>
33 #include <mvsim/mvsim-msgs/GetServiceInfoRequest.pb.h>
34 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h>
35 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h>
36 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h>
37 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h>
38 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h>
39 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h>
40 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h>
41 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h>
42 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h>
43 
44 #include <zmq.hpp>
45 
46 #endif
47 
48 using namespace mvsim;
49 
50 #if defined(MVSIM_HAS_ZMQ)
51 namespace mvsim::internal
52 {
53 struct InfoPerAdvertisedTopic
54 {
55  InfoPerAdvertisedTopic(zmq::context_t& c) : context(c) {}
56 
57  zmq::context_t& context;
58 
59  std::string topicName;
60  zmq::socket_t pubSocket = zmq::socket_t(context, ZMQ_PUB);
61  std::string endpoint;
62  const google::protobuf::Descriptor* descriptor = nullptr;
63 };
64 
65 struct InfoPerService
66 {
67  InfoPerService() = default;
68 
69  std::string serviceName;
70  const google::protobuf::Descriptor* descInput = nullptr;
71  const google::protobuf::Descriptor* descOutput = nullptr;
73 };
74 struct InfoPerSubscribedTopic
75 {
76  InfoPerSubscribedTopic(zmq::context_t& c) : context(c) {}
77  ~InfoPerSubscribedTopic()
78  {
79  if (topicThread.joinable()) topicThread.join();
80  }
81 
82  zmq::context_t& context;
83 
84  std::string topicName;
85  zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
86  const google::protobuf::Descriptor* descriptor = nullptr;
87 
88  std::vector<Client::topic_callback_t> callbacks;
89 
90  std::thread topicThread;
91 };
92 } // namespace mvsim::internal
93 #endif
94 
96 {
97 #if defined(MVSIM_HAS_ZMQ)
98  zmq::context_t context{2 /* io sockets */, ZMQ_MAX_SOCKETS_DFLT};
99  std::optional<zmq::socket_t> mainReqSocket;
100  std::recursive_mutex mainReqSocketMtx;
101  mvsim::SocketMonitor mainReqSocketMonitor;
102 
103  std::map<std::string, internal::InfoPerAdvertisedTopic> advertisedTopics;
104  std::shared_mutex advertisedTopics_mtx;
105 
106  std::optional<zmq::socket_t> srvListenSocket;
107  std::map<std::string, internal::InfoPerService> offeredServices;
108  std::shared_mutex offeredServices_mtx;
109 
110  std::map<std::string, internal::InfoPerSubscribedTopic> subscribedTopics;
111  std::shared_mutex subscribedTopics_mtx;
112 
113  std::optional<zmq::socket_t> topicNotificationsSocket;
114  std::string topicNotificationsEndPoint;
115 
116 #endif
117 };
118 
120  : mrpt::system::COutputLogger("mvsim::Client"),
121  zmq_(std::make_unique<Client::ZMQImpl>())
122 {
123 }
124 Client::Client(const std::string& nodeName) : Client() { setName(nodeName); }
125 
127 
128 void Client::setName(const std::string& nodeName) { nodeName_ = nodeName; }
129 
130 bool Client::connected() const
131 {
132 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
133  return zmq_->mainReqSocketMonitor.connected();
134 #else
135  return false;
136 #endif
137 }
138 
140 {
141  using namespace std::string_literals;
142  ASSERTMSG_(
143  !zmq_->mainReqSocket || !zmq_->mainReqSocket,
144  "Client is already running.");
145 
146 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
147 
148  mrpt::system::CTimeLoggerEntry tle(profiler_, "connect");
149 
150  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
151 
152  zmq_->mainReqSocket.emplace(zmq_->context, ZMQ_REQ);
153 
154  // Monitor to listen on ZMQ socket events:
155  zmq_->mainReqSocketMonitor.monitor(zmq_->mainReqSocket.value());
156 
157  zmq_->mainReqSocket->connect(
158  "tcp://"s + serverHostAddress_ + ":"s +
159  std::to_string(MVSIM_PORTNO_MAIN_REP));
160 
161  // Let the server know about this new node:
163 
164  // Create listening socket for services:
165  zmq_->srvListenSocket.emplace(zmq_->context, ZMQ_REP);
166  zmq_->srvListenSocket->bind("tcp://0.0.0.0:*"s);
167 
168  if (!zmq_->srvListenSocket)
169  THROW_EXCEPTION("Error binding service listening socket.");
170 
171  ASSERTMSG_(
172  !serviceInvokerThread_.joinable(),
173  "Client service thread is already running!");
174 
176  std::thread(&Client::internalServiceServingThread, this);
177 #if MRPT_VERSION >= 0x204
178  mrpt::system::thread_name("services_"s + nodeName_, serviceInvokerThread_);
179 #endif
180 
181  // Create listening socket for subscription updates:
182  zmq_->topicNotificationsSocket.emplace(zmq_->context, ZMQ_PAIR);
183  zmq_->topicNotificationsSocket->bind("tcp://0.0.0.0:*"s);
184 
185  if (!zmq_->topicNotificationsSocket)
186  THROW_EXCEPTION("Error binding topic updates listening socket.");
187 
188  zmq_->topicNotificationsEndPoint =
189  get_zmq_endpoint(*zmq_->topicNotificationsSocket);
190 
191  ASSERTMSG_(
192  !topicUpdatesThread_.joinable(),
193  "Client topic updates thread is already running!");
194 
196  std::thread(&Client::internalTopicUpdatesThread, this);
197 #if MRPT_VERSION >= 0x204
198  mrpt::system::thread_name(
199  "topicUpdates_"s + nodeName_, topicUpdatesThread_);
200 #endif
201 
202 #else
203  THROW_EXCEPTION(
204  "MVSIM needs building with ZMQ and PROTOBUF to enable "
205  "client/server");
206 #endif
207 }
208 
209 void Client::shutdown() noexcept
210 {
211 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
212  mrpt::system::CTimeLoggerEntry tle(profiler_, "shutdown");
213 
214  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
215  if (!zmq_->mainReqSocket) return;
216 
217  try
218  {
219  MRPT_LOG_DEBUG_STREAM("Unregistering from server.");
221  }
222  catch (const std::exception& e)
223  {
224  MRPT_LOG_ERROR_STREAM(
225  "shutdown: Exception: " << mrpt::exception_to_str(e));
226  }
227 
228 #if CPPZMQ_VERSIONZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
229  zmq_->context.shutdown();
230 #else
231  // Missing shutdown() in older versions:
232  zmq_ctx_shutdown(zmq_->context.operator void*());
233 #endif
234 
235  if (serviceInvokerThread_.joinable()) serviceInvokerThread_.join();
236  if (topicUpdatesThread_.joinable()) topicUpdatesThread_.join();
237  zmq_->subscribedTopics.clear();
238  zmq_->offeredServices.clear();
239 
240 #endif
241 }
242 
244 {
245 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
246  mrpt::system::CTimeLoggerEntry tle(profiler_, "doRegisterClient");
247 
248  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
249  auto& s = *zmq_->mainReqSocket;
250 
251  mvsim_msgs::RegisterNodeRequest rnq;
252  rnq.set_nodename(nodeName_);
253  mvsim::sendMessage(rnq, s);
254 
255  // Get the reply.
256  const zmq::message_t reply = mvsim::receiveMessage(s);
257 
258  mvsim_msgs::RegisterNodeAnswer rna;
259  mvsim::parseMessage(reply, rna);
260  if (!rna.success())
261  {
262  THROW_EXCEPTION_FMT(
263  "Server did not allow registering node: %s",
264  rna.errormessage().c_str());
265  }
266  MRPT_LOG_DEBUG("Successfully registered in the server.");
267 #else
268  THROW_EXCEPTION("MVSIM built without ZMQ");
269 #endif
270 }
271 
273 {
274 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
275  mrpt::system::CTimeLoggerEntry tle(profiler_, "doUnregisterClient");
276 
277  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
278  if (!zmq_->mainReqSocket) return;
279  auto& s = *zmq_->mainReqSocket;
280 
281  mvsim_msgs::UnregisterNodeRequest rnq;
282  rnq.set_nodename(nodeName_);
283  mvsim::sendMessage(rnq, s);
284 
285  // Get the reply.
286  const zmq::message_t reply = mvsim::receiveMessage(s);
287 
288  mvsim_msgs::GenericAnswer rna;
289  mvsim::parseMessage(reply, rna);
290  if (!rna.success())
291  {
292  THROW_EXCEPTION_FMT(
293  "Server answered an error unregistering node: %s",
294  rna.errormessage().c_str());
295  }
296  MRPT_LOG_DEBUG("Successfully unregistered in the server.");
297 #else
298  THROW_EXCEPTION("MVSIM built without ZMQ");
299 #endif
300 }
301 
302 std::vector<Client::InfoPerNode> Client::requestListOfNodes()
303 {
304 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
305  mrpt::system::CTimeLoggerEntry tle(profiler_, "requestListOfNodes");
306 
307  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
308  auto& s = *zmq_->mainReqSocket;
309 
310  mvsim_msgs::ListNodesRequest req;
311  mvsim::sendMessage(req, s);
312 
313  // Get the reply.
314  const zmq::message_t reply = mvsim::receiveMessage(s);
315 
316  mvsim_msgs::ListNodesAnswer lna;
317  mvsim::parseMessage(reply, lna);
318 
319  std::vector<Client::InfoPerNode> nodes;
320  nodes.resize(lna.nodes_size());
321 
322  for (int i = 0; i < lna.nodes_size(); i++)
323  {
324  nodes[i].name = lna.nodes(i);
325  }
326  return nodes;
327 #else
328  THROW_EXCEPTION("MVSIM built without ZMQ");
329 #endif
330 }
331 
332 std::vector<Client::InfoPerTopic> Client::requestListOfTopics()
333 {
334 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
335  mrpt::system::CTimeLoggerEntry tle(profiler_, "requestListOfTopics");
336  auto lck = mrpt::lockHelper(zmq_->mainReqSocketMtx);
337  auto& s = *zmq_->mainReqSocket;
338 
339  mvsim_msgs::ListTopicsRequest req;
340  mvsim::sendMessage(req, s);
341 
342  // Get the reply.
343  const zmq::message_t reply = mvsim::receiveMessage(s);
344 
345  mvsim_msgs::ListTopicsAnswer lta;
346  mvsim::parseMessage(reply, lta);
347 
348  std::vector<Client::InfoPerTopic> topics;
349  topics.resize(lta.topics_size());
350 
351  for (int i = 0; i < lta.topics_size(); i++)
352  {
353  const auto& t = lta.topics(i);
354  auto& dst = topics[i];
355 
356  dst.name = t.topicname();
357  dst.type = t.topictype();
358 
359  ASSERT_EQUAL_(t.publisherendpoint_size(), t.publishername_size());
360  dst.endpoints.resize(t.publisherendpoint_size());
361  dst.publishers.resize(t.publisherendpoint_size());
362 
363  for (int k = 0; k < t.publisherendpoint_size(); k++)
364  {
365  dst.publishers[k] = t.publishername(k);
366  dst.endpoints[k] = t.publisherendpoint(k);
367  }
368  }
369  return topics;
370 #else
371  THROW_EXCEPTION("MVSIM built without ZMQ");
372 #endif
373 }
374 
376  const std::string& topicName,
377  const google::protobuf::Descriptor* descriptor)
378 {
379 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
380  mrpt::system::CTimeLoggerEntry tle(profiler_, "doAdvertiseTopic");
381  auto& advTopics = zmq_->advertisedTopics;
382 
383  std::unique_lock<std::shared_mutex> lck(zmq_->advertisedTopics_mtx);
384 
385  if (advTopics.find(topicName) != advTopics.end())
386  THROW_EXCEPTION_FMT(
387  "Topic `%s` already registered for publication in this same "
388  "client (!)",
389  topicName.c_str());
390 
391  // the ctor of InfoPerAdvertisedTopic automatically creates a ZMQ_PUB
392  // socket in pubSocket
393  internal::InfoPerAdvertisedTopic& ipat =
394  advTopics.emplace_hint(advTopics.begin(), topicName, zmq_->context)
395  ->second;
396 
397  lck.unlock();
398 
399  // Bind the PUBLISH socket:
400  ipat.pubSocket.bind("tcp://0.0.0.0:*");
401 
402 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
403  if (!ipat.pubSocket)
404 #else
405  if (!ipat.pubSocket.connected())
406 #endif
407  {
408  THROW_EXCEPTION("Could not bind publisher socket");
409  }
410 
411  // Retrieve assigned TCP port:
412  ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
413  ipat.topicName = topicName; // redundant in container, but handy.
414  ipat.descriptor = descriptor;
415 
416  MRPT_LOG_DEBUG_FMT(
417  "Advertising topic `%s` [%s] on endpoint `%s`", topicName.c_str(),
418  descriptor->full_name().c_str(), ipat.endpoint.c_str());
419 
420  // MRPT_LOG_INFO_STREAM("Type: " << descriptor->DebugString());
421 
422  mvsim_msgs::AdvertiseTopicRequest req;
423  req.set_topicname(ipat.topicName);
424  req.set_endpoint(ipat.endpoint);
425  req.set_topictypename(ipat.descriptor->full_name());
426  req.set_nodename(nodeName_);
427 
428  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
429  mvsim::sendMessage(req, *zmq_->mainReqSocket);
430 
431  // Get the reply.
432  const zmq::message_t reply = mvsim::receiveMessage(*zmq_->mainReqSocket);
433  lckMain.unlock();
434 
435  mvsim_msgs::GenericAnswer ans;
436  mvsim::parseMessage(reply, ans);
437 
438  if (!ans.success())
439  THROW_EXCEPTION_FMT(
440  "Error registering topic `%s` in server: `%s`", topicName.c_str(),
441  ans.errormessage().c_str());
442 
443 #else
444  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
445 #endif
446 }
447 
449  const std::string& serviceName, const google::protobuf::Descriptor* descIn,
450  const google::protobuf::Descriptor* descOut, service_callback_t callback)
451 {
452 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
453  mrpt::system::CTimeLoggerEntry tle(profiler_, "doAdvertiseService");
454  std::unique_lock<std::shared_mutex> lck(zmq_->offeredServices_mtx);
455 
456  auto& services = zmq_->offeredServices;
457 
458  if (services.find(serviceName) != services.end())
459  THROW_EXCEPTION_FMT(
460  "Service `%s` already registered in this same client!",
461  serviceName.c_str());
462 
463  internal::InfoPerService& ips = services[serviceName];
464 
465  lck.unlock();
466 
467  // Retrieve assigned TCP port:
468  const auto assignedPort = mvsim::get_zmq_endpoint(*zmq_->srvListenSocket);
469 
470  ips.serviceName = serviceName; // redundant in container, but handy.
471  ips.callback = callback;
472  ips.descInput = descIn;
473  ips.descOutput = descOut;
474 
475  MRPT_LOG_DEBUG_FMT(
476  "Advertising service `%s` [%s->%s] on endpoint `%s`",
477  serviceName.c_str(), descIn->full_name().c_str(),
478  descOut->full_name().c_str(), assignedPort.c_str());
479 
480  mvsim_msgs::AdvertiseServiceRequest req;
481  req.set_servicename(ips.serviceName);
482  req.set_endpoint(assignedPort);
483  req.set_inputtypename(ips.descInput->full_name());
484  req.set_outputtypename(ips.descOutput->full_name());
485  req.set_nodename(nodeName_);
486 
487  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
488  mvsim::sendMessage(req, *zmq_->mainReqSocket);
489 
490  // Get the reply.
491  const zmq::message_t reply = mvsim::receiveMessage(*zmq_->mainReqSocket);
492  lckMain.unlock();
493  mvsim_msgs::GenericAnswer ans;
494  mvsim::parseMessage(reply, ans);
495 
496  if (!ans.success())
497  THROW_EXCEPTION_FMT(
498  "Error registering service `%s` in server: `%s`",
499  serviceName.c_str(), ans.errormessage().c_str());
500 
501 #else
502  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
503 #endif
504 }
505 
507  const std::string& topicName, const google::protobuf::Message& msg)
508 {
509  MRPT_START
510 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
511  ASSERTMSG_(
512  zmq_ && zmq_->mainReqSocket && zmq_->mainReqSocket,
513  "Client not connected to Server");
514  mrpt::system::CTimeLoggerEntry tle(profiler_, "publishTopic");
515 
516  std::shared_lock<std::shared_mutex> lck(zmq_->advertisedTopics_mtx);
517  auto itIpat = zmq_->advertisedTopics.find(topicName);
518 
519  ASSERTMSG_(
520  itIpat != zmq_->advertisedTopics.end(),
521  mrpt::format(
522  "Topic `%s` has not been registered. Missing former call to "
523  "advertiseTopic()?",
524  topicName.c_str()));
525 
526  lck.unlock();
527 
528  auto& ipat = itIpat->second;
529 
530  ASSERTMSG_(
531  msg.GetDescriptor() == ipat.descriptor,
532  mrpt::format(
533  "Topic `%s` has type `%s`, but expected `%s` from former call to "
534  "advertiseTopic()?",
535  topicName.c_str(), msg.GetDescriptor()->name().c_str(),
536  ipat.descriptor->name().c_str()));
537 
538 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
539  ASSERT_(ipat.pubSocket);
540 #else
541  ASSERT_(ipat.pubSocket.connected());
542 #endif
543 
544  mvsim::sendMessage(msg, ipat.pubSocket);
545 
546 #if 0
547  std::cout << "Published on topic " << topicName << std::endl;
548 #endif
549 
550 #else
551  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
552 #endif
553  MRPT_END
554 }
555 
557 {
558  using namespace std::string_literals;
559 
560 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
561  try
562  {
563  MRPT_LOG_INFO_STREAM(
564  "[" << nodeName_ << "] Client service thread started.");
565 
566  zmq::socket_t& s = *zmq_->srvListenSocket;
567 
568  for (;;)
569  {
570  // Wait for next request from client:
571  zmq::message_t m = mvsim::receiveMessage(s);
572 
573  // parse it:
574  mvsim_msgs::CallService csMsg;
575  mvsim::parseMessage(m, csMsg);
576 
577  std::shared_lock<std::shared_mutex> lck(zmq_->offeredServices_mtx);
578  const auto& srvName = csMsg.servicename();
579 
580  auto itSrv = zmq_->offeredServices.find(srvName);
581  if (itSrv == zmq_->offeredServices.end())
582  {
583  // Error: unknown service:
584  mvsim_msgs::GenericAnswer ans;
585  ans.set_success(false);
586  ans.set_errormessage(mrpt::format(
587  "Requested unknown service `%s`", srvName.c_str()));
588  MRPT_LOG_ERROR_STREAM(ans.errormessage());
589 
590  mvsim::sendMessage(ans, s);
591  continue;
592  }
593 
594  internal::InfoPerService& ips = itSrv->second;
595 
596  // MRPT_TODO("Check input descriptor?");
597 
598  auto outMsgPtr = ips.callback(csMsg.serializedinput());
599 
600  // Send response:
601  mvsim::sendMessage(*outMsgPtr, s);
602  }
603  }
604  catch (const zmq::error_t& e)
605  {
606  if (e.num() == ETERM)
607  {
608  // This simply means someone called
609  // requestMainThreadTermination(). Just exit silently.
610  MRPT_LOG_DEBUG_STREAM(
611  "internalServiceServingThread about to exit for ZMQ term "
612  "signal.");
613  }
614  else
615  {
616  MRPT_LOG_ERROR_STREAM(
617  "internalServiceServingThread: ZMQ error: " << e.what());
618  }
619  }
620  catch (const std::exception& e)
621  {
622  MRPT_LOG_ERROR_STREAM(
623  "internalServiceServingThread: Exception: "
624  << mrpt::exception_to_str(e));
625  }
626  MRPT_LOG_DEBUG_STREAM("internalServiceServingThread quitted.");
627 
628 #endif
629 }
630 
632 {
633  using namespace std::string_literals;
634 
635 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
636  try
637  {
638  MRPT_LOG_DEBUG_STREAM(
639  "[" << nodeName_ << "] Client topic updates thread started.");
640 
641  zmq::socket_t& s = *zmq_->topicNotificationsSocket;
642 
643  for (;;)
644  {
645  // Wait for next update from server:
646  zmq::message_t m = mvsim::receiveMessage(s);
647 
648  // parse it:
649  mvsim_msgs::TopicInfo tiMsg;
650  mvsim::parseMessage(m, tiMsg);
651 
652  // Let the server know that we received this:
653  mvsim_msgs::GenericAnswer ans;
654  ans.set_success(true);
655  mvsim::sendMessage(ans, s);
656 
657  // We got a message. This means we have to new endpoints to
658  // subscribe, either because we have just subscribed to a new topic,
659  // or a new node has advertised a topic we already subscribed in the
660  // past.
661 
662  // Look for the entry in the list of subscribed topics:
663  std::unique_lock<std::shared_mutex> lck(zmq_->subscribedTopics_mtx);
664  const auto& topicName = tiMsg.topicname();
665 
666  auto itTopic = zmq_->subscribedTopics.find(topicName);
667  if (itTopic == zmq_->subscribedTopics.end())
668  {
669  // This shouldn't happen (?).
670  MRPT_LOG_WARN_STREAM(
671  "Received a topic `"
672  << topicName
673  << "` update message from server, but this node is not "
674  "subscribed to it (!).");
675  continue;
676  }
677 
678  MRPT_LOG_DEBUG_STREAM(
679  "[internalTopicUpdatesThread] Received: "
680  << tiMsg.DebugString());
681 
682  internal::InfoPerSubscribedTopic& ipt = itTopic->second;
683 
684  for (int i = 0; i < tiMsg.publisherendpoint_size(); i++)
685  {
686  ipt.subSocket.connect(tiMsg.publisherendpoint(i));
687  }
688 
689  // No need to send response back to server.
690  }
691  }
692  catch (const zmq::error_t& e)
693  {
694  if (e.num() == ETERM)
695  {
696  // This simply means someone called
697  // requestMainThreadTermination(). Just exit silently.
698  MRPT_LOG_DEBUG_STREAM(
699  "internalTopicUpdatesThread about to exit for ZMQ term "
700  "signal.");
701  }
702  else
703  {
704  MRPT_LOG_ERROR_STREAM(
705  "internalTopicUpdatesThread: ZMQ error: " << e.what());
706  }
707  }
708  catch (const std::exception& e)
709  {
710  MRPT_LOG_ERROR_STREAM(
711  "internalTopicUpdatesThread: Exception: "
712  << mrpt::exception_to_str(e));
713  }
714  MRPT_LOG_DEBUG_STREAM(
715  "[" << nodeName_ << "] Client topic updates thread quitted.");
716 
717 #endif
718 }
719 
720 // Overload for python wrapper
722  const std::string& serviceName, const std::string& inputSerializedMsg)
723 {
724  MRPT_START
725 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
726  mrpt::system::CTimeLoggerEntry tle(profiler_, "callService");
727 
728  std::string outMsgData, outMsgType;
730  serviceName, inputSerializedMsg, std::nullopt, outMsgData, outMsgType);
731  return outMsgData;
732 #endif
733  MRPT_END
734 }
737  const std::string& topicName,
738  const std::function<void(
739  const std::string& /*msgType*/,
740  const std::vector<uint8_t>& /*serializedMsg*/)>& callback)
741 {
742  MRPT_START
743 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
744 
745  subscribe_topic_raw(topicName, [callback](const zmq::message_t& m) {
746  const auto [sType, sData] = mvsim::internal::parseMessageToParts(m);
747  std::vector<uint8_t> d(sData.size());
748  ::memcpy(d.data(), sData.data(), sData.size());
749  callback(sType, d);
750  });
751 #endif
752  MRPT_END
753 }
754 
756  const std::string& serviceName, const std::string& inputSerializedMsg,
757  mrpt::optional_ref<google::protobuf::Message> outputMsg,
758  mrpt::optional_ref<std::string> outputSerializedMsg,
759  mrpt::optional_ref<std::string> outputMsgTypeName)
760 {
761  MRPT_START
762 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
763  mrpt::system::CTimeLoggerEntry tle(profiler_, "doCallService");
764 
765  // 1) Request to the server who is serving this service:
766  std::string srvEndpoint;
767 
768  auto lckCache = mrpt::lockHelper(serviceToEndPointCacheMtx_);
769  if (auto it = serviceToEndPointCache_.find(serviceName);
770  it != serviceToEndPointCache_.end())
771  {
772  srvEndpoint = it->second;
773  }
774  else
775  {
776  mrpt::system::CTimeLoggerEntry tle2(profiler_, "doCallService.getinfo");
777 
778  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
779  zmq::socket_t& s = *zmq_->mainReqSocket;
780 
781  mvsim_msgs::GetServiceInfoRequest gsi;
782  gsi.set_servicename(serviceName);
783  mvsim::sendMessage(gsi, s);
784 
785  auto m = mvsim::receiveMessage(s);
786  mvsim_msgs::GetServiceInfoAnswer gsia;
787  mvsim::parseMessage(m, gsia);
788 
789  if (!gsia.success())
790  THROW_EXCEPTION_FMT(
791  "Error requesting information about service `%s`: %s",
792  serviceName.c_str(), gsia.errormessage().c_str());
793 
794  srvEndpoint = gsia.serviceendpoint();
795 
796  serviceToEndPointCache_[serviceName] = srvEndpoint;
797  }
798  lckCache.unlock();
799 
800  // 2) Connect to the service offerrer and request the execution:
801  zmq::socket_t srvReqSock(zmq_->context, ZMQ_REQ);
802  srvReqSock.connect(srvEndpoint);
803 
804  mvsim_msgs::CallService csMsg;
805  csMsg.set_servicename(serviceName);
806  csMsg.set_serializedinput(inputSerializedMsg);
807 
808  mvsim::sendMessage(csMsg, srvReqSock);
809 
810  const auto m = mvsim::receiveMessage(srvReqSock);
811  if (outputMsg)
812  {
813  mvsim::parseMessage(m, outputMsg.value().get());
814  }
815  if (outputSerializedMsg)
816  {
817  const auto [typeName, serializedData] =
818  internal::parseMessageToParts(m);
819 
820  outputSerializedMsg.value().get() = serializedData;
821  if (outputMsgTypeName) outputMsgTypeName.value().get() = typeName;
822  }
823 #endif
824  MRPT_END
825 }
826 
828  const std::string& topicName, const topic_callback_t& callback)
829 {
830  doSubscribeTopic(topicName, nullptr, callback);
831 }
832 
834  const std::string& topicName,
835  [[maybe_unused]] const google::protobuf::Descriptor* descriptor,
836  const topic_callback_t& callback)
837 {
838  MRPT_START
839 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
840  mrpt::system::CTimeLoggerEntry tle(profiler_, "doSubscribeTopic");
841  // Register in my internal DB:
842  std::unique_lock<std::shared_mutex> lck(zmq_->subscribedTopics_mtx);
843 
844  auto& topics = zmq_->subscribedTopics;
845 
846  // It's ok to subscribe more than once:
847  internal::InfoPerSubscribedTopic& ipt =
848  topics.emplace_hint(topics.begin(), topicName, zmq_->context)->second;
849 
850  // subscribe to .recv() any message:
851 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
852  ipt.subSocket.set(zmq::sockopt::subscribe, "");
853 #else
854  ipt.subSocket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
855 #endif
856 
857  ipt.callbacks.push_back(callback);
858 
859  ipt.topicName = topicName;
860 
861  lck.unlock();
862 
863  ipt.topicThread =
864  std::thread([&]() { this->internalTopicSubscribeThread(ipt); });
865 
866  // Let the server know about our interest in the topic:
867  mvsim_msgs::SubscribeRequest subReq;
868  subReq.set_topic(topicName);
869  subReq.set_updatesendpoint(zmq_->topicNotificationsEndPoint);
870 
871  auto lckMain = mrpt::lockHelper(zmq_->mainReqSocketMtx);
872  mvsim::sendMessage(subReq, *zmq_->mainReqSocket);
873 
874  const auto m = mvsim::receiveMessage(*zmq_->mainReqSocket);
875  lckMain.unlock();
876  mvsim_msgs::SubscribeAnswer subAns;
877  mvsim::parseMessage(m, subAns);
878 
879  ASSERT_EQUAL_(subAns.topic(), topicName);
880  ASSERT_(subAns.success());
881 
882  // That is... the rest will be done upon reception of messages from the
883  // server on potential clients we should subscribe to.
884 
885 #endif
886  MRPT_END
887 }
888 
889 void Client::internalTopicSubscribeThread(internal::InfoPerSubscribedTopic& ipt)
890 {
891  using namespace std::string_literals;
892 
893 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
894  try
895  {
896  MRPT_LOG_DEBUG_STREAM(
897  "[" << nodeName_ << "] Client topic subscribe thread for `"
898  << ipt.topicName << "` started.");
899 
900  zmq::socket_t& s = ipt.subSocket;
901 
902  for (;;)
903  {
904  // Wait for next update from server:
905  const zmq::message_t m = mvsim::receiveMessage(s);
906 
907  // Send to subscriber callbacks:
908  try
909  {
910  for (const auto& callback : ipt.callbacks) callback(m);
911  }
912  catch (const std::exception& e)
913  {
914  MRPT_LOG_ERROR_STREAM(
915  "Exception in topic `"
916  << ipt.topicName
917  << "` subscription callback:" << mrpt::exception_to_str(e));
918  }
919  }
920  }
921  catch (const zmq::error_t& e)
922  {
923  if (e.num() == ETERM)
924  {
925  // This simply means someone called
926  // requestMainThreadTermination(). Just exit silently.
927  MRPT_LOG_DEBUG_STREAM(
928  "[" << nodeName_
929  << "] Client topic subscribe thread about to exit for ZMQ "
930  "term signal.");
931  }
932  else
933  {
934  MRPT_LOG_ERROR_STREAM(
935  "internalTopicSubscribeThread: ZMQ error: " << e.what());
936  }
937  }
938  catch (const std::exception& e)
939  {
940  MRPT_LOG_ERROR_STREAM(
941  "internalTopicSubscribeThread: Exception: "
942  << mrpt::exception_to_str(e));
943  }
944  MRPT_LOG_DEBUG_STREAM(
945  "[" << nodeName_ << "] Client topic subscribe thread quitted.");
946 
947 #endif
948 }
d
mrpt::system::CTimeLogger profiler_
Definition: Client.h:150
void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic &ipt)
void internalTopicUpdatesThread()
std::thread topicUpdatesThread_
Definition: Client.h:145
void shutdown() noexcept
void setName(const std::string &nodeName)
void doAdvertiseService(const std::string &serviceName, const google::protobuf::Descriptor *descIn, const google::protobuf::Descriptor *descOut, service_callback_t callback)
std::unique_ptr< ZMQImpl > zmq_
Definition: Client.h:138
std::map< std::string, std::string > serviceToEndPointCache_
Definition: Client.h:147
void subscribeTopic(const std::string &topicName, const std::function< void(const MSG_T &)> &callback)
Definition: Client.h:207
XmlRpcServer s
std::function< void(const zmq::message_t &)> topic_callback_t
Definition: Client.h:127
std::vector< InfoPerNode > requestListOfNodes()
void publishTopic(const std::string &topicName, const google::protobuf::Message &msg)
geometry_msgs::TransformStamped t
std::string serverHostAddress_
Definition: Client.h:141
void callService(const std::string &serviceName, const INPUT_MSG_T &input, OUTPUT_MSG_T &output)
Definition: Client.h:221
std::string nodeName_
Definition: Client.h:142
std::vector< InfoPerTopic > requestListOfTopics()
bool connected() const
void doAdvertiseTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor)
void internalServiceServingThread()
constexpr unsigned int MVSIM_PORTNO_MAIN_REP
Definition: ports.h:17
std::thread serviceInvokerThread_
Definition: Client.h:144
void doCallService(const std::string &serviceName, const std::string &inputSerializedMsg, mrpt::optional_ref< google::protobuf::Message > outputMsg, mrpt::optional_ref< std::string > outputSerializedMsg=std::nullopt, mrpt::optional_ref< std::string > outputMsgTypeName=std::nullopt)
void doSubscribeTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor, const topic_callback_t &callback)
void subscribe_topic_raw(const std::string &topicName, const topic_callback_t &callback)
std::function< std::shared_ptr< google::protobuf::Message >(const std::string &)> service_callback_t
Definition: Client.h:161
std::mutex serviceToEndPointCacheMtx_
Definition: Client.h:148


mvsim
Author(s):
autogenerated on Tue Jul 4 2023 03:08:19