Server.cpp
Go to the documentation of this file.
1 /*+-------------------------------------------------------------------------+
2  | MultiVehicle simulator (libmvsim) |
3  | |
4  | Copyright (C) 2014-2023 Jose Luis Blanco Claraco |
5  | Copyright (C) 2017 Borys Tymchenko (Odessa Polytechnic University) |
6  | Distributed under 3-clause BSD License |
7  | See COPYING |
8  +-------------------------------------------------------------------------+ */
9 
10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/version.h>
12 #include <mvsim/Comms/Server.h>
13 #include <mvsim/Comms/common.h>
14 #if MRPT_VERSION >= 0x204
15 #include <mrpt/system/thread_name.h>
16 #endif
17 
18 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
19 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h>
20 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h>
21 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h>
22 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h>
23 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h>
24 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h>
25 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h>
26 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h>
27 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h>
28 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h>
29 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h>
30 
31 #include <zmq.hpp>
32 
33 #endif
34 
35 using namespace mvsim;
36 
37 Server::Server() : mrpt::system::COutputLogger("mvsim::Server") {}
38 
40 
42 {
43  ASSERTMSG_(!mainThread_.joinable(), "Server is already running.");
44 
45 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
47  mainThread_ = std::thread(&Server::internalServerThread, this);
48 
49 #if MRPT_VERSION >= 0x204
50  mrpt::system::thread_name("serverMain", mainThread_);
51 #endif
52 
53 #else
54  THROW_EXCEPTION(
55  "MVSIM needs building with ZMQ and PROTOBUF to enable client/server");
56 #endif
57 }
58 
59 void Server::shutdown() noexcept
60 {
61  try
62  {
63  MRPT_LOG_DEBUG_STREAM("Waiting for the thread to quit.");
65 
66  if (mainThread_.joinable()) mainThread_.join();
67 
68  MRPT_LOG_DEBUG_STREAM("Joined thread.");
69  }
70  catch (const std::exception& e)
71  {
72  MRPT_LOG_ERROR_STREAM(
73  "shutdown: Exception: " << mrpt::exception_to_str(e));
74  }
75 }
76 
78 {
79  using namespace std::string_literals;
80 
81 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
82  try
83  {
84  MRPT_LOG_INFO_STREAM("Server thread started.");
85 
86  zmq::context_t context(3);
87  mainThreadZMQcontext_ = &context;
88 
89  zmq::socket_t mainRepSocket(context, ZMQ_REP);
90  mainRepSocket.bind("tcp://*:"s + std::to_string(serverPortNo_));
91 
92  for (;;)
93  {
94  zmq::message_t request;
95 
96  // Wait for next request from client
97 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1)
98  std::optional<size_t> reqSize = mainRepSocket.recv(request);
99  ASSERT_(reqSize.has_value());
100 #else
101  mainRepSocket.recv(&request);
102 #endif
103 
104  // Variant with all valid client requests:
105  using client_requests_t = std::variant<
106  mvsim_msgs::RegisterNodeRequest,
107  mvsim_msgs::UnregisterNodeRequest, mvsim_msgs::SubscribeRequest,
108  mvsim_msgs::ListNodesRequest, mvsim_msgs::ListTopicsRequest,
109  mvsim_msgs::AdvertiseTopicRequest,
110  mvsim_msgs::AdvertiseServiceRequest,
111  mvsim_msgs::GetServiceInfoRequest>;
112 
113  // Parse and dispatch:
114  try
115  {
116  client_requests_t req =
117  mvsim::parseMessageVariant<client_requests_t>(request);
118 
119  std::visit(
120  overloaded{
121  [&](const auto& m) { this->handle(m, mainRepSocket); },
122  },
123  req);
124  }
125  catch (const UnexpectedMessageException& e)
126  {
127  MRPT_LOG_ERROR_STREAM(e.what());
128  }
129  }
130  }
131  catch (const zmq::error_t& e)
132  {
133  if (e.num() == ETERM)
134  {
135  // This simply means someone called requestMainThreadTermination().
136  // Just exit silently.
137  MRPT_LOG_DEBUG_STREAM(
138  "Server thread about to exit for ZMQ term signal.");
139  }
140  else
141  {
142  MRPT_LOG_ERROR_STREAM(
143  "internalServerThread: ZMQ error: " << e.what());
144  }
145  }
146  catch (const std::exception& e)
147  {
148  MRPT_LOG_ERROR_STREAM(
149  "internalServerThread: Exception: " << mrpt::exception_to_str(e));
150  }
151  MRPT_LOG_DEBUG_STREAM("Server thread quitted.");
152 
153  mainThreadZMQcontext_ = nullptr;
154 #endif
155 }
156 
158 {
159 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
160  zmq::context_t* ctx = mainThreadZMQcontext_;
161  if (ctx)
162  {
163 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
164  ctx->shutdown();
165 #else
166  // Missing shutdown() in older versions:
167  zmq_ctx_shutdown(ctx->operator void*());
168 #endif
169  }
170 #endif
171 }
172 
173 void Server::db_remove_node(const std::string& nodeName)
174 {
175  std::unique_lock lck(dbMutex);
176 
177  auto itNode = connectedNodes_.find(nodeName);
178  if (itNode == connectedNodes_.end()) return; // Nothing to do
179 
180  for (const std::string& topic : itNode->second.advertisedTopics)
181  {
182  auto itTopic = knownTopics_.find(topic);
183  if (itTopic == knownTopics_.end()) continue;
184  knownTopics_.erase(itTopic);
185  }
186 
187  // for (const std::string& topic : itNode->second.subscribedTopics) { }
188 
189  connectedNodes_.erase(itNode);
190 }
191 
192 void Server::db_register_node(const std::string& nodeName)
193 {
194  std::unique_lock lck(dbMutex);
195 
196  connectedNodes_.emplace_hint(connectedNodes_.end(), nodeName, nodeName);
197 }
198 
200  const std::string& topicName, const std::string& topicTypeName,
201  const std::string& publisherEndpoint, const std::string& nodeName)
202 {
203  std::unique_lock lck(dbMutex);
204 
205  // 1) Add as a source of this topic:
206  auto& dbTopic = knownTopics_[topicName];
207 
208  if (!dbTopic.topicTypeName.empty() &&
209  dbTopic.topicTypeName != topicTypeName)
210  {
211  throw std::runtime_error(mrpt::format(
212  "Trying to register topic `%s` [%s] but already known with type "
213  "[%s]",
214  topicName.c_str(), topicTypeName.c_str(),
215  dbTopic.topicTypeName.c_str()));
216  }
217  dbTopic.topicName = topicName;
218  dbTopic.topicTypeName = topicTypeName;
219 
220  dbTopic.publishers.try_emplace(
221  nodeName, topicName, nodeName, publisherEndpoint);
222 
223  // 2) If clients are already waiting for this topic, inform them so they
224  // can subscribe to this new source of data:
226 }
227 
229  const std::string& topicName, const std::string& updatesEndPoint)
230 {
231  std::unique_lock lck(dbMutex);
232 
233  auto& dbTopic = knownTopics_[topicName];
234 
235  dbTopic.subscribers.try_emplace(
236  updatesEndPoint, topicName, updatesEndPoint);
237 
238  // Send all currently-existing publishers:
240  topicName, updatesEndPoint /*only this one*/);
241 }
242 
244  const std::string& topicName,
245  const std::optional<std::string>& updatesEndPoint)
246 {
247 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
248  auto& dbTopic = knownTopics_[topicName];
249 
250  mvsim_msgs::TopicInfo tiMsg;
251  tiMsg.set_topicname(topicName);
252  tiMsg.set_topictype(dbTopic.topicTypeName);
253 
254  for (const auto& pub : dbTopic.publishers)
255  {
256  tiMsg.add_publishername(pub.second.publisherNodeName);
257  tiMsg.add_publisherendpoint(pub.second.publisherEndpoint);
258  }
259 
260  ASSERT_(mainThreadZMQcontext_);
261 
262  auto lambdaSendToSub = [&](const std::string& subUpdtEndPoint) {
263  try
264  {
265  MRPT_LOG_DEBUG_STREAM(
266  "[send_topic_publishers_to_subscribed_clients] Letting "
267  << subUpdtEndPoint << " know about "
268  << dbTopic.publishers.size() << " publishers for topic '"
269  << topicName << "'");
270 
271  zmq::socket_t s(*mainThreadZMQcontext_, ZMQ_PAIR);
272  s.connect(subUpdtEndPoint);
273 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
274  ASSERT_(s);
275 #else
276  ASSERT_(s.connected());
277 #endif
278  sendMessage(tiMsg, s);
279 
280  mvsim_msgs::GenericAnswer ans;
281  const auto m = receiveMessage(s);
282  mvsim::parseMessage(m, ans);
283  ASSERT_(ans.success());
284  }
285  catch (const std::exception& e)
286  {
287  MRPT_LOG_ERROR_STREAM(
288  "Error sending topic updates to endpoint " << subUpdtEndPoint
289  << ":\n"
290  << e.what());
291  }
292  };
293 
294  if (updatesEndPoint.has_value())
295  {
296  // send to this one only:
297  lambdaSendToSub(*updatesEndPoint);
298  }
299  else
300  {
301  // send to all:
302  for (const auto& ips : dbTopic.subscribers)
303  {
304  lambdaSendToSub(ips.second.subscriberUpdatesEndpoint);
305  }
306  }
307 #endif
308 }
309 
311  const std::string& serviceName, const std::string& inputTypeName,
312  const std::string& outputTypeName, const std::string& publisherEndpoint,
313  const std::string& nodeName)
314 {
315  std::unique_lock lck(dbMutex);
316 
317  // 1) Add as a source of this topic:
318  auto& dbSrv = knownServices_[serviceName];
319 
320  if (!dbSrv.inputTypeName.empty() &&
321  (dbSrv.inputTypeName != inputTypeName ||
322  dbSrv.outputTypeName != outputTypeName))
323  {
324  throw std::runtime_error(mrpt::format(
325  "Trying to register service `%s` [%s->%s] but already known "
326  "with "
327  "types "
328  "[%s->%s]",
329  serviceName.c_str(), inputTypeName.c_str(), outputTypeName.c_str(),
330  dbSrv.inputTypeName.c_str(), dbSrv.outputTypeName.c_str()));
331  }
332  dbSrv.serviceName = serviceName;
333  dbSrv.inputTypeName = inputTypeName;
334  dbSrv.outputTypeName = outputTypeName;
335  dbSrv.endpoint = publisherEndpoint;
336  dbSrv.nodeName = nodeName;
337 }
338 
340  const std::string& serviceName, std::string& publisherEndpoint,
341  std::string& nodeName) const
342 {
343  std::shared_lock lck(dbMutex);
344 
345  // 1) Add as a source of this topic:
346  auto itSrv = knownServices_.find(serviceName);
347  if (itSrv == knownServices_.end()) return false;
348 
349  auto& dbSrv = itSrv->second;
350 
351  publisherEndpoint = dbSrv.endpoint;
352  nodeName = dbSrv.nodeName;
353 
354  return true;
355 }
356 
357 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
358 
359 // mvsim_msgs::RegisterNodeRequest
360 void Server::handle(const mvsim_msgs::RegisterNodeRequest& m, zmq::socket_t& s)
361 {
362  // Send reply back to client
363  MRPT_LOG_DEBUG_STREAM(
364  "Registering new node named '" << m.nodename() << "'");
365 
366  // Make sure we don't have already a node named like this:
367  // Don't raise an error if the name was already registered, since it
368  // might be that the same node disconnected and connected again:
369  db_remove_node(m.nodename());
370 
371  db_register_node(m.nodename());
372 
373  mvsim_msgs::RegisterNodeAnswer rna;
374  rna.set_success(true);
375  mvsim::sendMessage(rna, s);
376 }
377 
378 // mvsim_msgs::UnregisterNodeRequest
379 void Server::handle(
380  const mvsim_msgs::UnregisterNodeRequest& m, zmq::socket_t& s)
381 {
382  // Send reply back to client
383  MRPT_LOG_DEBUG_STREAM("Unregistering node named '" << m.nodename() << "'");
384 
385  db_remove_node(m.nodename());
386 
387  mvsim_msgs::GenericAnswer rna;
388  rna.set_success(true);
389  mvsim::sendMessage(rna, s);
390 }
391 
392 // mvsim_msgs::SubscribeRequest
393 void Server::handle(const mvsim_msgs::SubscribeRequest& m, zmq::socket_t& s)
394 {
395  // Send reply back to client
396  MRPT_LOG_DEBUG_STREAM(
397  "Subscription request for topic " << m.topic() << "'");
398 
399  // Include in our DB of subscriptions:
400  // This also sends the subcriber the list of existing endpoints it must
401  // subscribe to:
402  db_add_topic_subscriber(m.topic(), m.updatesendpoint());
403 
404  mvsim_msgs::SubscribeAnswer ans;
405  ans.set_topic(m.topic());
406  ans.set_success(true);
407  mvsim::sendMessage(ans, s);
408 }
409 
410 // mvsim_msgs::GetServiceInfoRequest
411 void Server::handle(
412  const mvsim_msgs::GetServiceInfoRequest& m, zmq::socket_t& s)
413 {
414  // Send reply back to client
415  MRPT_LOG_DEBUG_STREAM(
416  "GetServiceInfo request for service '" << m.servicename() << "'");
417 
418  mvsim_msgs::GetServiceInfoAnswer ans;
419  std::string node, endpoint;
420 
421  if (db_get_service_info(m.servicename(), endpoint, node))
422  {
423  ans.set_success(true);
424  ans.set_serviceendpoint(endpoint);
425  ans.set_servicenodename(node);
426  }
427  else
428  {
429  ans.set_success(false);
430  ans.set_errormessage(mrpt::format(
431  "Could not find service `%s`", m.servicename().c_str()));
432  }
433 
434  mvsim::sendMessage(ans, s);
435 }
436 
437 // mvsim_msgs::ListTopicsRequest
438 void Server::handle(const mvsim_msgs::ListTopicsRequest& m, zmq::socket_t& s)
439 {
440  // Send reply back to client
441  MRPT_LOG_DEBUG("Listing topics request");
442 
443  mvsim_msgs::ListTopicsAnswer ans;
444 
445  // Optional name filter:
446  const auto& queryPrefix = m.topicstartswith();
447 
448  std::shared_lock lck(dbMutex);
449 
450  for (const auto& kv : knownTopics_)
451  {
452  const auto& t = kv.second;
453  const auto& name = t.topicName;
454 
455  if (!queryPrefix.empty() ||
456  name.substr(0, queryPrefix.size()) == queryPrefix)
457  {
458  auto tInfo = ans.add_topics();
459  tInfo->set_topicname(name);
460  tInfo->set_topictype(t.topicTypeName);
461 
462  for (const auto& pubs : t.publishers)
463  {
464  tInfo->add_publishername(pubs.second.publisherNodeName);
465  tInfo->add_publisherendpoint(pubs.second.publisherEndpoint);
466  }
467  }
468  }
469  mvsim::sendMessage(ans, s);
470 }
471 
472 // mvsim_msgs::ListNodesRequest
473 void Server::handle(const mvsim_msgs::ListNodesRequest& m, zmq::socket_t& s)
474 {
475  // Send reply back to client
476  MRPT_LOG_DEBUG("Listing nodes request");
477 
478  // Optional name filter:
479  const auto& queryPrefix = m.nodestartswith();
480 
481  mvsim_msgs::ListNodesAnswer ans;
482  for (const auto& n : connectedNodes_)
483  {
484  const auto& name = n.second.nodeName;
485 
486  if (!queryPrefix.empty() ||
487  name.substr(0, queryPrefix.size()) == queryPrefix)
488  {
489  ans.add_nodes(name);
490  }
491  }
492  mvsim::sendMessage(ans, s);
493 }
494 
495 // mvsim_msgs::AdvertiseTopicRequest
496 void Server::handle(
497  const mvsim_msgs::AdvertiseTopicRequest& m, zmq::socket_t& s)
498 {
499  // Send reply back to client
500  MRPT_LOG_DEBUG_FMT(
501  "Received new topic advertiser: `%s` [%s] @ %s (%s)",
502  m.topicname().c_str(), m.topictypename().c_str(), m.endpoint().c_str(),
503  m.nodename().c_str());
504 
505  mvsim_msgs::GenericAnswer ans;
506  try
507  {
509  m.topicname(), m.topictypename(), m.endpoint(), m.nodename());
510  ans.set_success(true);
511  }
512  catch (const std::exception& e)
513  {
514  ans.set_success(false);
515  ans.set_errormessage(mrpt::exception_to_str(e));
516  }
517  mvsim::sendMessage(ans, s);
518 }
519 
520 // mvsim_msgs::AdvertiseServiceRequest
521 void Server::handle(
522  const mvsim_msgs::AdvertiseServiceRequest& m, zmq::socket_t& s)
523 {
524  // Send reply back to client
525  MRPT_LOG_DEBUG_FMT(
526  "Received new service offering: `%s` [%s->%s] @ %s (%s)",
527  m.servicename().c_str(), m.inputtypename().c_str(),
528  m.outputtypename().c_str(), m.endpoint().c_str(), m.nodename().c_str());
529 
530  mvsim_msgs::GenericAnswer ans;
531  try
532  {
534  m.servicename(), m.inputtypename(), m.outputtypename(),
535  m.endpoint(), m.nodename());
536  ans.set_success(true);
537  }
538  catch (const std::exception& e)
539  {
540  ans.set_success(false);
541  ans.set_errormessage(mrpt::exception_to_str(e));
542  }
543  mvsim::sendMessage(ans, s);
544 }
545 
546 #endif
void db_advertise_topic(const std::string &topicName, const std::string &topicTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:199
std::map< node_name_t, InfoPerNode > connectedNodes_
Definition: Server.h:149
void db_advertise_service(const std::string &serviceName, const std::string &inputTypeName, const std::string &outputTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:310
XmlRpcServer s
std::map< service_name_t, InfoPerService > knownServices_
Definition: Server.h:216
bool db_get_service_info(const std::string &serviceName, std::string &publisherEndpoint, std::string &nodeName) const
Definition: Server.cpp:339
void internalServerThread()
Definition: Server.cpp:77
void db_remove_node(const std::string &nodeName)
Definition: Server.cpp:173
std::shared_mutex dbMutex
Definition: Server.h:103
geometry_msgs::TransformStamped t
std::thread mainThread_
Definition: Server.h:80
std::atomic< zmq::context_t * > mainThreadZMQcontext_
Definition: Server.h:81
std::map< topic_name_t, InfoPerTopic > knownTopics_
Definition: Server.h:196
void requestMainThreadTermination()
Definition: Server.cpp:157
void start()
Definition: Server.cpp:41
void shutdown() noexcept
Definition: Server.cpp:59
unsigned int serverPortNo_
Definition: Server.h:220
void db_add_topic_subscriber(const std::string &topicName, const std::string &updatesEndPoint)
Definition: Server.cpp:228
void db_register_node(const std::string &nodeName)
Definition: Server.cpp:192
void send_topic_publishers_to_subscribed_clients(const std::string &topicName, const std::optional< std::string > &updatesEndPoint=std::nullopt)
Definition: Server.cpp:243


mvsim
Author(s):
autogenerated on Tue Jul 4 2023 03:08:21