Server.cpp
Go to the documentation of this file.
1 /*+-------------------------------------------------------------------------+
2  | MultiVehicle simulator (libmvsim) |
3  | |
4  | Copyright (C) 2014-2024 Jose Luis Blanco Claraco |
5  | Copyright (C) 2017 Borys Tymchenko (Odessa Polytechnic University) |
6  | Distributed under 3-clause BSD License |
7  | See COPYING |
8  +-------------------------------------------------------------------------+ */
9 
10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/version.h>
12 #include <mvsim/Comms/Server.h>
13 #include <mvsim/Comms/common.h>
14 #if MRPT_VERSION >= 0x204
15 #include <mrpt/system/thread_name.h>
16 #endif
17 
18 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
19 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h>
20 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h>
21 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h>
22 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h>
23 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h>
24 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h>
25 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h>
26 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h>
27 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h>
28 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h>
29 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h>
30 
31 #include <zmq.hpp>
32 
33 #endif
34 
35 using namespace mvsim;
36 
37 Server::Server() : mrpt::system::COutputLogger("mvsim::Server") {}
38 
40 
42 {
43  ASSERTMSG_(!mainThread_.joinable(), "Server is already running.");
44 
45 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
47  mainThread_ = std::thread(&Server::internalServerThread, this);
48 
49 #if MRPT_VERSION >= 0x204
50  mrpt::system::thread_name("serverMain", mainThread_);
51 #endif
52 
53 #else
54  THROW_EXCEPTION("MVSIM needs building with ZMQ and PROTOBUF to enable client/server");
55 #endif
56 }
57 
58 void Server::shutdown() noexcept
59 {
60  try
61  {
62  MRPT_LOG_DEBUG_STREAM("Waiting for the thread to quit.");
64 
65  if (mainThread_.joinable()) mainThread_.join();
66 
67  MRPT_LOG_DEBUG_STREAM("Joined thread.");
68  }
69  catch (const std::exception& e)
70  {
71  MRPT_LOG_ERROR_STREAM("shutdown: Exception: " << mrpt::exception_to_str(e));
72  }
73 }
74 
76 {
77  using namespace std::string_literals;
78 
79 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
80  try
81  {
82  MRPT_LOG_INFO_STREAM("Server thread started.");
83 
84  zmq::context_t context(3);
85  mainThreadZMQcontext_ = &context;
86 
87  zmq::socket_t mainRepSocket(context, ZMQ_REP);
88  mainRepSocket.bind("tcp://*:"s + std::to_string(serverPortNo_));
89 
90  for (;;)
91  {
92  zmq::message_t request;
93 
94  // Wait for next request from client
95 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1)
96  std::optional<size_t> reqSize = mainRepSocket.recv(request);
97  ASSERT_(reqSize.has_value());
98 #else
99  mainRepSocket.recv(&request);
100 #endif
101 
102  // Variant with all valid client requests:
103  using client_requests_t = std::variant<
104  mvsim_msgs::RegisterNodeRequest, mvsim_msgs::UnregisterNodeRequest,
105  mvsim_msgs::SubscribeRequest, mvsim_msgs::ListNodesRequest,
106  mvsim_msgs::ListTopicsRequest, mvsim_msgs::AdvertiseTopicRequest,
107  mvsim_msgs::AdvertiseServiceRequest, mvsim_msgs::GetServiceInfoRequest>;
108 
109  // Parse and dispatch:
110  try
111  {
112  client_requests_t req = mvsim::parseMessageVariant<client_requests_t>(request);
113 
114  std::visit(
115  overloaded{
116  [&](const auto& m) { this->handle(m, mainRepSocket); },
117  },
118  req);
119  }
120  catch (const UnexpectedMessageException& e)
121  {
122  MRPT_LOG_ERROR_STREAM(e.what());
123  }
124  }
125  }
126  catch (const zmq::error_t& e)
127  {
128  if (e.num() == ETERM)
129  {
130  // This simply means someone called requestMainThreadTermination().
131  // Just exit silently.
132  MRPT_LOG_DEBUG_STREAM("Server thread about to exit for ZMQ term signal.");
133  }
134  else
135  {
136  MRPT_LOG_ERROR_STREAM("internalServerThread: ZMQ error: " << e.what());
137  }
138  }
139  catch (const std::exception& e)
140  {
141  MRPT_LOG_ERROR_STREAM("internalServerThread: Exception: " << mrpt::exception_to_str(e));
142  }
143  MRPT_LOG_DEBUG_STREAM("Server thread quitted.");
144 
145  mainThreadZMQcontext_ = nullptr;
146 #endif
147 }
148 
150 {
151 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
152  zmq::context_t* ctx = mainThreadZMQcontext_;
153  if (ctx)
154  {
155 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
156  ctx->shutdown();
157 #else
158  // Missing shutdown() in older versions:
159  zmq_ctx_shutdown(ctx->operator void*());
160 #endif
161  }
162 #endif
163 }
164 
165 void Server::db_remove_node(const std::string& nodeName)
166 {
167  std::unique_lock lck(dbMutex);
168 
169  auto itNode = connectedNodes_.find(nodeName);
170  if (itNode == connectedNodes_.end()) return; // Nothing to do
171 
172  for (const std::string& topic : itNode->second.advertisedTopics)
173  {
174  auto itTopic = knownTopics_.find(topic);
175  if (itTopic == knownTopics_.end()) continue;
176  knownTopics_.erase(itTopic);
177  }
178 
179  // for (const std::string& topic : itNode->second.subscribedTopics) { }
180 
181  connectedNodes_.erase(itNode);
182 }
183 
184 void Server::db_register_node(const std::string& nodeName)
185 {
186  std::unique_lock lck(dbMutex);
187 
188  connectedNodes_.emplace_hint(connectedNodes_.end(), nodeName, nodeName);
189 }
190 
192  const std::string& topicName, const std::string& topicTypeName,
193  const std::string& publisherEndpoint, const std::string& nodeName)
194 {
195  std::unique_lock lck(dbMutex);
196 
197  // 1) Add as a source of this topic:
198  auto& dbTopic = knownTopics_[topicName];
199 
200  if (!dbTopic.topicTypeName.empty() && dbTopic.topicTypeName != topicTypeName)
201  {
202  throw std::runtime_error(mrpt::format(
203  "Trying to register topic `%s` [%s] but already known with type "
204  "[%s]",
205  topicName.c_str(), topicTypeName.c_str(), dbTopic.topicTypeName.c_str()));
206  }
207  dbTopic.topicName = topicName;
208  dbTopic.topicTypeName = topicTypeName;
209 
210  dbTopic.publishers.try_emplace(nodeName, topicName, nodeName, publisherEndpoint);
211 
212  // 2) If clients are already waiting for this topic, inform them so they
213  // can subscribe to this new source of data:
215 }
216 
218  const std::string& topicName, const std::string& updatesEndPoint)
219 {
220  std::unique_lock lck(dbMutex);
221 
222  auto& dbTopic = knownTopics_[topicName];
223 
224  dbTopic.subscribers.try_emplace(updatesEndPoint, topicName, updatesEndPoint);
225 
226  // Send all currently-existing publishers:
227  send_topic_publishers_to_subscribed_clients(topicName, updatesEndPoint /*only this one*/);
228 }
229 
231  const std::string& topicName, const std::optional<std::string>& updatesEndPoint)
232 {
233 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
234  auto& dbTopic = knownTopics_[topicName];
235 
236  mvsim_msgs::TopicInfo tiMsg;
237  tiMsg.set_topicname(topicName);
238  tiMsg.set_topictype(dbTopic.topicTypeName);
239 
240  for (const auto& pub : dbTopic.publishers)
241  {
242  tiMsg.add_publishername(pub.second.publisherNodeName);
243  tiMsg.add_publisherendpoint(pub.second.publisherEndpoint);
244  }
245 
246  ASSERT_(mainThreadZMQcontext_);
247 
248  auto lambdaSendToSub = [&](const std::string& subUpdtEndPoint)
249  {
250  try
251  {
252  MRPT_LOG_DEBUG_STREAM(
253  "[send_topic_publishers_to_subscribed_clients] Letting "
254  << subUpdtEndPoint << " know about " << dbTopic.publishers.size()
255  << " publishers for topic '" << topicName << "'");
256 
257  zmq::socket_t s(*mainThreadZMQcontext_, ZMQ_PAIR);
258  s.connect(subUpdtEndPoint);
259 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1)
260  ASSERT_(s);
261 #else
262  ASSERT_(s.connected());
263 #endif
264  sendMessage(tiMsg, s);
265 
266  mvsim_msgs::GenericAnswer ans;
267  const auto m = receiveMessage(s);
268  mvsim::parseMessage(m, ans);
269  ASSERT_(ans.success());
270  }
271  catch (const std::exception& e)
272  {
273  MRPT_LOG_ERROR_STREAM(
274  "Error sending topic updates to endpoint " << subUpdtEndPoint << ":\n"
275  << e.what());
276  }
277  };
278 
279  if (updatesEndPoint.has_value())
280  {
281  // send to this one only:
282  lambdaSendToSub(*updatesEndPoint);
283  }
284  else
285  {
286  // send to all:
287  for (const auto& ips : dbTopic.subscribers)
288  {
289  lambdaSendToSub(ips.second.subscriberUpdatesEndpoint);
290  }
291  }
292 #endif
293 }
294 
296  const std::string& serviceName, const std::string& inputTypeName,
297  const std::string& outputTypeName, const std::string& publisherEndpoint,
298  const std::string& nodeName)
299 {
300  std::unique_lock lck(dbMutex);
301 
302  // 1) Add as a source of this topic:
303  auto& dbSrv = knownServices_[serviceName];
304 
305  if (!dbSrv.inputTypeName.empty() &&
306  (dbSrv.inputTypeName != inputTypeName || dbSrv.outputTypeName != outputTypeName))
307  {
308  throw std::runtime_error(mrpt::format(
309  "Trying to register service `%s` [%s->%s] but already known "
310  "with "
311  "types "
312  "[%s->%s]",
313  serviceName.c_str(), inputTypeName.c_str(), outputTypeName.c_str(),
314  dbSrv.inputTypeName.c_str(), dbSrv.outputTypeName.c_str()));
315  }
316  dbSrv.serviceName = serviceName;
317  dbSrv.inputTypeName = inputTypeName;
318  dbSrv.outputTypeName = outputTypeName;
319  dbSrv.endpoint = publisherEndpoint;
320  dbSrv.nodeName = nodeName;
321 }
322 
324  const std::string& serviceName, std::string& publisherEndpoint, std::string& nodeName) const
325 {
326  std::shared_lock lck(dbMutex);
327 
328  // 1) Add as a source of this topic:
329  auto itSrv = knownServices_.find(serviceName);
330  if (itSrv == knownServices_.end()) return false;
331 
332  auto& dbSrv = itSrv->second;
333 
334  publisherEndpoint = dbSrv.endpoint;
335  nodeName = dbSrv.nodeName;
336 
337  return true;
338 }
339 
340 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
341 
342 // mvsim_msgs::RegisterNodeRequest
343 void Server::handle(const mvsim_msgs::RegisterNodeRequest& m, zmq::socket_t& s)
344 {
345  // Send reply back to client
346  MRPT_LOG_DEBUG_STREAM("Registering new node named '" << m.nodename() << "'");
347 
348  // Make sure we don't have already a node named like this:
349  // Don't raise an error if the name was already registered, since it
350  // might be that the same node disconnected and connected again:
351  db_remove_node(m.nodename());
352 
353  db_register_node(m.nodename());
354 
355  mvsim_msgs::RegisterNodeAnswer rna;
356  rna.set_success(true);
357  mvsim::sendMessage(rna, s);
358 }
359 
360 // mvsim_msgs::UnregisterNodeRequest
361 void Server::handle(const mvsim_msgs::UnregisterNodeRequest& m, zmq::socket_t& s)
362 {
363  // Send reply back to client
364  MRPT_LOG_DEBUG_STREAM("Unregistering node named '" << m.nodename() << "'");
365 
366  db_remove_node(m.nodename());
367 
368  mvsim_msgs::GenericAnswer rna;
369  rna.set_success(true);
370  mvsim::sendMessage(rna, s);
371 }
372 
373 // mvsim_msgs::SubscribeRequest
374 void Server::handle(const mvsim_msgs::SubscribeRequest& m, zmq::socket_t& s)
375 {
376  // Send reply back to client
377  MRPT_LOG_DEBUG_STREAM("Subscription request for topic " << m.topic() << "'");
378 
379  // Include in our DB of subscriptions:
380  // This also sends the subcriber the list of existing endpoints it must
381  // subscribe to:
382  db_add_topic_subscriber(m.topic(), m.updatesendpoint());
383 
384  mvsim_msgs::SubscribeAnswer ans;
385  ans.set_topic(m.topic());
386  ans.set_success(true);
387  mvsim::sendMessage(ans, s);
388 }
389 
390 // mvsim_msgs::GetServiceInfoRequest
391 void Server::handle(const mvsim_msgs::GetServiceInfoRequest& m, zmq::socket_t& s)
392 {
393  // Send reply back to client
394  MRPT_LOG_DEBUG_STREAM("GetServiceInfo request for service '" << m.servicename() << "'");
395 
396  mvsim_msgs::GetServiceInfoAnswer ans;
397  std::string node, endpoint;
398 
399  if (db_get_service_info(m.servicename(), endpoint, node))
400  {
401  ans.set_success(true);
402  ans.set_serviceendpoint(endpoint);
403  ans.set_servicenodename(node);
404  }
405  else
406  {
407  ans.set_success(false);
408  ans.set_errormessage(mrpt::format("Could not find service `%s`", m.servicename().c_str()));
409  }
410 
411  mvsim::sendMessage(ans, s);
412 }
413 
414 // mvsim_msgs::ListTopicsRequest
415 void Server::handle(const mvsim_msgs::ListTopicsRequest& m, zmq::socket_t& s)
416 {
417  // Send reply back to client
418  MRPT_LOG_DEBUG("Listing topics request");
419 
420  mvsim_msgs::ListTopicsAnswer ans;
421 
422  // Optional name filter:
423  const auto& queryPrefix = m.topicstartswith();
424 
425  std::shared_lock lck(dbMutex);
426 
427  for (const auto& kv : knownTopics_)
428  {
429  const auto& t = kv.second;
430  const auto& name = t.topicName;
431 
432  if (!queryPrefix.empty() || name.substr(0, queryPrefix.size()) == queryPrefix)
433  {
434  auto tInfo = ans.add_topics();
435  tInfo->set_topicname(name);
436  tInfo->set_topictype(t.topicTypeName);
437 
438  for (const auto& pubs : t.publishers)
439  {
440  tInfo->add_publishername(pubs.second.publisherNodeName);
441  tInfo->add_publisherendpoint(pubs.second.publisherEndpoint);
442  }
443  }
444  }
445  mvsim::sendMessage(ans, s);
446 }
447 
448 // mvsim_msgs::ListNodesRequest
449 void Server::handle(const mvsim_msgs::ListNodesRequest& m, zmq::socket_t& s)
450 {
451  // Send reply back to client
452  MRPT_LOG_DEBUG("Listing nodes request");
453 
454  // Optional name filter:
455  const auto& queryPrefix = m.nodestartswith();
456 
457  mvsim_msgs::ListNodesAnswer ans;
458  for (const auto& n : connectedNodes_)
459  {
460  const auto& name = n.second.nodeName;
461 
462  if (!queryPrefix.empty() || name.substr(0, queryPrefix.size()) == queryPrefix)
463  {
464  ans.add_nodes(name);
465  }
466  }
467  mvsim::sendMessage(ans, s);
468 }
469 
470 // mvsim_msgs::AdvertiseTopicRequest
471 void Server::handle(const mvsim_msgs::AdvertiseTopicRequest& m, zmq::socket_t& s)
472 {
473  // Send reply back to client
474  MRPT_LOG_DEBUG_FMT(
475  "Received new topic advertiser: `%s` [%s] @ %s (%s)", m.topicname().c_str(),
476  m.topictypename().c_str(), m.endpoint().c_str(), m.nodename().c_str());
477 
478  mvsim_msgs::GenericAnswer ans;
479  try
480  {
481  db_advertise_topic(m.topicname(), m.topictypename(), m.endpoint(), m.nodename());
482  ans.set_success(true);
483  }
484  catch (const std::exception& e)
485  {
486  ans.set_success(false);
487  ans.set_errormessage(mrpt::exception_to_str(e));
488  }
489  mvsim::sendMessage(ans, s);
490 }
491 
492 // mvsim_msgs::AdvertiseServiceRequest
493 void Server::handle(const mvsim_msgs::AdvertiseServiceRequest& m, zmq::socket_t& s)
494 {
495  // Send reply back to client
496  MRPT_LOG_DEBUG_FMT(
497  "Received new service offering: `%s` [%s->%s] @ %s (%s)", m.servicename().c_str(),
498  m.inputtypename().c_str(), m.outputtypename().c_str(), m.endpoint().c_str(),
499  m.nodename().c_str());
500 
501  mvsim_msgs::GenericAnswer ans;
502  try
503  {
505  m.servicename(), m.inputtypename(), m.outputtypename(), m.endpoint(), m.nodename());
506  ans.set_success(true);
507  }
508  catch (const std::exception& e)
509  {
510  ans.set_success(false);
511  ans.set_errormessage(mrpt::exception_to_str(e));
512  }
513  mvsim::sendMessage(ans, s);
514 }
515 
516 #endif
mvsim
Definition: Client.h:21
mvsim::Server::knownTopics_
std::map< topic_name_t, InfoPerTopic > knownTopics_
Definition: Server.h:190
mvsim::Server::requestMainThreadTermination
void requestMainThreadTermination()
Definition: Server.cpp:149
mvsim::Server::mainThread_
std::thread mainThread_
Definition: Server.h:80
s
XmlRpcServer s
mvsim::Server::db_remove_node
void db_remove_node(const std::string &nodeName)
Definition: Server.cpp:165
Server.h
mrpt
Definition: basic_types.h:36
mvsim::Server::Server
Server()
Definition: Server.cpp:37
mvsim::Server::send_topic_publishers_to_subscribed_clients
void send_topic_publishers_to_subscribed_clients(const std::string &topicName, const std::optional< std::string > &updatesEndPoint=std::nullopt)
Definition: Server.cpp:230
mvsim::Server::~Server
~Server()
Definition: Server.cpp:39
mvsim::Server::db_get_service_info
bool db_get_service_info(const std::string &serviceName, std::string &publisherEndpoint, std::string &nodeName) const
Definition: Server.cpp:323
mvsim::Server::serverPortNo_
unsigned int serverPortNo_
Definition: Server.h:214
mvsim::Server::dbMutex
std::shared_mutex dbMutex
Definition: Server.h:103
mvsim::Server::db_advertise_service
void db_advertise_service(const std::string &serviceName, const std::string &inputTypeName, const std::string &outputTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:295
mvsim::Server::start
void start()
Definition: Server.cpp:41
mvsim::Server::db_advertise_topic
void db_advertise_topic(const std::string &topicName, const std::string &topicTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:191
mvsim::Server::connectedNodes_
std::map< node_name_t, InfoPerNode > connectedNodes_
Definition: Server.h:148
mvsim::Server::db_register_node
void db_register_node(const std::string &nodeName)
Definition: Server.cpp:184
mvsim::Server::internalServerThread
void internalServerThread()
Definition: Server.cpp:75
common.h
mvsim::Server::knownServices_
std::map< service_name_t, InfoPerService > knownServices_
Definition: Server.h:210
mvsim::Server::mainThreadZMQcontext_
std::atomic< zmq::context_t * > mainThreadZMQcontext_
Definition: Server.h:81
mvsim::Server::shutdown
void shutdown() noexcept
Definition: Server.cpp:58
mvsim::Server::db_add_topic_subscriber
void db_add_topic_subscriber(const std::string &topicName, const std::string &updatesEndPoint)
Definition: Server.cpp:217
t
geometry_msgs::TransformStamped t


mvsim
Author(s):
autogenerated on Wed May 28 2025 02:13:08