src/Comms/Client.cpp
Go to the documentation of this file.
1 /*+-------------------------------------------------------------------------+
2  | MultiVehicle simulator (libmvsim) |
3  | |
4  | Copyright (C) 2014-2020 Jose Luis Blanco Claraco |
5  | Copyright (C) 2017 Borys Tymchenko (Odessa Polytechnic University) |
6  | Distributed under 3-clause BSD License |
7  | See COPYING |
8  +-------------------------------------------------------------------------+ */
9 
10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/version.h>
12 #include <mvsim/Comms/Client.h>
13 #include <mvsim/Comms/common.h>
14 #include <mvsim/Comms/ports.h>
16 #if MRPT_VERSION >= 0x204
17 #include <mrpt/system/thread_name.h>
18 #endif
19 
20 #include <iostream>
21 #include <mutex>
22 #include <shared_mutex>
23 
24 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
25 
26 #include <google/protobuf/text_format.h>
27 
28 #include <zmq.hpp>
29 
30 #include "AdvertiseServiceRequest.pb.h"
31 #include "AdvertiseTopicRequest.pb.h"
32 #include "CallService.pb.h"
33 #include "GenericAnswer.pb.h"
34 #include "GetServiceInfoAnswer.pb.h"
35 #include "GetServiceInfoRequest.pb.h"
36 #include "ListNodesAnswer.pb.h"
37 #include "ListNodesRequest.pb.h"
38 #include "ListTopicsAnswer.pb.h"
39 #include "ListTopicsRequest.pb.h"
40 #include "RegisterNodeAnswer.pb.h"
41 #include "RegisterNodeRequest.pb.h"
42 #include "SubscribeAnswer.pb.h"
43 #include "SubscribeRequest.pb.h"
44 #include "UnregisterNodeRequest.pb.h"
45 
46 #endif
47 
48 using namespace mvsim;
49 
50 #if defined(MVSIM_HAS_ZMQ)
51 namespace mvsim::internal
52 {
53 struct InfoPerAdvertisedTopic
54 {
55  InfoPerAdvertisedTopic(zmq::context_t& c) : context(c) {}
56 
57  zmq::context_t& context;
58 
59  std::string topicName;
60  zmq::socket_t pubSocket = zmq::socket_t(context, ZMQ_PUB);
61  std::string endpoint;
62  const google::protobuf::Descriptor* descriptor = nullptr;
63 };
64 
65 struct InfoPerService
66 {
67  InfoPerService() = default;
68 
69  std::string serviceName;
70  const google::protobuf::Descriptor* descInput = nullptr;
71  const google::protobuf::Descriptor* descOutput = nullptr;
73 };
74 struct InfoPerSubscribedTopic
75 {
76  InfoPerSubscribedTopic(zmq::context_t& c) : context(c) {}
77  ~InfoPerSubscribedTopic()
78  {
79  if (topicThread.joinable()) topicThread.join();
80  }
81 
82  zmq::context_t& context;
83 
84  std::string topicName;
85  zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
86  const google::protobuf::Descriptor* descriptor = nullptr;
87 
88  std::vector<Client::topic_callback_t> callbacks;
89 
90  std::thread topicThread;
91 };
92 } // namespace mvsim::internal
93 #endif
94 
96 {
97 #if defined(MVSIM_HAS_ZMQ)
98  zmq::context_t context{1, ZMQ_MAX_SOCKETS_DFLT};
99  std::optional<zmq::socket_t> mainReqSocket;
100  mvsim::SocketMonitor mainReqSocketMonitor;
101 
102  std::map<std::string, internal::InfoPerAdvertisedTopic> advertisedTopics;
103  std::shared_mutex advertisedTopics_mtx;
104 
105  std::optional<zmq::socket_t> srvListenSocket;
106  std::map<std::string, internal::InfoPerService> offeredServices;
107  std::shared_mutex offeredServices_mtx;
108 
109  std::map<std::string, internal::InfoPerSubscribedTopic> subscribedTopics;
110  std::shared_mutex subscribedTopics_mtx;
111 
112  std::optional<zmq::socket_t> topicNotificationsSocket;
113  std::string topicNotificationsEndPoint;
114 
115 #endif
116 };
117 
119  : mrpt::system::COutputLogger("mvsim::Client"),
120  zmq_(std::make_unique<Client::ZMQImpl>())
121 {
122 }
123 Client::Client(const std::string& nodeName) : Client() { setName(nodeName); }
124 
126 
127 void Client::setName(const std::string& nodeName) { nodeName_ = nodeName; }
128 
129 bool Client::connected() const
130 {
131 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
132  return zmq_->mainReqSocketMonitor.connected();
133 #else
134  return false;
135 #endif
136 }
137 
139 {
140  using namespace std::string_literals;
141  ASSERTMSG_(
142  !zmq_->mainReqSocket || !zmq_->mainReqSocket->connected(),
143  "Client is already running.");
144 
145 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
146 
147  zmq_->mainReqSocket.emplace(zmq_->context, ZMQ_REQ);
148 
149  // Monitor to listen on ZMQ socket events:
150  zmq_->mainReqSocketMonitor.monitor(zmq_->mainReqSocket.value());
151 
152  zmq_->mainReqSocket->connect(
153  "tcp://"s + serverHostAddress_ + ":"s +
154  std::to_string(MVSIM_PORTNO_MAIN_REP));
155 
156  // Let the server know about this new node:
158 
159  // Create listening socket for services:
160  zmq_->srvListenSocket.emplace(zmq_->context, ZMQ_REP);
161  zmq_->srvListenSocket->bind("tcp://0.0.0.0:*"s);
162 
163  if (!zmq_->srvListenSocket->connected())
164  THROW_EXCEPTION("Error binding service listening socket.");
165 
166  ASSERTMSG_(
167  !serviceInvokerThread_.joinable(),
168  "Client service thread is already running!");
169 
171  std::thread(&Client::internalServiceServingThread, this);
172 #if MRPT_VERSION >= 0x204
173  mrpt::system::thread_name("services_"s + nodeName_, serviceInvokerThread_);
174 #endif
175 
176  // Create listening socket for subscription updates:
177  zmq_->topicNotificationsSocket.emplace(zmq_->context, ZMQ_PAIR);
178  zmq_->topicNotificationsSocket->bind("tcp://0.0.0.0:*"s);
179 
180  if (!zmq_->topicNotificationsSocket->connected())
181  THROW_EXCEPTION("Error binding topic updates listening socket.");
182 
183  zmq_->topicNotificationsEndPoint =
184  get_zmq_endpoint(*zmq_->topicNotificationsSocket);
185 
186  ASSERTMSG_(
187  !topicUpdatesThread_.joinable(),
188  "Client topic updates thread is already running!");
189 
191  std::thread(&Client::internalTopicUpdatesThread, this);
192 #if MRPT_VERSION >= 0x204
193  mrpt::system::thread_name(
194  "topicUpdates_"s + nodeName_, topicUpdatesThread_);
195 #endif
196 
197 #else
199  "MVSIM needs building with ZMQ and PROTOBUF to enable "
200  "client/server");
201 #endif
202 }
203 
204 void Client::shutdown() noexcept
205 {
206 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
207 
208  if (!zmq_->mainReqSocket->connected()) return;
209 
210  try
211  {
212  MRPT_LOG_DEBUG_STREAM("Unregistering from server.");
214  }
215  catch (const std::exception& e)
216  {
218  "shutdown: Exception: " << mrpt::exception_to_str(e));
219  }
220 
221 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
222  zmq_->context.shutdown();
223 #else
224  // Missing shutdown() in older versions:
225  zmq_ctx_shutdown(zmq_->context.operator void*());
226 #endif
227 
228  if (serviceInvokerThread_.joinable()) serviceInvokerThread_.join();
229  if (topicUpdatesThread_.joinable()) topicUpdatesThread_.join();
230  zmq_->subscribedTopics.clear();
231  zmq_->offeredServices.clear();
232 
233 #endif
234 }
235 
237 {
238 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
239  auto& s = *zmq_->mainReqSocket;
240 
241  mvsim_msgs::RegisterNodeRequest rnq;
242  rnq.set_nodename(nodeName_);
243  mvsim::sendMessage(rnq, s);
244 
245  // Get the reply.
246  const zmq::message_t reply = mvsim::receiveMessage(s);
247 
248  mvsim_msgs::RegisterNodeAnswer rna;
249  mvsim::parseMessage(reply, rna);
250  if (!rna.success())
251  {
253  "Server did not allow registering node: %s",
254  rna.errormessage().c_str());
255  }
256  MRPT_LOG_DEBUG("Successfully registered in the server.");
257 #else
258  THROW_EXCEPTION("MVSIM built without ZMQ");
259 #endif
260 }
261 
263 {
264 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
265  auto& s = *zmq_->mainReqSocket;
266 
267  mvsim_msgs::UnregisterNodeRequest rnq;
268  rnq.set_nodename(nodeName_);
269  mvsim::sendMessage(rnq, s);
270 
271  // Get the reply.
272  const zmq::message_t reply = mvsim::receiveMessage(s);
273 
274  mvsim_msgs::GenericAnswer rna;
275  mvsim::parseMessage(reply, rna);
276  if (!rna.success())
277  {
279  "Server answered an error unregistering node: %s",
280  rna.errormessage().c_str());
281  }
282  MRPT_LOG_DEBUG("Successfully unregistered in the server.");
283 #else
284  THROW_EXCEPTION("MVSIM built without ZMQ");
285 #endif
286 }
287 
288 std::vector<Client::InfoPerNode> Client::requestListOfNodes()
289 {
290 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
291  auto& s = *zmq_->mainReqSocket;
292 
293  mvsim_msgs::ListNodesRequest req;
294  mvsim::sendMessage(req, s);
295 
296  // Get the reply.
297  const zmq::message_t reply = mvsim::receiveMessage(s);
298 
299  mvsim_msgs::ListNodesAnswer lna;
300  mvsim::parseMessage(reply, lna);
301 
302  std::vector<Client::InfoPerNode> nodes;
303  nodes.resize(lna.nodes_size());
304 
305  for (int i = 0; i < lna.nodes_size(); i++)
306  {
307  nodes[i].name = lna.nodes(i);
308  }
309  return nodes;
310 #else
311  THROW_EXCEPTION("MVSIM built without ZMQ");
312 #endif
313 }
314 
315 std::vector<Client::InfoPerTopic> Client::requestListOfTopics()
316 {
317 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
318  auto& s = *zmq_->mainReqSocket;
319 
320  mvsim_msgs::ListTopicsRequest req;
321  mvsim::sendMessage(req, s);
322 
323  // Get the reply.
324  const zmq::message_t reply = mvsim::receiveMessage(s);
325 
326  mvsim_msgs::ListTopicsAnswer lta;
327  mvsim::parseMessage(reply, lta);
328 
329  std::vector<Client::InfoPerTopic> topics;
330  topics.resize(lta.topics_size());
331 
332  for (int i = 0; i < lta.topics_size(); i++)
333  {
334  const auto& t = lta.topics(i);
335  auto& dst = topics[i];
336 
337  dst.name = t.topicname();
338  dst.type = t.topictype();
339 
340  ASSERT_EQUAL_(t.publisherendpoint_size(), t.publishername_size());
341  dst.endpoints.resize(t.publisherendpoint_size());
342  dst.publishers.resize(t.publisherendpoint_size());
343 
344  for (int k = 0; k < t.publisherendpoint_size(); k++)
345  {
346  dst.publishers[k] = t.publishername(k);
347  dst.endpoints[k] = t.publisherendpoint(k);
348  }
349  }
350  return topics;
351 #else
352  THROW_EXCEPTION("MVSIM built without ZMQ");
353 #endif
354 }
355 
357  const std::string& topicName,
358  const google::protobuf::Descriptor* descriptor)
359 {
360 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
361 
362  auto& advTopics = zmq_->advertisedTopics;
363 
364  std::unique_lock<std::shared_mutex> lck(zmq_->advertisedTopics_mtx);
365 
366  if (advTopics.find(topicName) != advTopics.end())
368  "Topic `%s` already registered for publication in this same "
369  "client (!)",
370  topicName.c_str());
371 
372  // the ctor of InfoPerAdvertisedTopic automatically creates a ZMQ_PUB
373  // socket in pubSocket
374  internal::InfoPerAdvertisedTopic& ipat =
375  advTopics.emplace_hint(advTopics.begin(), topicName, zmq_->context)
376  ->second;
377 
378  lck.unlock();
379 
380  // Bind the PUBLISH socket:
381  ipat.pubSocket.bind("tcp://0.0.0.0:*");
382  if (!ipat.pubSocket.connected())
383  THROW_EXCEPTION("Could not bind publisher socket");
384 
385  // Retrieve assigned TCP port:
386  ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
387  ipat.topicName = topicName; // redundant in container, but handy.
388  ipat.descriptor = descriptor;
389 
391  "Advertising topic `%s` [%s] on endpoint `%s`", topicName.c_str(),
392  descriptor->full_name().c_str(), ipat.endpoint.c_str());
393 
394  // MRPT_LOG_INFO_STREAM("Type: " << descriptor->DebugString());
395 
396  mvsim_msgs::AdvertiseTopicRequest req;
397  req.set_topicname(ipat.topicName);
398  req.set_endpoint(ipat.endpoint);
399  req.set_topictypename(ipat.descriptor->full_name());
400  req.set_nodename(nodeName_);
401 
402  mvsim::sendMessage(req, *zmq_->mainReqSocket);
403 
404  // Get the reply.
405  const zmq::message_t reply = mvsim::receiveMessage(*zmq_->mainReqSocket);
406  mvsim_msgs::GenericAnswer ans;
407  mvsim::parseMessage(reply, ans);
408 
409  if (!ans.success())
411  "Error registering topic `%s` in server: `%s`", topicName.c_str(),
412  ans.errormessage().c_str());
413 
414 #else
415  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
416 #endif
417 }
418 
420  const std::string& serviceName, const google::protobuf::Descriptor* descIn,
421  const google::protobuf::Descriptor* descOut, service_callback_t callback)
422 {
423 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
424 
425  std::unique_lock<std::shared_mutex> lck(zmq_->offeredServices_mtx);
426 
427  auto& services = zmq_->offeredServices;
428 
429  if (services.find(serviceName) != services.end())
431  "Service `%s` already registered in this same client!",
432  serviceName.c_str());
433 
434  internal::InfoPerService& ips = services[serviceName];
435 
436  lck.unlock();
437 
438  // Retrieve assigned TCP port:
439  char assignedPort[100];
440  size_t assignedPortLen = sizeof(assignedPort);
441  zmq_->srvListenSocket->getsockopt(
442  ZMQ_LAST_ENDPOINT, assignedPort, &assignedPortLen);
443  assignedPort[assignedPortLen] = '\0';
444 
445  ips.serviceName = serviceName; // redundant in container, but handy.
446  ips.callback = callback;
447  ips.descInput = descIn;
448  ips.descOutput = descOut;
449 
451  "Advertising service `%s` [%s->%s] on endpoint `%s`",
452  serviceName.c_str(), descIn->full_name().c_str(),
453  descOut->full_name().c_str(), assignedPort);
454 
455  mvsim_msgs::AdvertiseServiceRequest req;
456  req.set_servicename(ips.serviceName);
457  req.set_endpoint(assignedPort);
458  req.set_inputtypename(ips.descInput->full_name());
459  req.set_outputtypename(ips.descOutput->full_name());
460  req.set_nodename(nodeName_);
461 
462  mvsim::sendMessage(req, *zmq_->mainReqSocket);
463 
464  // Get the reply.
465  const zmq::message_t reply = mvsim::receiveMessage(*zmq_->mainReqSocket);
466  mvsim_msgs::GenericAnswer ans;
467  mvsim::parseMessage(reply, ans);
468 
469  if (!ans.success())
471  "Error registering service `%s` in server: `%s`",
472  serviceName.c_str(), ans.errormessage().c_str());
473 
474 #else
475  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
476 #endif
477 }
478 
480  const std::string& topicName, const google::protobuf::Message& msg)
481 {
482  MRPT_START
483 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
484  ASSERTMSG_(
485  zmq_ && zmq_->mainReqSocket && zmq_->mainReqSocket->connected(),
486  "Client not connected to Server");
487 
488  std::shared_lock<std::shared_mutex> lck(zmq_->advertisedTopics_mtx);
489  auto itIpat = zmq_->advertisedTopics.find(topicName);
490 
491  ASSERTMSG_(
492  itIpat != zmq_->advertisedTopics.end(),
493  mrpt::format(
494  "Topic `%s` cannot been registered. Missing former call to "
495  "advertiseTopic()?",
496  topicName.c_str()));
497 
498  lck.unlock();
499 
500  auto& ipat = itIpat->second;
501 
502  ASSERTMSG_(
503  msg.GetDescriptor() == ipat.descriptor,
504  mrpt::format(
505  "Topic `%s` has type `%s`, but expected `%s` from former call to "
506  "advertiseTopic()?",
507  topicName.c_str(), msg.GetDescriptor()->name().c_str(),
508  ipat.descriptor->name().c_str()));
509 
510  ASSERT_(ipat.pubSocket.connected());
511 
512  mvsim::sendMessage(msg, ipat.pubSocket);
513 
514 #if 0
516  "Published on topic `%s`: %s", topicName.c_str(),
517  msg.DebugString().c_str());
518 #endif
519 
520 #else
521  THROW_EXCEPTION("MVSIM built without ZMQ & PROTOBUF");
522 #endif
523  MRPT_END
524 }
525 
527 {
528  using namespace std::string_literals;
529 
530 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
531  try
532  {
534  "[" << nodeName_ << "] Client service thread started.");
535 
536  zmq::socket_t& s = *zmq_->srvListenSocket;
537 
538  for (;;)
539  {
540  // Wait for next request from client:
541  zmq::message_t m = mvsim::receiveMessage(s);
542 
543  // parse it:
544  mvsim_msgs::CallService csMsg;
545  mvsim::parseMessage(m, csMsg);
546 
547  std::shared_lock<std::shared_mutex> lck(zmq_->offeredServices_mtx);
548  const auto& srvName = csMsg.servicename();
549 
550  auto itSrv = zmq_->offeredServices.find(srvName);
551  if (itSrv == zmq_->offeredServices.end())
552  {
553  // Error: unknown service:
554  mvsim_msgs::GenericAnswer ans;
555  ans.set_success(false);
556  ans.set_errormessage(mrpt::format(
557  "Requested unknown service `%s`", srvName.c_str()));
558  MRPT_LOG_ERROR_STREAM(ans.errormessage());
559 
560  mvsim::sendMessage(ans, s);
561  continue;
562  }
563 
564  internal::InfoPerService& ips = itSrv->second;
565 
566  // MRPT_TODO("Check input descriptor?");
567 
568  auto outMsgPtr = ips.callback(csMsg.serializedinput());
569 
570  // Send response:
571  mvsim::sendMessage(*outMsgPtr, s);
572  }
573  }
574  catch (const zmq::error_t& e)
575  {
576  if (e.num() == ETERM)
577  {
578  // This simply means someone called
579  // requestMainThreadTermination(). Just exit silently.
581  "internalServiceServingThread about to exit for ZMQ term "
582  "signal.");
583  }
584  else
585  {
587  "internalServiceServingThread: ZMQ error: " << e.what());
588  }
589  }
590  catch (const std::exception& e)
591  {
593  "internalServiceServingThread: Exception: "
594  << mrpt::exception_to_str(e));
595  }
596  MRPT_LOG_DEBUG_STREAM("internalServiceServingThread quitted.");
597 
598 #endif
599 }
600 
602 {
603  using namespace std::string_literals;
604 
605 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
606  try
607  {
609  "[" << nodeName_ << "] Client topic updates thread started.");
610 
611  zmq::socket_t& s = *zmq_->topicNotificationsSocket;
612 
613  for (;;)
614  {
615  // Wait for next update from server:
616  zmq::message_t m = mvsim::receiveMessage(s);
617 
618  // parse it:
619  mvsim_msgs::TopicInfo tiMsg;
620  mvsim::parseMessage(m, tiMsg);
621 
622  // Let the server know that we received this:
623  mvsim_msgs::GenericAnswer ans;
624  ans.set_success(true);
625  mvsim::sendMessage(ans, s);
626 
627  // We got a message. This means we have to new endpoints to
628  // subscribe, either because we have just subscribed to a new topic,
629  // or a new node has advertised a topic we already subscribed in the
630  // past.
631 
632  // Look for the entry in the list of subscribed topics:
633  std::shared_lock<std::shared_mutex> lck(zmq_->subscribedTopics_mtx);
634  const auto& topicName = tiMsg.topicname();
635 
636  auto itTopic = zmq_->subscribedTopics.find(topicName);
637  if (itTopic == zmq_->subscribedTopics.end())
638  {
639  // This shouldn't happen (?).
641  "Received a topic `"
642  << topicName
643  << "` update message from server, but this node is not "
644  "subscribed to it (!).");
645  continue;
646  }
647 
649  "[internalTopicUpdatesThread] Received: "
650  << tiMsg.DebugString());
651 
652  internal::InfoPerSubscribedTopic& ipt = itTopic->second;
653 
654  for (int i = 0; i < tiMsg.publisherendpoint_size(); i++)
655  {
656  ipt.subSocket.connect(tiMsg.publisherendpoint(i));
657  }
658 
659  // No need to send response back to server.
660  }
661  }
662  catch (const zmq::error_t& e)
663  {
664  if (e.num() == ETERM)
665  {
666  // This simply means someone called
667  // requestMainThreadTermination(). Just exit silently.
669  "internalTopicUpdatesThread about to exit for ZMQ term "
670  "signal.");
671  }
672  else
673  {
675  "internalTopicUpdatesThread: ZMQ error: " << e.what());
676  }
677  }
678  catch (const std::exception& e)
679  {
681  "internalTopicUpdatesThread: Exception: "
682  << mrpt::exception_to_str(e));
683  }
685  "[" << nodeName_ << "] Client topic updates thread quitted.");
686 
687 #endif
688 }
689 
690 // Overload for python wrapper
692  const std::string& serviceName, const std::string& inputSerializedMsg)
693 {
694  MRPT_START
695 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
696 
697  std::string outMsgData, outMsgType;
699  serviceName, inputSerializedMsg, std::nullopt, outMsgData, outMsgType);
700  return outMsgData;
701 #endif
702  MRPT_END
703 }
704 
706  const std::string& serviceName, const std::string& inputSerializedMsg,
707  mrpt::optional_ref<google::protobuf::Message> outputMsg,
708  mrpt::optional_ref<std::string> outputSerializedMsg,
709  mrpt::optional_ref<std::string> outputMsgTypeName)
710 {
711  MRPT_START
712 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
713 
714  // 1) Request to the server who is serving this service:
715  // TODO: Cache?
716  std::string srvEndpoint;
717  {
718  zmq::socket_t& s = *zmq_->mainReqSocket;
719 
720  mvsim_msgs::GetServiceInfoRequest gsi;
721  gsi.set_servicename(serviceName);
722  mvsim::sendMessage(gsi, s);
723 
724  auto m = mvsim::receiveMessage(s);
725  mvsim_msgs::GetServiceInfoAnswer gsia;
726  mvsim::parseMessage(m, gsia);
727 
728  if (!gsia.success())
730  "Error requesting information about service `%s`: %s",
731  serviceName.c_str(), gsia.errormessage().c_str());
732 
733  srvEndpoint = gsia.serviceendpoint();
734  }
735 
736  // 2) Connect to the service offerrer and request the execution:
737  zmq::socket_t srvReqSock(zmq_->context, ZMQ_REQ);
738  srvReqSock.connect(srvEndpoint);
739 
740  mvsim_msgs::CallService csMsg;
741  csMsg.set_servicename(serviceName);
742  csMsg.set_serializedinput(inputSerializedMsg);
743 
744  mvsim::sendMessage(csMsg, srvReqSock);
745 
746  const auto m = mvsim::receiveMessage(srvReqSock);
747  if (outputMsg)
748  {
749  mvsim::parseMessage(m, outputMsg.value().get());
750  }
751  if (outputSerializedMsg)
752  {
753  const auto [typeName, serializedData] =
754  internal::parseMessageToParts(m);
755 
756  outputSerializedMsg.value().get() = serializedData;
757  if (outputMsgTypeName) outputMsgTypeName.value().get() = typeName;
758  }
759 #endif
760  MRPT_END
761 }
762 
764  const std::string& topicName,
765  const google::protobuf::Descriptor* descriptor,
766  const topic_callback_t& callback)
767 {
768  MRPT_START
769 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
770  // Register in my internal DB:
771  std::unique_lock<std::shared_mutex> lck(zmq_->subscribedTopics_mtx);
772 
773  auto& topics = zmq_->subscribedTopics;
774 
775  // It's ok to subscribe more than once:
776  internal::InfoPerSubscribedTopic& ipt =
777  topics.emplace_hint(topics.begin(), topicName, zmq_->context)->second;
778 
779  // subscribe to .recv() any message:
780  ipt.subSocket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
781 
782  ipt.callbacks.push_back(callback);
783 
784  ipt.topicName = topicName;
785 
786  lck.unlock();
787 
788  ipt.topicThread =
789  std::thread([&]() { this->internalTopicSubscribeThread(ipt); });
790 
791  // Let the server know about our interest in the topic:
792  mvsim_msgs::SubscribeRequest subReq;
793  subReq.set_topic(topicName);
794  subReq.set_updatesendpoint(zmq_->topicNotificationsEndPoint);
795 
796  mvsim::sendMessage(subReq, *zmq_->mainReqSocket);
797 
798  const auto m = mvsim::receiveMessage(*zmq_->mainReqSocket);
799  mvsim_msgs::SubscribeAnswer subAns;
800  mvsim::parseMessage(m, subAns);
801 
802  ASSERT_EQUAL_(subAns.topic(), topicName);
803  ASSERT_(subAns.success());
804 
805  // That is... the rest will be done upon reception of messages from the
806  // server on potential clients we should subscribe to.
807 
808 #endif
809  MRPT_END
810 }
811 
812 void Client::internalTopicSubscribeThread(internal::InfoPerSubscribedTopic& ipt)
813 {
814  using namespace std::string_literals;
815 
816 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
817  try
818  {
820  "[" << nodeName_ << "] Client topic subscribe thread for `"
821  << ipt.topicName << "` started.");
822 
823  zmq::socket_t& s = ipt.subSocket;
824 
825  for (;;)
826  {
827  // Wait for next update from server:
828  const zmq::message_t m = mvsim::receiveMessage(s);
829 
830  // Send to subscriber callbacks:
831  try
832  {
833  for (auto callback : ipt.callbacks) callback(m);
834  }
835  catch (const std::exception& e)
836  {
838  "Exception in topic `"
839  << ipt.topicName
840  << "` subscription callback:" << mrpt::exception_to_str(e));
841  }
842  }
843  }
844  catch (const zmq::error_t& e)
845  {
846  if (e.num() == ETERM)
847  {
848  // This simply means someone called
849  // requestMainThreadTermination(). Just exit silently.
851  "[" << nodeName_
852  << "] Client topic subscribe thread about to exit for ZMQ "
853  "term signal.");
854  }
855  else
856  {
858  "internalTopicSubscribeThread: ZMQ error: " << e.what());
859  }
860  }
861  catch (const std::exception& e)
862  {
864  "internalTopicSubscribeThread: Exception: "
865  << mrpt::exception_to_str(e));
866  }
868  "[" << nodeName_ << "] Client topic subscribe thread quitted.");
869 
870 #endif
871 }
#define ASSERT_EQUAL_(__A, __B)
void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic &ipt)
void internalTopicUpdatesThread()
std::thread topicUpdatesThread_
Definition: Client.h:119
void shutdown() noexcept
void setName(const std::string &nodeName)
void doAdvertiseService(const std::string &serviceName, const google::protobuf::Descriptor *descIn, const google::protobuf::Descriptor *descOut, service_callback_t callback)
std::unique_ptr< ZMQImpl > zmq_
Definition: Client.h:112
const GLfloat * c
#define THROW_EXCEPTION(msg)
#define THROW_EXCEPTION_FMT(_FORMAT_STRING,...)
GLdouble GLdouble t
std::function< void(const zmq::message_t &)> topic_callback_t
Definition: Client.h:141
#define MRPT_LOG_WARN_STREAM(__CONTENTS)
std::vector< InfoPerNode > requestListOfNodes()
void publishTopic(const std::string &topicName, const google::protobuf::Message &msg)
#define MRPT_LOG_INFO_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG(_STRING)
#define MRPT_LOG_DEBUG_STREAM(__CONTENTS)
GLdouble s
#define MRPT_LOG_DEBUG_FMT(_FMT_STRING,...)
std::string serverHostAddress_
Definition: Client.h:115
#define MRPT_END
void callService(const std::string &serviceName, const INPUT_MSG_T &input, OUTPUT_MSG_T &output)
Definition: Client.h:192
GLuint dst
std::string nodeName_
Definition: Client.h:116
std::string BASE_IMPEXP format(const char *fmt,...) MRPT_printf_format_check(1
std::vector< InfoPerTopic > requestListOfTopics()
#define MRPT_START
void doAdvertiseTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor)
void internalServiceServingThread()
constexpr unsigned int MVSIM_PORTNO_MAIN_REP
Definition: ports.h:17
std::thread serviceInvokerThread_
Definition: Client.h:118
#define ASSERT_(f)
void doCallService(const std::string &serviceName, const std::string &inputSerializedMsg, mrpt::optional_ref< google::protobuf::Message > outputMsg, mrpt::optional_ref< std::string > outputSerializedMsg=std::nullopt, mrpt::optional_ref< std::string > outputMsgTypeName=std::nullopt)
const GLdouble * m
#define MRPT_LOG_ERROR_STREAM(__CONTENTS)
bool connected() const
#define ASSERTMSG_(f, __ERROR_MSG)
void doSubscribeTopic(const std::string &topicName, const google::protobuf::Descriptor *descriptor, const topic_callback_t &callback)
std::function< std::shared_ptr< google::protobuf::Message >(const std::string &)> service_callback_t
Definition: Client.h:130


mvsim
Author(s):
autogenerated on Fri May 7 2021 03:05:51