Server.cpp
Go to the documentation of this file.
1 /*+-------------------------------------------------------------------------+
2  | MultiVehicle simulator (libmvsim) |
3  | |
4  | Copyright (C) 2014-2020 Jose Luis Blanco Claraco |
5  | Copyright (C) 2017 Borys Tymchenko (Odessa Polytechnic University) |
6  | Distributed under 3-clause BSD License |
7  | See COPYING |
8  +-------------------------------------------------------------------------+ */
9 
10 #include <mrpt/core/exceptions.h>
11 #include <mrpt/version.h>
12 #include <mvsim/Comms/Server.h>
13 #include <mvsim/Comms/common.h>
14 #if MRPT_VERSION >= 0x204
15 #include <mrpt/system/thread_name.h>
16 #endif
17 
18 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
19 #include <zmq.hpp>
20 
21 #include "GenericAnswer.pb.h"
22 #include "GetServiceInfoAnswer.pb.h"
23 #include "ListNodesAnswer.pb.h"
24 #include "ListNodesRequest.pb.h"
25 #include "ListTopicsAnswer.pb.h"
26 #include "ListTopicsRequest.pb.h"
27 #include "RegisterNodeAnswer.pb.h"
28 #include "RegisterNodeRequest.pb.h"
29 #include "SubscribeAnswer.pb.h"
30 #include "SubscribeRequest.pb.h"
31 #include "UnregisterNodeRequest.pb.h"
32 
33 #endif
34 
35 using namespace mvsim;
36 
37 Server::Server() : mrpt::system::COutputLogger("mvsim::Server") {}
38 
40 
42 {
43  ASSERTMSG_(!mainThread_.joinable(), "Server is already running.");
44 
45 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
47  mainThread_ = std::thread(&Server::internalServerThread, this);
48 
49 #if MRPT_VERSION >= 0x204
50  mrpt::system::thread_name("serverMain", mainThread_);
51 #endif
52 
53 #else
55  "MVSIM needs building with ZMQ and PROTOBUF to enable client/server");
56 #endif
57 }
58 
59 void Server::shutdown() noexcept
60 {
61  try
62  {
63  MRPT_LOG_DEBUG_STREAM("Waiting for the thread to quit.");
65 
66  if (mainThread_.joinable()) mainThread_.join();
67 
68  MRPT_LOG_DEBUG_STREAM("Joined thread.");
69  }
70  catch (const std::exception& e)
71  {
73  "shutdown: Exception: " << mrpt::exception_to_str(e));
74  }
75 }
76 
78 {
79  using namespace std::string_literals;
80 
81 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
82  try
83  {
84  MRPT_LOG_INFO_STREAM("Server thread started.");
85 
86  zmq::context_t context(1);
87  mainThreadZMQcontext_ = &context;
88 
89  zmq::socket_t mainRepSocket(context, ZMQ_REP);
90  mainRepSocket.bind("tcp://*:"s + std::to_string(serverPortNo_));
91 
92  for (;;)
93  {
94  zmq::message_t request;
95 
96  // Wait for next request from client
97 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1)
98  std::optional<size_t> reqSize = mainRepSocket.recv(request);
99  ASSERT_(reqSize.has_value());
100 #else
101  mainRepSocket.recv(&request);
102 #endif
103 
104  // Variant with all valid client requests:
105  using client_requests_t = std::variant<
106  mvsim_msgs::RegisterNodeRequest,
107  mvsim_msgs::UnregisterNodeRequest, mvsim_msgs::SubscribeRequest,
108  mvsim_msgs::ListNodesRequest, mvsim_msgs::ListTopicsRequest,
109  mvsim_msgs::AdvertiseTopicRequest,
110  mvsim_msgs::AdvertiseServiceRequest,
111  mvsim_msgs::GetServiceInfoRequest>;
112 
113  // Parse and dispatch:
114  try
115  {
116  client_requests_t req =
117  mvsim::parseMessageVariant<client_requests_t>(request);
118 
119  std::visit(
120  overloaded{
121  [&](const auto& m) { this->handle(m, mainRepSocket); },
122  },
123  req);
124  }
125  catch (const UnexpectedMessageException& e)
126  {
127  MRPT_LOG_ERROR_STREAM(e.what());
128  }
129  }
130  }
131  catch (const zmq::error_t& e)
132  {
133  if (e.num() == ETERM)
134  {
135  // This simply means someone called requestMainThreadTermination().
136  // Just exit silently.
138  "Server thread about to exit for ZMQ term signal.");
139  }
140  else
141  {
143  "internalServerThread: ZMQ error: " << e.what());
144  }
145  }
146  catch (const std::exception& e)
147  {
149  "internalServerThread: Exception: " << mrpt::exception_to_str(e));
150  }
151  MRPT_LOG_DEBUG_STREAM("Server thread quitted.");
152 
153  mainThreadZMQcontext_ = nullptr;
154 #endif
155 }
156 
158 {
159 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
160  zmq::context_t* ctx = mainThreadZMQcontext_;
161  if (ctx)
162  {
163 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0)
164  ctx->shutdown();
165 #else
166  // Missing shutdown() in older versions:
167  zmq_ctx_shutdown(ctx->operator void*());
168 #endif
169  }
170 #endif
171 }
172 
173 void Server::db_remove_node(const std::string& nodeName)
174 {
175  std::unique_lock lck(dbMutex);
176 
177  auto itNode = connectedNodes_.find(nodeName);
178  if (itNode == connectedNodes_.end()) return; // Nothing to do
179 
180  for (const std::string& topic : itNode->second.advertisedTopics)
181  {
182  auto itTopic = knownTopics_.find(topic);
183  if (itTopic == knownTopics_.end()) continue;
184  knownTopics_.erase(itTopic);
185  }
186 
187  // for (const std::string& topic : itNode->second.subscribedTopics) { }
188 
189  connectedNodes_.erase(itNode);
190 }
191 
192 void Server::db_register_node(const std::string& nodeName)
193 {
194  std::unique_lock lck(dbMutex);
195 
196  InfoPerNode& ipn =
197  connectedNodes_.emplace_hint(connectedNodes_.end(), nodeName, nodeName)
198  ->second;
199 }
200 
202  const std::string& topicName, const std::string& topicTypeName,
203  const std::string& publisherEndpoint, const std::string& nodeName)
204 {
205  std::unique_lock lck(dbMutex);
206 
207  // 1) Add as a source of this topic:
208  auto& dbTopic = knownTopics_[topicName];
209 
210  if (!dbTopic.topicTypeName.empty() &&
211  dbTopic.topicTypeName != topicTypeName)
212  {
213  throw std::runtime_error(mrpt::format(
214  "Trying to register topic `%s` [%s] but already known with type "
215  "[%s]",
216  topicName.c_str(), topicTypeName.c_str(),
217  dbTopic.topicTypeName.c_str()));
218  }
219  dbTopic.topicName = topicName;
220  dbTopic.topicTypeName = topicTypeName;
221 
222  dbTopic.publishers.try_emplace(
223  nodeName, topicName, nodeName, publisherEndpoint);
224 
225  // 2) If clients are already waiting for this topic, inform them so they
226  // can subscribe to this new source of data:
227 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
228 #endif
229  MRPT_TODO("TO-DO");
230 }
231 
233  const std::string& topicName, const std::string& updatesEndPoint)
234 {
235  std::unique_lock lck(dbMutex);
236 
237  auto& dbTopic = knownTopics_[topicName];
238 
239  dbTopic.subscribers.try_emplace(
240  updatesEndPoint, topicName, updatesEndPoint);
241 
242  // Send all currently-existing publishers:
243 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
244  mvsim_msgs::TopicInfo tiMsg;
245  tiMsg.set_topicname(topicName);
246  tiMsg.set_topictype(dbTopic.topicTypeName);
247 
248  for (const auto& pub : dbTopic.publishers)
249  {
250  tiMsg.add_publishername(pub.second.publisherNodeName);
251  tiMsg.add_publisherendpoint(pub.second.publisherEndpoint);
252  }
253 
255  zmq::socket_t s(*mainThreadZMQcontext_, ZMQ_PAIR);
256  s.connect(updatesEndPoint);
257  ASSERT_(s.connected());
258  sendMessage(tiMsg, s);
259 
260  mvsim_msgs::GenericAnswer ans;
261  const auto m = receiveMessage(s);
262  mvsim::parseMessage(m, ans);
263  ASSERT_(ans.success());
264 
265 #endif
266 }
267 
269  const std::string& serviceName, const std::string& inputTypeName,
270  const std::string& outputTypeName, const std::string& publisherEndpoint,
271  const std::string& nodeName)
272 {
273  std::unique_lock lck(dbMutex);
274 
275  // 1) Add as a source of this topic:
276  auto& dbSrv = knownServices_[serviceName];
277 
278  if (!dbSrv.inputTypeName.empty() &&
279  (dbSrv.inputTypeName != inputTypeName ||
280  dbSrv.outputTypeName != outputTypeName))
281  {
282  throw std::runtime_error(mrpt::format(
283  "Trying to register service `%s` [%s->%s] but already known with "
284  "types "
285  "[%s->%s]",
286  serviceName.c_str(), inputTypeName.c_str(), outputTypeName.c_str(),
287  dbSrv.inputTypeName.c_str(), dbSrv.outputTypeName.c_str()));
288  }
289  dbSrv.serviceName = serviceName;
290  dbSrv.inputTypeName = inputTypeName;
291  dbSrv.outputTypeName = outputTypeName;
292  dbSrv.endpoint = publisherEndpoint;
293  dbSrv.nodeName = nodeName;
294 }
295 
297  const std::string& serviceName, std::string& publisherEndpoint,
298  std::string& nodeName) const
299 {
300  std::shared_lock lck(dbMutex);
301 
302  // 1) Add as a source of this topic:
303  auto itSrv = knownServices_.find(serviceName);
304  if (itSrv == knownServices_.end()) return false;
305 
306  auto& dbSrv = itSrv->second;
307 
308  publisherEndpoint = dbSrv.endpoint;
309  nodeName = dbSrv.nodeName;
310 
311  return true;
312 }
313 
314 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
315 
316 // mvsim_msgs::RegisterNodeRequest
317 void Server::handle(const mvsim_msgs::RegisterNodeRequest& m, zmq::socket_t& s)
318 {
319  // Send reply back to client
321  "Registering new node named '" << m.nodename() << "'");
322 
323  // Make sure we don't have already a node named like this:
324  // Don't raise an error if the name was already registered, since it might
325  // be that the same node disconnected and connected again:
326  db_remove_node(m.nodename());
327 
328  db_register_node(m.nodename());
329 
330  mvsim_msgs::RegisterNodeAnswer rna;
331  rna.set_success(true);
332  mvsim::sendMessage(rna, s);
333 }
334 
335 // mvsim_msgs::UnregisterNodeRequest
336 void Server::handle(
337  const mvsim_msgs::UnregisterNodeRequest& m, zmq::socket_t& s)
338 {
339  // Send reply back to client
340  MRPT_LOG_DEBUG_STREAM("Unregistering node named '" << m.nodename() << "'");
341 
342  db_remove_node(m.nodename());
343 
344  mvsim_msgs::GenericAnswer rna;
345  rna.set_success(true);
346  mvsim::sendMessage(rna, s);
347 }
348 
349 // mvsim_msgs::SubscribeRequest
350 void Server::handle(const mvsim_msgs::SubscribeRequest& m, zmq::socket_t& s)
351 {
352  // Send reply back to client
354  "Subscription request for topic " << m.topic() << "'");
355 
356  // Include in our DB of subscriptions:
357  // This also sends the subcriber the list of existing endpoints it must
358  // subscribe to:
359  db_add_topic_subscriber(m.topic(), m.updatesendpoint());
360 
361  mvsim_msgs::SubscribeAnswer ans;
362  ans.set_topic(m.topic());
363  ans.set_success(true);
364  mvsim::sendMessage(ans, s);
365 }
366 
367 // mvsim_msgs::GetServiceInfoRequest
368 void Server::handle(
369  const mvsim_msgs::GetServiceInfoRequest& m, zmq::socket_t& s)
370 {
371  // Send reply back to client
373  "GetServiceInfo request for service '" << m.servicename() << "'");
374 
375  mvsim_msgs::GetServiceInfoAnswer ans;
376  std::string node, endpoint;
377 
378  if (db_get_service_info(m.servicename(), endpoint, node))
379  {
380  ans.set_success(true);
381  ans.set_serviceendpoint(endpoint);
382  ans.set_servicenodename(node);
383  }
384  else
385  {
386  ans.set_success(false);
387  ans.set_errormessage(mrpt::format(
388  "Could not find service `%s`", m.servicename().c_str()));
389  }
390 
391  mvsim::sendMessage(ans, s);
392 }
393 
394 // mvsim_msgs::ListTopicsRequest
395 void Server::handle(const mvsim_msgs::ListTopicsRequest& m, zmq::socket_t& s)
396 {
397  // Send reply back to client
398  MRPT_LOG_DEBUG("Listing topics request");
399 
400  mvsim_msgs::ListTopicsAnswer ans;
401 
402  // Optional name filter:
403  const auto& queryPrefix = m.topicstartswith();
404 
405  std::shared_lock lck(dbMutex);
406 
407  for (const auto& kv : knownTopics_)
408  {
409  const auto& t = kv.second;
410  const auto& name = t.topicName;
411 
412  if (!queryPrefix.empty() ||
413  name.substr(0, queryPrefix.size()) == queryPrefix)
414  {
415  auto tInfo = ans.add_topics();
416  tInfo->set_topicname(name);
417  tInfo->set_topictype(t.topicTypeName);
418 
419  for (const auto& pubs : t.publishers)
420  {
421  tInfo->add_publishername(pubs.second.publisherNodeName);
422  tInfo->add_publisherendpoint(pubs.second.publisherEndpoint);
423  }
424  }
425  }
426  mvsim::sendMessage(ans, s);
427 }
428 
429 // mvsim_msgs::ListNodesRequest
430 void Server::handle(const mvsim_msgs::ListNodesRequest& m, zmq::socket_t& s)
431 {
432  // Send reply back to client
433  MRPT_LOG_DEBUG("Listing nodes request");
434 
435  // Optional name filter:
436  const auto& queryPrefix = m.nodestartswith();
437 
438  mvsim_msgs::ListNodesAnswer ans;
439  for (const auto& n : connectedNodes_)
440  {
441  const auto& name = n.second.nodeName;
442 
443  if (!queryPrefix.empty() ||
444  name.substr(0, queryPrefix.size()) == queryPrefix)
445  {
446  ans.add_nodes(name);
447  }
448  }
449  mvsim::sendMessage(ans, s);
450 }
451 
452 // mvsim_msgs::AdvertiseTopicRequest
453 void Server::handle(
454  const mvsim_msgs::AdvertiseTopicRequest& m, zmq::socket_t& s)
455 {
456  // Send reply back to client
458  "Received new topic advertiser: `%s` [%s] @ %s (%s)",
459  m.topicname().c_str(), m.topictypename().c_str(), m.endpoint().c_str(),
460  m.nodename().c_str());
461 
462  mvsim_msgs::GenericAnswer ans;
463  try
464  {
466  m.topicname(), m.topictypename(), m.endpoint(), m.nodename());
467  ans.set_success(true);
468  }
469  catch (const std::exception& e)
470  {
471  ans.set_success(false);
472  ans.set_errormessage(mrpt::exception_to_str(e));
473  }
474  mvsim::sendMessage(ans, s);
475 }
476 
477 // mvsim_msgs::AdvertiseServiceRequest
478 void Server::handle(
479  const mvsim_msgs::AdvertiseServiceRequest& m, zmq::socket_t& s)
480 {
481  // Send reply back to client
483  "Received new service offering: `%s` [%s->%s] @ %s (%s)",
484  m.servicename().c_str(), m.inputtypename().c_str(),
485  m.outputtypename().c_str(), m.endpoint().c_str(), m.nodename().c_str());
486 
487  mvsim_msgs::GenericAnswer ans;
488  try
489  {
491  m.servicename(), m.inputtypename(), m.outputtypename(),
492  m.endpoint(), m.nodename());
493  ans.set_success(true);
494  }
495  catch (const std::exception& e)
496  {
497  ans.set_success(false);
498  ans.set_errormessage(mrpt::exception_to_str(e));
499  }
500  mvsim::sendMessage(ans, s);
501 }
502 
503 #endif
void db_advertise_topic(const std::string &topicName, const std::string &topicTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:201
std::map< node_name_t, InfoPerNode > connectedNodes_
Definition: Server.h:143
void db_advertise_service(const std::string &serviceName, const std::string &inputTypeName, const std::string &outputTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:268
#define THROW_EXCEPTION(msg)
GLdouble GLdouble t
std::map< service_name_t, InfoPerService > knownServices_
Definition: Server.h:210
void internalServerThread()
Definition: Server.cpp:77
void db_remove_node(const std::string &nodeName)
Definition: Server.cpp:173
#define MRPT_LOG_INFO_STREAM(__CONTENTS)
#define MRPT_LOG_DEBUG(_STRING)
std::shared_mutex dbMutex
Definition: Server.h:103
MRPT_TODO("Modify ping to run on Windows + Test this")
#define MRPT_LOG_DEBUG_STREAM(__CONTENTS)
GLdouble s
#define MRPT_LOG_DEBUG_FMT(_FMT_STRING,...)
GLsizei n
std::thread mainThread_
Definition: Server.h:80
std::atomic< zmq::context_t * > mainThreadZMQcontext_
Definition: Server.h:81
std::string BASE_IMPEXP format(const char *fmt,...) MRPT_printf_format_check(1
GLuint const GLchar * name
std::map< topic_name_t, InfoPerTopic > knownTopics_
Definition: Server.h:190
void requestMainThreadTermination()
Definition: Server.cpp:157
bool db_get_service_info(const std::string &serviceName, std::string &publisherEndpoint, std::string &nodeName) const
Definition: Server.cpp:296
void start()
Definition: Server.cpp:41
void shutdown() noexcept
Definition: Server.cpp:59
unsigned int serverPortNo_
Definition: Server.h:214
void db_add_topic_subscriber(const std::string &topicName, const std::string &updatesEndPoint)
Definition: Server.cpp:232
#define ASSERT_(f)
const GLdouble * m
void db_register_node(const std::string &nodeName)
Definition: Server.cpp:192
#define MRPT_LOG_ERROR_STREAM(__CONTENTS)
#define ASSERTMSG_(f, __ERROR_MSG)


mvsim
Author(s):
autogenerated on Fri May 7 2021 03:05:51