Server.h
Go to the documentation of this file.
1 /*+-------------------------------------------------------------------------+
2  | MultiVehicle simulator (libmvsim) |
3  | |
4  | Copyright (C) 2014-2023 Jose Luis Blanco Claraco |
5  | Copyright (C) 2017 Borys Tymchenko (Odessa Polytechnic University) |
6  | Distributed under 3-clause BSD License |
7  | See COPYING |
8  +-------------------------------------------------------------------------+ */
9 
10 #pragma once
11 
12 #include <mrpt/core/Clock.h>
13 #include <mrpt/system/COutputLogger.h>
14 #include <mvsim/Comms/ports.h>
15 #include <mvsim/Comms/zmq_fwrds.h>
16 
17 #include <atomic>
18 #include <set>
19 #include <shared_mutex> // read/write mutex
20 #include <thread>
21 
22 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
23 
24 #include <mvsim/mvsim-msgs/AdvertiseServiceRequest.pb.h>
25 #include <mvsim/mvsim-msgs/AdvertiseTopicRequest.pb.h>
26 #include <mvsim/mvsim-msgs/GetServiceInfoRequest.pb.h>
27 #include <mvsim/mvsim-msgs/ListNodesRequest.pb.h>
28 #include <mvsim/mvsim-msgs/ListTopicsRequest.pb.h>
29 #include <mvsim/mvsim-msgs/RegisterNodeRequest.pb.h>
30 #include <mvsim/mvsim-msgs/SubscribeRequest.pb.h>
31 #include <mvsim/mvsim-msgs/UnregisterNodeRequest.pb.h>
32 
33 #endif
34 
35 namespace mvsim
36 {
// Forward declaration only; World is not referenced anywhere else in this
// listing.
40 class World;
41 
// Server: class derived from mrpt::system::COutputLogger. Its public surface
// is start(), shutdown() and a getter/setter pair for the listening port; its
// private part holds a worker thread, per-message handlers, and three maps
// (connectedNodes_, knownTopics_, knownServices_) plus the db_* functions
// declared next to them.
//
// NOTE(review): this text is a documentation-page extract of Server.h. Every
// line carries its original header line number and the numbering has gaps
// (e.g. 82, 117, 127, 132, 153, 155, 166, 169, 171, 179, 182, 185, 198, 201,
// 220): doc comments and some declaration lines were dropped, so several
// declarations below appear without their opening line. Where the symbol
// index that follows the listing names the missing declaration, that is
// pointed out in place.
60 class Server : public mrpt::system::COutputLogger
61 {
62  public:
63  Server();
64  ~Server();
65 
   // Declared here, defined in Server.cpp per the symbol index.
   // NOTE(review): presumably launches mainThread_ -- confirm in Server.cpp.
69  void start();
70 
   // Declared noexcept: must not let an exception escape.
73  void shutdown() noexcept;
   // Getter/setter pair over serverPortNo_. The "listenning" spelling is part
   // of the public name. serverPortNo_ itself is not visible in this listing;
   // the symbol index places `unsigned int serverPortNo_` at Server.h:220.
76  unsigned int listenningPort() const { return serverPortNo_; }
77  void listenningPort(unsigned int port) { serverPortNo_ = port; }
78 
79  private:
80  std::thread mainThread_;
   // Atomic raw pointer to a zmq::context_t, initialized to nullptr.
   // NOTE(review): ownership/lifetime of the pointee is not shown here.
81  std::atomic<zmq::context_t*> mainThreadZMQcontext_ = nullptr;
   // NOTE(review): listing line 82 is missing; the symbol index lists
   // `void requestMainThreadTermination()` (Server.cpp:157), presumably
   // declared there -- confirm against the real header.
83 
   // NOTE(review): presumably the function executed by mainThread_ -- confirm.
84  void internalServerThread();
85 
86  // ========= Message handlers ========
   // Only compiled when both MVSIM_HAS_ZMQ and MVSIM_HAS_PROTOBUF are defined.
   // One overload per request message type; each takes the message by const
   // reference and a zmq::socket_t by non-const reference.
87 #if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
88  void handle(const mvsim_msgs::RegisterNodeRequest& m, zmq::socket_t& s);
89  void handle(const mvsim_msgs::UnregisterNodeRequest& m, zmq::socket_t& s);
90  void handle(const mvsim_msgs::SubscribeRequest& m, zmq::socket_t& s);
91  void handle(const mvsim_msgs::ListTopicsRequest& m, zmq::socket_t& s);
92  void handle(const mvsim_msgs::ListNodesRequest& m, zmq::socket_t& s);
93  void handle(const mvsim_msgs::AdvertiseTopicRequest& m, zmq::socket_t& s);
94  void handle(const mvsim_msgs::AdvertiseServiceRequest& m, zmq::socket_t& s);
95  void handle(const mvsim_msgs::GetServiceInfoRequest& m, zmq::socket_t& s);
96 #endif
97 
   // std::shared_mutex (read/write mutex). Being mutable, it can be locked
   // from const member functions such as db_get_service_info().
   // NOTE(review): presumably guards the maps declared below -- confirm.
103  mutable std::shared_mutex dbMutex;
104 
   // NOTE(review): std::string, std::map and std::optional are used in the
   // declarations below, but <string>, <map> and <optional> are not among the
   // includes visible in this listing; presumably they arrive transitively.
106  void db_remove_node(const std::string& nodeName);
107 
109  void db_register_node(const std::string& nodeName);
110 
112  void db_advertise_topic(
113  const std::string& topicName, const std::string& topicTypeName,
114  const std::string& publisherEndpoint, const std::string& nodeName);
115 
   // NOTE(review): listing line 117 (the declaration opener) is missing. The
   // symbol index lists `void db_advertise_service(...)` with exactly these
   // five parameters.
118  const std::string& serviceName, const std::string& inputTypeName,
119  const std::string& outputTypeName, const std::string& publisherEndpoint,
120  const std::string& nodeName);
121 
   // Returns bool; publisherEndpoint and nodeName are non-const references.
   // NOTE(review): presumably output parameters filled when the service is
   // found -- confirm in Server.cpp.
123  bool db_get_service_info(
124  const std::string& serviceName, std::string& publisherEndpoint,
125  std::string& nodeName) const;
126 
   // NOTE(review): listing line 127 is missing. The symbol index lists
   // `void db_add_topic_subscriber(...)` with these two parameters.
128  const std::string& topicName, const std::string& updatesEndPoint);
129 
   // NOTE(review): listing line 132 is missing. The symbol index lists
   // `void send_topic_publishers_to_subscribed_clients(...)` with these
   // parameters; updatesEndPoint defaults to std::nullopt.
133  const std::string& topicName,
134  const std::optional<std::string>& updatesEndPoint = std::nullopt);
135 
   // Record stored per entry of connectedNodes_.
136  struct InfoPerNode
137  {
138  InfoPerNode(const std::string& name) : nodeName(name) {}
139 
140  const std::string nodeName;
141 
   // Set from mrpt::Clock::now() when the object is constructed.
143  const mrpt::Clock::time_point timeConnected = mrpt::Clock::now();
144 
145  std::set<std::string> advertisedTopics;
146  std::set<std::string> subscribedTopics;
147  };
148  using node_name_t = std::string;
149  std::map<node_name_t, InfoPerNode> connectedNodes_;
150 
151  using endpoint_t = std::string;
152 
   // NOTE(review): listing lines 153, 155 and 166 are missing. Per the symbol
   // index this body belongs to InfoPerPublisher: its constructor is at
   // Server.h:155 and `const endpoint_t publisherEndpoint` at Server.h:166.
   // The constructor copies its three arguments into the members.
154  {
156  const std::string& topic_name,
157  const std::string& publisher_node_name,
158  const std::string& publisher_endpoint)
159  : topicName(topic_name),
160  publisherNodeName(publisher_node_name),
161  publisherEndpoint(publisher_endpoint)
162  {
163  }
164  const std::string topicName;
165  const std::string publisherNodeName;
167  };
168 
   // NOTE(review): listing lines 169, 171 and 179 are missing. Per the symbol
   // index this body belongs to InfoPerSubscriber: its constructor is at
   // Server.h:171 and `const endpoint_t subscriberUpdatesEndpoint` at
   // Server.h:179.
170  {
172  const std::string& topic_name,
173  const std::string& sub_updates_endpoint)
174  : topicName(topic_name),
175  subscriberUpdatesEndpoint(sub_updates_endpoint)
176  {
177  }
178  const std::string topicName;
180  };
181 
   // NOTE(review): listing lines 182 and 185 are missing. Per the symbol index
   // this is InfoPerTopic, whose two-argument constructor is at Server.h:185.
   // Default-constructible; holds publishers keyed by node_name_t and
   // subscribers keyed by endpoint_t.
183  {
184  InfoPerTopic() = default;
186  const std::string& name, const std::string& topic_type_name)
187  : topicName(name), topicTypeName(topic_type_name)
188  {
189  }
190  std::string topicName, topicTypeName;
191 
192  std::map<node_name_t, InfoPerPublisher> publishers;
193  std::map<endpoint_t, InfoPerSubscriber> subscribers;
194  };
195  using topic_name_t = std::string;
196  std::map<topic_name_t, InfoPerTopic> knownTopics_;
197 
   // NOTE(review): listing lines 198 and 201 are missing. Per the symbol index
   // this is InfoPerService, whose five-argument constructor is at
   // Server.h:201. Default-constructible; the constructor copies each argument
   // into the member of matching meaning.
199  {
200  InfoPerService() = default;
202  const std::string& name, const std::string& in_type_name,
203  const std::string& out_type_name, const std::string& end_point,
204  const std::string& node_name)
205  : serviceName(name),
206  inputTypeName(in_type_name),
207  outputTypeName(out_type_name),
208  endpoint(end_point),
209  nodeName(node_name)
210  {
211  }
212  std::string serviceName, inputTypeName, outputTypeName;
213  std::string endpoint, nodeName;
214  };
215  using service_name_t = std::string;
216  std::map<service_name_t, InfoPerService> knownServices_;
217 
   // NOTE(review): listing lines 218-220 are missing. The symbol index places
   // `unsigned int serverPortNo_` at Server.h:220 and also references
   // MVSIM_PORTNO_MAIN_REP (ports.h:17), presumably its default value --
   // confirm against the real header.
221 };
224 } // namespace mvsim
const std::string nodeName
Definition: Server.h:140
void db_advertise_topic(const std::string &topicName, const std::string &topicTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:199
std::set< std::string > subscribedTopics
Definition: Server.h:146
const std::string topicName
Definition: Server.h:164
std::map< node_name_t, InfoPerNode > connectedNodes_
Definition: Server.h:149
InfoPerNode(const std::string &name)
Definition: Server.h:138
InfoPerService(const std::string &name, const std::string &in_type_name, const std::string &out_type_name, const std::string &end_point, const std::string &node_name)
Definition: Server.h:201
void db_advertise_service(const std::string &serviceName, const std::string &inputTypeName, const std::string &outputTypeName, const std::string &publisherEndpoint, const std::string &nodeName)
Definition: Server.cpp:310
const endpoint_t subscriberUpdatesEndpoint
Definition: Server.h:179
const mrpt::Clock::time_point timeConnected
Definition: Server.h:143
std::string service_name_t
Definition: Server.h:215
std::map< service_name_t, InfoPerService > knownServices_
Definition: Server.h:216
bool db_get_service_info(const std::string &serviceName, std::string &publisherEndpoint, std::string &nodeName) const
Definition: Server.cpp:339
void internalServerThread()
Definition: Server.cpp:77
std::string node_name_t
Definition: Server.h:148
void db_remove_node(const std::string &nodeName)
Definition: Server.cpp:173
std::shared_mutex dbMutex
Definition: Server.h:103
unsigned int listenningPort() const
Definition: Server.h:76
InfoPerTopic(const std::string &name, const std::string &topic_type_name)
Definition: Server.h:185
std::string topic_name_t
Definition: Server.h:195
void listenningPort(unsigned int port)
Definition: Server.h:77
InfoPerSubscriber(const std::string &topic_name, const std::string &sub_updates_endpoint)
Definition: Server.h:171
std::thread mainThread_
Definition: Server.h:80
std::map< node_name_t, InfoPerPublisher > publishers
Definition: Server.h:192
const std::string publisherNodeName
Definition: Server.h:165
std::atomic< zmq::context_t * > mainThreadZMQcontext_
Definition: Server.h:81
std::map< topic_name_t, InfoPerTopic > knownTopics_
Definition: Server.h:196
void requestMainThreadTermination()
Definition: Server.cpp:157
void start()
Definition: Server.cpp:41
std::set< std::string > advertisedTopics
Definition: Server.h:145
void shutdown() noexcept
Definition: Server.cpp:59
unsigned int serverPortNo_
Definition: Server.h:220
void db_add_topic_subscriber(const std::string &topicName, const std::string &updatesEndPoint)
Definition: Server.cpp:228
constexpr unsigned int MVSIM_PORTNO_MAIN_REP
Definition: ports.h:17
const endpoint_t publisherEndpoint
Definition: Server.h:166
std::string endpoint_t
Definition: Server.h:151
void db_register_node(const std::string &nodeName)
Definition: Server.cpp:192
void send_topic_publishers_to_subscribed_clients(const std::string &topicName, const std::optional< std::string > &updatesEndPoint=std::nullopt)
Definition: Server.cpp:243
std::string topicTypeName
Definition: Server.h:190
std::map< endpoint_t, InfoPerSubscriber > subscribers
Definition: Server.h:193
const std::string topicName
Definition: Server.h:178
InfoPerPublisher(const std::string &topic_name, const std::string &publisher_node_name, const std::string &publisher_endpoint)
Definition: Server.h:155


mvsim
Author(s):
autogenerated on Tue Jul 4 2023 03:08:21