10 #include <mrpt/core/exceptions.h> 11 #include <mrpt/version.h> 14 #if MRPT_VERSION >= 0x204 15 #include <mrpt/system/thread_name.h> 18 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 19 #include <mvsim/mvsim-msgs/GenericAnswer.pb.h> 20 #include <mvsim/mvsim-msgs/GetServiceInfoAnswer.pb.h> 21 #include <mvsim/mvsim-msgs/ListNodesAnswer.pb.h> 22 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h> 23 #include <mvsim/mvsim-msgs/ListTopicsAnswer.pb.h> 24 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h> 25 #include <mvsim/mvsim-msgs/RegisterNodeAnswer.pb.h> 26 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h> 27 #include <mvsim/mvsim-msgs/SubscribeAnswer.pb.h> 28 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h> 29 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h> 35 using namespace mvsim;
43 ASSERTMSG_(!
mainThread_.joinable(),
"Server is already running.");
45 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 49 #if MRPT_VERSION >= 0x204 50 mrpt::system::thread_name(
"serverMain",
mainThread_);
55 "MVSIM needs building with ZMQ and PROTOBUF to enable client/server");
63 MRPT_LOG_DEBUG_STREAM(
"Waiting for the thread to quit.");
68 MRPT_LOG_DEBUG_STREAM(
"Joined thread.");
70 catch (
const std::exception& e)
72 MRPT_LOG_ERROR_STREAM(
73 "shutdown: Exception: " << mrpt::exception_to_str(e));
79 using namespace std::string_literals;
81 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 84 MRPT_LOG_INFO_STREAM(
"Server thread started.");
86 zmq::context_t context(3);
89 zmq::socket_t mainRepSocket(context, ZMQ_REP);
94 zmq::message_t request;
97 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1) 98 std::optional<size_t> reqSize = mainRepSocket.recv(request);
99 ASSERT_(reqSize.has_value());
101 mainRepSocket.recv(&request);
105 using client_requests_t = std::variant<
106 mvsim_msgs::RegisterNodeRequest,
107 mvsim_msgs::UnregisterNodeRequest, mvsim_msgs::SubscribeRequest,
108 mvsim_msgs::ListNodesRequest, mvsim_msgs::ListTopicsRequest,
109 mvsim_msgs::AdvertiseTopicRequest,
110 mvsim_msgs::AdvertiseServiceRequest,
111 mvsim_msgs::GetServiceInfoRequest>;
116 client_requests_t req =
117 mvsim::parseMessageVariant<client_requests_t>(request);
121 [&](
const auto& m) { this->handle(m, mainRepSocket); },
125 catch (
const UnexpectedMessageException& e)
127 MRPT_LOG_ERROR_STREAM(e.what());
131 catch (
const zmq::error_t& e)
133 if (e.num() == ETERM)
137 MRPT_LOG_DEBUG_STREAM(
138 "Server thread about to exit for ZMQ term signal.");
142 MRPT_LOG_ERROR_STREAM(
143 "internalServerThread: ZMQ error: " << e.what());
146 catch (
const std::exception& e)
148 MRPT_LOG_ERROR_STREAM(
149 "internalServerThread: Exception: " << mrpt::exception_to_str(e));
151 MRPT_LOG_DEBUG_STREAM(
"Server thread quitted.");
159 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 163 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 4, 0) 167 zmq_ctx_shutdown(ctx->operator
void*());
180 for (
const std::string& topic : itNode->second.advertisedTopics)
200 const std::string& topicName,
const std::string& topicTypeName,
201 const std::string& publisherEndpoint,
const std::string& nodeName)
208 if (!dbTopic.topicTypeName.empty() &&
209 dbTopic.topicTypeName != topicTypeName)
211 throw std::runtime_error(mrpt::format(
212 "Trying to register topic `%s` [%s] but already known with type " 214 topicName.c_str(), topicTypeName.c_str(),
215 dbTopic.topicTypeName.c_str()));
217 dbTopic.topicName = topicName;
218 dbTopic.topicTypeName = topicTypeName;
220 dbTopic.publishers.try_emplace(
221 nodeName, topicName, nodeName, publisherEndpoint);
229 const std::string& topicName,
const std::string& updatesEndPoint)
235 dbTopic.subscribers.try_emplace(
236 updatesEndPoint, topicName, updatesEndPoint);
240 topicName, updatesEndPoint );
244 const std::string& topicName,
245 const std::optional<std::string>& updatesEndPoint)
247 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 250 mvsim_msgs::TopicInfo tiMsg;
251 tiMsg.set_topicname(topicName);
252 tiMsg.set_topictype(dbTopic.topicTypeName);
254 for (
const auto& pub : dbTopic.publishers)
256 tiMsg.add_publishername(pub.second.publisherNodeName);
257 tiMsg.add_publisherendpoint(pub.second.publisherEndpoint);
262 auto lambdaSendToSub = [&](
const std::string& subUpdtEndPoint) {
265 MRPT_LOG_DEBUG_STREAM(
266 "[send_topic_publishers_to_subscribed_clients] Letting " 267 << subUpdtEndPoint <<
" know about " 268 << dbTopic.publishers.size() <<
" publishers for topic '" 269 << topicName <<
"'");
272 s.connect(subUpdtEndPoint);
273 #if CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 7, 1) 276 ASSERT_(s.connected());
278 sendMessage(tiMsg, s);
280 mvsim_msgs::GenericAnswer ans;
281 const auto m = receiveMessage(s);
282 mvsim::parseMessage(m, ans);
283 ASSERT_(ans.success());
285 catch (
const std::exception& e)
287 MRPT_LOG_ERROR_STREAM(
288 "Error sending topic updates to endpoint " << subUpdtEndPoint
294 if (updatesEndPoint.has_value())
297 lambdaSendToSub(*updatesEndPoint);
302 for (
const auto& ips : dbTopic.subscribers)
304 lambdaSendToSub(ips.second.subscriberUpdatesEndpoint);
311 const std::string& serviceName,
const std::string& inputTypeName,
312 const std::string& outputTypeName,
const std::string& publisherEndpoint,
313 const std::string& nodeName)
320 if (!dbSrv.inputTypeName.empty() &&
321 (dbSrv.inputTypeName != inputTypeName ||
322 dbSrv.outputTypeName != outputTypeName))
324 throw std::runtime_error(mrpt::format(
325 "Trying to register service `%s` [%s->%s] but already known " 329 serviceName.c_str(), inputTypeName.c_str(), outputTypeName.c_str(),
330 dbSrv.inputTypeName.c_str(), dbSrv.outputTypeName.c_str()));
332 dbSrv.serviceName = serviceName;
333 dbSrv.inputTypeName = inputTypeName;
334 dbSrv.outputTypeName = outputTypeName;
335 dbSrv.endpoint = publisherEndpoint;
336 dbSrv.nodeName = nodeName;
340 const std::string& serviceName, std::string& publisherEndpoint,
341 std::string& nodeName)
const 349 auto& dbSrv = itSrv->second;
351 publisherEndpoint = dbSrv.endpoint;
352 nodeName = dbSrv.nodeName;
357 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF) 360 void Server::handle(
const mvsim_msgs::RegisterNodeRequest& m, zmq::socket_t&
s)
363 MRPT_LOG_DEBUG_STREAM(
364 "Registering new node named '" << m.nodename() <<
"'");
373 mvsim_msgs::RegisterNodeAnswer rna;
374 rna.set_success(
true);
375 mvsim::sendMessage(rna, s);
380 const mvsim_msgs::UnregisterNodeRequest& m, zmq::socket_t&
s)
383 MRPT_LOG_DEBUG_STREAM(
"Unregistering node named '" << m.nodename() <<
"'");
387 mvsim_msgs::GenericAnswer rna;
388 rna.set_success(
true);
389 mvsim::sendMessage(rna, s);
393 void Server::handle(
const mvsim_msgs::SubscribeRequest& m, zmq::socket_t&
s)
396 MRPT_LOG_DEBUG_STREAM(
397 "Subscription request for topic " << m.topic() <<
"'");
404 mvsim_msgs::SubscribeAnswer ans;
405 ans.set_topic(m.topic());
406 ans.set_success(
true);
407 mvsim::sendMessage(ans, s);
412 const mvsim_msgs::GetServiceInfoRequest& m, zmq::socket_t&
s)
415 MRPT_LOG_DEBUG_STREAM(
416 "GetServiceInfo request for service '" << m.servicename() <<
"'");
418 mvsim_msgs::GetServiceInfoAnswer ans;
419 std::string node, endpoint;
423 ans.set_success(
true);
424 ans.set_serviceendpoint(endpoint);
425 ans.set_servicenodename(node);
429 ans.set_success(
false);
430 ans.set_errormessage(mrpt::format(
431 "Could not find service `%s`", m.servicename().c_str()));
434 mvsim::sendMessage(ans, s);
438 void Server::handle(
const mvsim_msgs::ListTopicsRequest& m, zmq::socket_t&
s)
441 MRPT_LOG_DEBUG(
"Listing topics request");
443 mvsim_msgs::ListTopicsAnswer ans;
446 const auto& queryPrefix = m.topicstartswith();
452 const auto&
t = kv.second;
453 const auto& name =
t.topicName;
455 if (!queryPrefix.empty() ||
456 name.substr(0, queryPrefix.size()) == queryPrefix)
458 auto tInfo = ans.add_topics();
459 tInfo->set_topicname(name);
460 tInfo->set_topictype(
t.topicTypeName);
462 for (
const auto& pubs :
t.publishers)
464 tInfo->add_publishername(pubs.second.publisherNodeName);
465 tInfo->add_publisherendpoint(pubs.second.publisherEndpoint);
469 mvsim::sendMessage(ans, s);
473 void Server::handle(
const mvsim_msgs::ListNodesRequest& m, zmq::socket_t&
s)
476 MRPT_LOG_DEBUG(
"Listing nodes request");
479 const auto& queryPrefix = m.nodestartswith();
481 mvsim_msgs::ListNodesAnswer ans;
484 const auto& name = n.second.nodeName;
486 if (!queryPrefix.empty() ||
487 name.substr(0, queryPrefix.size()) == queryPrefix)
492 mvsim::sendMessage(ans, s);
497 const mvsim_msgs::AdvertiseTopicRequest& m, zmq::socket_t&
s)
501 "Received new topic advertiser: `%s` [%s] @ %s (%s)",
502 m.topicname().c_str(), m.topictypename().c_str(), m.endpoint().c_str(),
503 m.nodename().c_str());
505 mvsim_msgs::GenericAnswer ans;
509 m.topicname(), m.topictypename(), m.endpoint(), m.nodename());
510 ans.set_success(
true);
512 catch (
const std::exception& e)
514 ans.set_success(
false);
515 ans.set_errormessage(mrpt::exception_to_str(e));
517 mvsim::sendMessage(ans, s);
522 const mvsim_msgs::AdvertiseServiceRequest& m, zmq::socket_t&
s)
526 "Received new service offering: `%s` [%s->%s] @ %s (%s)",
527 m.servicename().c_str(), m.inputtypename().c_str(),
528 m.outputtypename().c_str(), m.endpoint().c_str(), m.nodename().c_str());
530 mvsim_msgs::GenericAnswer ans;
534 m.servicename(), m.inputtypename(), m.outputtypename(),
535 m.endpoint(), m.nodename());
536 ans.set_success(
true);
538 catch (
const std::exception& e)
540 ans.set_success(
false);
541 ans.set_errormessage(mrpt::exception_to_str(e));
543 mvsim::sendMessage(ans, s);
void db_advertise_topic(const std::string &topicName, const std::string &topicTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
std::map< node_name_t, InfoPerNode > connectedNodes_
void db_advertise_service(const std::string &serviceName, const std::string &inputTypeName, const std::string &outputTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
std::map< service_name_t, InfoPerService > knownServices_
bool db_get_service_info(const std::string &serviceName, std::string &publisherEndpoint, std::string &nodeName) const
void internalServerThread()
void db_remove_node(const std::string &nodeName)
std::shared_mutex dbMutex
geometry_msgs::TransformStamped t
std::atomic< zmq::context_t * > mainThreadZMQcontext_
std::map< topic_name_t, InfoPerTopic > knownTopics_
void requestMainThreadTermination()
unsigned int serverPortNo_
void db_add_topic_subscriber(const std::string &topicName, const std::string &updatesEndPoint)
void db_register_node(const std::string &nodeName)
void send_topic_publishers_to_subscribed_clients(const std::string &topicName, const std::optional< std::string > &updatesEndPoint=std::nullopt)