websocket_server.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <algorithm>
4 #include <chrono>
5 #include <cstdint>
6 #include <functional>
7 #include <map>
8 #include <memory>
9 #include <optional>
10 #include <shared_mutex>
11 #include <string_view>
12 #include <thread>
13 #include <unordered_map>
14 #include <unordered_set>
15 #include <vector>
16 
17 #include <nlohmann/json.hpp>
18 #include <websocketpp/config/asio.hpp>
19 #include <websocketpp/server.hpp>
20 
21 #include "callback_queue.hpp"
22 #include "common.hpp"
23 #include "parameter.hpp"
24 #include "regex_utils.hpp"
25 #include "serialization.hpp"
26 #include "server_interface.hpp"
27 #include "websocket_logging.hpp"
28 
29 // Debounce a function call (tied to the line number)
30 // This macro takes in a function and the debounce time in milliseconds
31 #define FOXGLOVE_DEBOUNCE(f, ms) \
32  { \
33  static auto last_call = std::chrono::system_clock::now(); \
34  const auto now = std::chrono::system_clock::now(); \
35  if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_call).count() > ms) { \
36  last_call = now; \
37  f(); \
38  } \
39  }
40 
41 namespace foxglove {
42 
44 
45 using ConnHandle = websocketpp::connection_hdl;
46 using OpCode = websocketpp::frame::opcode::value;
47 
48 static const websocketpp::log::level APP = websocketpp::log::alevel::app;
49 static const websocketpp::log::level WARNING = websocketpp::log::elevel::warn;
50 static const websocketpp::log::level RECOVERABLE = websocketpp::log::elevel::rerror;
51 
52 constexpr uint32_t Integer(const std::string_view str) {
53  uint32_t result = 0x811C9DC5; // FNV-1a 32-bit algorithm
54  for (char c : str) {
55  result = (static_cast<uint32_t>(c) ^ result) * 0x01000193;
56  }
57  return result;
58 }
59 
61 const std::unordered_map<std::string, std::string> CAPABILITY_BY_CLIENT_OPERATION = {
62  // {"subscribe", }, // No required capability.
63  // {"unsubscribe", }, // No required capability.
64  {"advertise", CAPABILITY_CLIENT_PUBLISH},
65  {"unadvertise", CAPABILITY_CLIENT_PUBLISH},
66  {"getParameters", CAPABILITY_PARAMETERS},
67  {"setParameters", CAPABILITY_PARAMETERS},
68  {"subscribeParameterUpdates", CAPABILITY_PARAMETERS_SUBSCRIBE},
69  {"unsubscribeParameterUpdates", CAPABILITY_PARAMETERS_SUBSCRIBE},
70  {"subscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH},
71  {"unsubscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH},
72 };
73 
75 const std::unordered_map<ClientBinaryOpcode, std::string> CAPABILITY_BY_CLIENT_BINARY_OPERATION = {
78 };
79 
80 enum class StatusLevel : uint8_t {
81  Info = 0,
82  Warning = 1,
83  Error = 2,
84 };
85 
86 constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level) {
87  switch (level) {
88  case StatusLevel::Info:
89  return APP;
91  return WARNING;
92  case StatusLevel::Error:
93  return RECOVERABLE;
94  default:
95  return RECOVERABLE;
96  }
97 }
98 
99 template <typename ServerConfiguration>
100 class Server final : public ServerInterface<ConnHandle> {
101 public:
102  using ServerType = websocketpp::server<ServerConfiguration>;
103  using ConnectionType = websocketpp::connection<ServerConfiguration>;
104  using MessagePtr = typename ServerType::message_ptr;
105  using Tcp = websocketpp::lib::asio::ip::tcp;
106 
107  explicit Server(std::string name, LogCallback logger, const ServerOptions& options);
108  virtual ~Server();
109 
110  Server(const Server&) = delete;
111  Server(Server&&) = delete;
112  Server& operator=(const Server&) = delete;
113  Server& operator=(Server&&) = delete;
114 
115  void start(const std::string& host, uint16_t port) override;
116  void stop() override;
117 
118  std::vector<ChannelId> addChannels(const std::vector<ChannelWithoutId>& channels) override;
119  void removeChannels(const std::vector<ChannelId>& channelIds) override;
120  void publishParameterValues(ConnHandle clientHandle, const std::vector<Parameter>& parameters,
121  const std::optional<std::string>& requestId = std::nullopt) override;
122  void updateParameterValues(const std::vector<Parameter>& parameters) override;
123  std::vector<ServiceId> addServices(const std::vector<ServiceWithoutId>& services) override;
124  void removeServices(const std::vector<ServiceId>& serviceIds) override;
125 
126  void setHandlers(ServerHandlers<ConnHandle>&& handlers) override;
127 
128  void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
129  const uint8_t* payload, size_t payloadSize) override;
130  void broadcastTime(uint64_t timestamp) override;
131  void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse& response) override;
132  void updateConnectionGraph(const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics,
133  const MapOfSets& advertisedServices) override;
134 
135  uint16_t getPort() override;
136  std::string remoteEndpointString(ConnHandle clientHandle) override;
137 
138 private:
139  struct ClientInfo {
140  std::string name;
142  std::unordered_map<ChannelId, SubscriptionId> subscriptionsByChannel;
143  std::unordered_set<ClientChannelId> advertisedChannels;
144  bool subscribedToConnectionGraph = false;
145 
146  explicit ClientInfo(const std::string& name, ConnHandle handle)
147  : name(name)
148  , handle(handle) {}
149 
150  ClientInfo(const ClientInfo&) = delete;
151  ClientInfo& operator=(const ClientInfo&) = delete;
152 
153  ClientInfo(ClientInfo&&) = default;
154  ClientInfo& operator=(ClientInfo&&) = default;
155  };
156 
157  std::string _name;
161  std::unique_ptr<std::thread> _serverThread;
162  std::unique_ptr<CallbackQueue> _handlerCallbackQueue;
163 
164  uint32_t _nextChannelId = 0;
165  std::map<ConnHandle, ClientInfo, std::owner_less<>> _clients;
166  std::unordered_map<ChannelId, Channel> _channels;
167  std::map<ConnHandle, std::unordered_map<ClientChannelId, ClientAdvertisement>, std::owner_less<>>
169  std::map<ConnHandle, std::unordered_set<std::string>, std::owner_less<>>
171  ServiceId _nextServiceId = 0;
172  std::unordered_map<ServiceId, ServiceWithoutId> _services;
174  std::shared_mutex _clientsMutex;
175  std::shared_mutex _channelsMutex;
176  std::shared_mutex _clientChannelsMutex;
177  std::shared_mutex _servicesMutex;
179 
180  struct {
181  int subscriptionCount = 0;
185  } _connectionGraph;
186  std::shared_mutex _connectionGraphMutex;
187 
188  void setupTlsHandler();
189  void socketInit(ConnHandle hdl);
190  bool validateConnection(ConnHandle hdl);
191  void handleConnectionOpened(ConnHandle hdl);
192  void handleConnectionClosed(ConnHandle hdl);
193  void handleMessage(ConnHandle hdl, MessagePtr msg);
194  void handleTextMessage(ConnHandle hdl, MessagePtr msg);
195  void handleBinaryMessage(ConnHandle hdl, MessagePtr msg);
196 
197  void sendJson(ConnHandle hdl, json&& payload);
198  void sendJsonRaw(ConnHandle hdl, const std::string& payload);
199  void sendBinary(ConnHandle hdl, const uint8_t* payload, size_t payloadSize);
200  void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level,
201  const std::string& message);
202  void unsubscribeParamsWithoutSubscriptions(ConnHandle hdl,
203  const std::unordered_set<std::string>& paramNames);
204  bool isParameterSubscribed(const std::string& paramName) const;
205  bool hasCapability(const std::string& capability) const;
206 };
207 
208 template <typename ServerConfiguration>
209 inline Server<ServerConfiguration>::Server(std::string name, LogCallback logger,
210  const ServerOptions& options)
211  : _name(std::move(name))
212  , _logger(logger)
213  , _options(options) {
214  // Redirect logging
215  _server.get_alog().set_callback(_logger);
216  _server.get_elog().set_callback(_logger);
217 
218  std::error_code ec;
219  _server.init_asio(ec);
220  if (ec) {
221  throw std::runtime_error("Failed to initialize websocket server: " + ec.message());
222  }
223 
224  _server.clear_access_channels(websocketpp::log::alevel::all);
225  _server.set_access_channels(APP);
226  _server.set_tcp_pre_init_handler(std::bind(&Server::socketInit, this, std::placeholders::_1));
227  this->setupTlsHandler();
228  _server.set_validate_handler(std::bind(&Server::validateConnection, this, std::placeholders::_1));
229  _server.set_open_handler(std::bind(&Server::handleConnectionOpened, this, std::placeholders::_1));
230  _server.set_close_handler(
231  std::bind(&Server::handleConnectionClosed, this, std::placeholders::_1));
232  _server.set_message_handler(
233  std::bind(&Server::handleMessage, this, std::placeholders::_1, std::placeholders::_2));
234  _server.set_reuse_addr(true);
235  _server.set_listen_backlog(128);
236 
237  // Callback queue for handling client requests.
238  _handlerCallbackQueue = std::make_unique<CallbackQueue>(_logger, /*numThreads=*/1ul);
239 }
240 
241 template <typename ServerConfiguration>
243 
244 template <typename ServerConfiguration>
246  std::error_code ec;
247  _server.get_con_from_hdl(hdl)->get_raw_socket().set_option(Tcp::no_delay(true), ec);
248  if (ec) {
249  _server.get_elog().write(RECOVERABLE, "Failed to set TCP_NODELAY: " + ec.message());
250  }
251 }
252 
253 template <typename ServerConfiguration>
255  auto con = _server.get_con_from_hdl(hdl);
256 
257  const auto& subprotocols = con->get_requested_subprotocols();
258  if (std::find(subprotocols.begin(), subprotocols.end(), SUPPORTED_SUBPROTOCOL) !=
259  subprotocols.end()) {
260  con->select_subprotocol(SUPPORTED_SUBPROTOCOL);
261  return true;
262  }
263  _server.get_alog().write(APP, "Rejecting client " + remoteEndpointString(hdl) +
264  " which did not declare support for subprotocol " +
266  return false;
267 }
268 
269 template <typename ServerConfiguration>
271  auto con = _server.get_con_from_hdl(hdl);
272  const auto endpoint = remoteEndpointString(hdl);
273  _server.get_alog().write(APP, "Client " + endpoint + " connected via " + con->get_resource());
274 
275  {
276  std::unique_lock<std::shared_mutex> lock(_clientsMutex);
277  _clients.emplace(hdl, ClientInfo(endpoint, hdl));
278  }
279 
280  con->send(json({
281  {"op", "serverInfo"},
282  {"name", _name},
283  {"capabilities", _options.capabilities},
284  {"supportedEncodings", _options.supportedEncodings},
285  {"metadata", _options.metadata},
286  {"sessionId", _options.sessionId},
287  })
288  .dump());
289 
290  std::vector<Channel> channels;
291  {
292  std::shared_lock<std::shared_mutex> lock(_channelsMutex);
293  for (const auto& [id, channel] : _channels) {
294  (void)id;
295  channels.push_back(channel);
296  }
297  }
298  sendJson(hdl, {
299  {"op", "advertise"},
300  {"channels", std::move(channels)},
301  });
302 
303  std::vector<Service> services;
304  {
305  std::shared_lock<std::shared_mutex> lock(_servicesMutex);
306  for (const auto& [id, service] : _services) {
307  services.push_back(Service(service, id));
308  }
309  }
310  sendJson(hdl, {
311  {"op", "advertiseServices"},
312  {"services", std::move(services)},
313  });
314 }
315 
316 template <typename ServerConfiguration>
318  std::unordered_map<ChannelId, SubscriptionId> oldSubscriptionsByChannel;
319  std::unordered_set<ClientChannelId> oldAdvertisedChannels;
320  std::string clientName;
321  bool wasSubscribedToConnectionGraph;
322  {
323  std::unique_lock<std::shared_mutex> lock(_clientsMutex);
324  const auto clientIt = _clients.find(hdl);
325  if (clientIt == _clients.end()) {
326  _server.get_elog().write(RECOVERABLE, "Client " + remoteEndpointString(hdl) +
327  " disconnected but not found in _clients");
328  return;
329  }
330 
331  const auto& client = clientIt->second;
332  clientName = client.name;
333  _server.get_alog().write(APP, "Client " + clientName + " disconnected");
334 
335  oldSubscriptionsByChannel = std::move(client.subscriptionsByChannel);
336  oldAdvertisedChannels = std::move(client.advertisedChannels);
337  wasSubscribedToConnectionGraph = client.subscribedToConnectionGraph;
338  _clients.erase(clientIt);
339  }
340 
341  // Unadvertise all channels this client advertised
342  for (const auto clientChannelId : oldAdvertisedChannels) {
343  _server.get_alog().write(APP, "Client " + clientName + " unadvertising channel " +
344  std::to_string(clientChannelId) + " due to disconnect");
346  _handlers.clientUnadvertiseHandler(clientChannelId, hdl);
347  }
348  }
349 
350  {
351  std::unique_lock<std::shared_mutex> lock(_clientChannelsMutex);
352  _clientChannels.erase(hdl);
353  }
354 
355  // Unsubscribe all channels this client subscribed to
357  for (const auto& [chanId, subs] : oldSubscriptionsByChannel) {
358  (void)subs;
359  _handlers.unsubscribeHandler(chanId, hdl);
360  }
361  }
362 
363  // Unsubscribe from parameters this client subscribed to
364  std::unordered_set<std::string> clientSubscribedParameters;
365  {
366  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
367  clientSubscribedParameters = _clientParamSubscriptions[hdl];
368  _clientParamSubscriptions.erase(hdl);
369  }
370  unsubscribeParamsWithoutSubscriptions(hdl, clientSubscribedParameters);
371 
372  if (wasSubscribedToConnectionGraph) {
373  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
374  _connectionGraph.subscriptionCount--;
375  if (_connectionGraph.subscriptionCount == 0 && _handlers.subscribeConnectionGraphHandler) {
376  _server.get_alog().write(APP, "Unsubscribing from connection graph updates.");
378  }
379  }
380 
381 } // namespace foxglove
382 
383 template <typename ServerConfiguration>
385  _handlers = handlers;
386 }
387 
388 template <typename ServerConfiguration>
390  if (_server.stopped()) {
391  return;
392  }
393 
394  _server.get_alog().write(APP, "Stopping WebSocket server");
395  std::error_code ec;
396 
397  _server.stop_perpetual();
398 
399  if (_server.is_listening()) {
400  _server.stop_listening(ec);
401  if (ec) {
402  _server.get_elog().write(RECOVERABLE, "Failed to stop listening: " + ec.message());
403  }
404  }
405 
406  std::vector<std::shared_ptr<ConnectionType>> connections;
407  {
408  std::shared_lock<std::shared_mutex> lock(_clientsMutex);
409  connections.reserve(_clients.size());
410  for (const auto& [hdl, client] : _clients) {
411  (void)client;
412  if (auto connection = _server.get_con_from_hdl(hdl, ec)) {
413  connections.push_back(connection);
414  }
415  }
416  }
417 
418  if (!connections.empty()) {
419  _server.get_alog().write(
420  APP, "Closing " + std::to_string(connections.size()) + " client connection(s)");
421 
422  // Iterate over all client connections and start the close connection handshake
423  for (const auto& connection : connections) {
424  connection->close(websocketpp::close::status::going_away, "server shutdown", ec);
425  if (ec) {
426  _server.get_elog().write(RECOVERABLE, "Failed to close connection: " + ec.message());
427  }
428  }
429 
430  // Wait for all connections to close
431  constexpr size_t MAX_SHUTDOWN_MS = 1000;
432  constexpr size_t SLEEP_MS = 10;
433  size_t durationMs = 0;
434  while (!_server.stopped() && durationMs < MAX_SHUTDOWN_MS) {
435  std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS));
436  _server.poll_one();
437  durationMs += SLEEP_MS;
438  }
439 
440  if (!_server.stopped()) {
441  _server.get_elog().write(RECOVERABLE, "Failed to close all connections, forcefully stopping");
442  for (const auto& hdl : connections) {
443  if (auto con = _server.get_con_from_hdl(hdl, ec)) {
444  _server.get_elog().write(RECOVERABLE,
445  "Terminating connection to " + remoteEndpointString(hdl));
446  con->terminate(ec);
447  }
448  }
449  _server.stop();
450  }
451  }
452 
453  _server.get_alog().write(APP, "All WebSocket connections closed");
454 
455  if (_serverThread) {
456  _server.get_alog().write(APP, "Waiting for WebSocket server run loop to terminate");
457  _serverThread->join();
458  _serverThread.reset();
459  _server.get_alog().write(APP, "WebSocket server run loop terminated");
460  }
461 
462  std::unique_lock<std::shared_mutex> lock(_clientsMutex);
463  _clients.clear();
464 }
465 
466 template <typename ServerConfiguration>
467 inline void Server<ServerConfiguration>::start(const std::string& host, uint16_t port) {
468  if (_serverThread) {
469  throw std::runtime_error("Server already started");
470  }
471 
472  std::error_code ec;
473 
474  _server.listen(host, std::to_string(port), ec);
475  if (ec) {
476  throw std::runtime_error("Failed to listen on port " + std::to_string(port) + ": " +
477  ec.message());
478  }
479 
480  _server.start_accept(ec);
481  if (ec) {
482  throw std::runtime_error("Failed to start accepting connections: " + ec.message());
483  }
484 
485  _serverThread = std::make_unique<std::thread>([this]() {
486  _server.get_alog().write(APP, "WebSocket server run loop started");
487  _server.run();
488  _server.get_alog().write(APP, "WebSocket server run loop stopped");
489  });
490 
491  if (!_server.is_listening()) {
492  throw std::runtime_error("WebSocket server failed to listen on port " + std::to_string(port));
493  }
494 
495  auto endpoint = _server.get_local_endpoint(ec);
496  if (ec) {
497  throw std::runtime_error("Failed to resolve the local endpoint: " + ec.message());
498  }
499 
500  const std::string protocol = _options.useTls ? "wss" : "ws";
501  auto address = endpoint.address();
502  _server.get_alog().write(APP, "WebSocket server listening at " + protocol + "://" +
503  IPAddressToString(address) + ":" +
504  std::to_string(endpoint.port()));
505 }
506 
507 template <typename ServerConfiguration>
509  try {
510  _server.send(hdl, std::move(payload).dump(), OpCode::TEXT);
511  } catch (std::exception const& e) {
512  _server.get_elog().write(RECOVERABLE, e.what());
513  }
514 }
515 
516 template <typename ServerConfiguration>
517 inline void Server<ServerConfiguration>::sendJsonRaw(ConnHandle hdl, const std::string& payload) {
518  try {
519  _server.send(hdl, payload, OpCode::TEXT);
520  } catch (std::exception const& e) {
521  _server.get_elog().write(RECOVERABLE, e.what());
522  }
523 }
524 
525 template <typename ServerConfiguration>
526 inline void Server<ServerConfiguration>::sendBinary(ConnHandle hdl, const uint8_t* payload,
527  size_t payloadSize) {
528  try {
529  _server.send(hdl, payload, payloadSize, OpCode::BINARY);
530  } catch (std::exception const& e) {
531  _server.get_elog().write(RECOVERABLE, e.what());
532  }
533 }
534 
535 template <typename ServerConfiguration>
537  const StatusLevel level,
538  const std::string& message) {
539  const std::string endpoint = remoteEndpointString(clientHandle);
540  const std::string logMessage = endpoint + ": " + message;
541  const auto logLevel = StatusLevelToLogLevel(level);
542  auto logger = level == StatusLevel::Info ? _server.get_alog() : _server.get_elog();
543  logger.write(logLevel, logMessage);
544 
545  sendJson(clientHandle, json{
546  {"op", "status"},
547  {"level", static_cast<uint8_t>(level)},
548  {"message", message},
549  });
550 }
551 
552 template <typename ServerConfiguration>
554  const OpCode op = msg->get_opcode();
555 
556  try {
557  switch (op) {
558  case OpCode::TEXT: {
559  _handlerCallbackQueue->addCallback([this, hdl, msg]() {
560  try {
561  handleTextMessage(hdl, msg);
562  } catch (const std::exception& e) {
563  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
564  } catch (...) {
566  "Exception occurred when executing text message handler");
567  }
568  });
569  } break;
570  case OpCode::BINARY: {
571  _handlerCallbackQueue->addCallback([this, hdl, msg]() {
572  try {
573  handleBinaryMessage(hdl, msg);
574  } catch (const std::exception& e) {
575  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
576  } catch (...) {
578  "Exception occurred when executing binary message handler");
579  }
580  });
581  } break;
582  default:
583  break;
584  }
585  } catch (std::exception const& ex) {
587  std::string{"Error parsing message: "} + ex.what());
588  }
589 }
590 
591 template <typename ServerConfiguration>
593  const json payload = json::parse(msg->get_payload());
594  const std::string& op = payload.at("op").get<std::string>();
595 
596  const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_OPERATION.find(op);
597  if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_OPERATION.end() &&
598  !hasCapability(requiredCapabilityIt->second)) {
600  "Operation '" + op + "' not supported as server capability '" +
601  requiredCapabilityIt->second + "' is missing");
602  return;
603  }
604 
605  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
606  auto& clientInfo = _clients.at(hdl);
607 
608  const auto findSubscriptionBySubId = [&clientInfo](SubscriptionId subId) {
609  return std::find_if(clientInfo.subscriptionsByChannel.begin(),
610  clientInfo.subscriptionsByChannel.end(), [&subId](const auto& mo) {
611  return mo.second == subId;
612  });
613  };
614 
615  constexpr auto SUBSCRIBE = Integer("subscribe");
616  constexpr auto UNSUBSCRIBE = Integer("unsubscribe");
617  constexpr auto ADVERTISE = Integer("advertise");
618  constexpr auto UNADVERTISE = Integer("unadvertise");
619  constexpr auto GET_PARAMETERS = Integer("getParameters");
620  constexpr auto SET_PARAMETERS = Integer("setParameters");
621  constexpr auto SUBSCRIBE_PARAMETER_UPDATES = Integer("subscribeParameterUpdates");
622  constexpr auto UNSUBSCRIBE_PARAMETER_UPDATES = Integer("unsubscribeParameterUpdates");
623  constexpr auto SUBSCRIBE_CONNECTION_GRAPH = Integer("subscribeConnectionGraph");
624  constexpr auto UNSUBSCRIBE_CONNECTION_GRAPH = Integer("unsubscribeConnectionGraph");
625 
626  switch (Integer(op)) {
627  case SUBSCRIBE: {
629  return;
630  }
631  for (const auto& sub : payload.at("subscriptions")) {
632  SubscriptionId subId = sub.at("id");
633  ChannelId channelId = sub.at("channelId");
634  if (findSubscriptionBySubId(subId) != clientInfo.subscriptionsByChannel.end()) {
636  "Client subscription id " + std::to_string(subId) +
637  " was already used; ignoring subscription");
638  continue;
639  }
640  const auto& channelIt = _channels.find(channelId);
641  if (channelIt == _channels.end()) {
644  "Channel " + std::to_string(channelId) + " is not available; ignoring subscription");
645  continue;
646  }
647 
648  try {
649  _handlers.subscribeHandler(channelId, hdl);
650  clientInfo.subscriptionsByChannel.emplace(channelId, subId);
651  } catch (const ChannelError& e) {
652  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
653  } catch (...) {
654  sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
655  }
656  }
657  } break;
658  case UNSUBSCRIBE: {
660  return;
661  }
662  for (const auto& subIdJson : payload.at("subscriptionIds")) {
663  SubscriptionId subId = subIdJson;
664  const auto& sub = findSubscriptionBySubId(subId);
665  if (sub == clientInfo.subscriptionsByChannel.end()) {
667  "Client subscription id " + std::to_string(subId) +
668  " did not exist; ignoring unsubscription");
669  continue;
670  }
671 
672  ChannelId chanId = sub->first;
673  try {
674  _handlers.unsubscribeHandler(chanId, hdl);
675  clientInfo.subscriptionsByChannel.erase(sub);
676  } catch (const ChannelError& e) {
677  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
678  } catch (...) {
679  sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
680  }
681  }
682  } break;
683  case ADVERTISE: {
685  return;
686  }
687  std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
688  auto [clientPublicationsIt, isFirstPublication] =
689  _clientChannels.emplace(hdl, std::unordered_map<ClientChannelId, ClientAdvertisement>());
690 
691  auto& clientPublications = clientPublicationsIt->second;
692 
693  for (const auto& chan : payload.at("channels")) {
694  ClientChannelId channelId = chan.at("id");
695  if (!isFirstPublication && clientPublications.find(channelId) != clientPublications.end()) {
697  "Channel " + std::to_string(channelId) + " was already advertised");
698  continue;
699  }
700 
701  const auto topic = chan.at("topic").get<std::string>();
704  "Can't advertise channel " + std::to_string(channelId) + ", topic '" +
705  topic + "' not whitelisted");
706  continue;
707  }
708  ClientAdvertisement advertisement{};
709  advertisement.channelId = channelId;
710  advertisement.topic = topic;
711  advertisement.encoding = chan.at("encoding").get<std::string>();
712  advertisement.schemaName = chan.at("schemaName").get<std::string>();
713 
714  try {
715  _handlers.clientAdvertiseHandler(advertisement, hdl);
716  clientPublications.emplace(channelId, advertisement);
717  clientInfo.advertisedChannels.emplace(channelId);
718  } catch (const ClientChannelError& e) {
719  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
720  } catch (...) {
721  sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
722  }
723  }
724  } break;
725  case UNADVERTISE: {
727  return;
728  }
729  std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
730  auto clientPublicationsIt = _clientChannels.find(hdl);
731  if (clientPublicationsIt == _clientChannels.end()) {
732  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels");
733  break;
734  }
735 
736  auto& clientPublications = clientPublicationsIt->second;
737 
738  for (const auto& chanIdJson : payload.at("channelIds")) {
739  ClientChannelId channelId = chanIdJson.get<ClientChannelId>();
740  const auto& channelIt = clientPublications.find(channelId);
741  if (channelIt == clientPublications.end()) {
742  continue;
743  }
744 
745  try {
746  _handlers.clientUnadvertiseHandler(channelId, hdl);
747  clientPublications.erase(channelIt);
748  const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId);
749  if (advertisedChannelIt != clientInfo.advertisedChannels.end()) {
750  clientInfo.advertisedChannels.erase(advertisedChannelIt);
751  }
752  } catch (const ClientChannelError& e) {
753  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
754  } catch (...) {
755  sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
756  }
757  }
758  } break;
759  case GET_PARAMETERS: {
761  return;
762  }
763 
764  const auto paramNames = payload.at("parameterNames").get<std::vector<std::string>>();
765  const auto requestId = payload.find("id") == payload.end()
766  ? std::nullopt
767  : std::optional<std::string>(payload["id"].get<std::string>());
768 
769  try {
770  _handlers.parameterRequestHandler(paramNames, requestId, hdl);
771  } catch (const std::exception& e) {
772  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
773  } catch (...) {
774  sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
775  }
776  } break;
777  case SET_PARAMETERS: {
779  return;
780  }
781 
782  const auto parameters = payload.at("parameters").get<std::vector<Parameter>>();
783  const auto requestId = payload.find("id") == payload.end()
784  ? std::nullopt
785  : std::optional<std::string>(payload["id"].get<std::string>());
786  try {
787  _handlers.parameterChangeHandler(parameters, requestId, hdl);
788  } catch (const std::exception& e) {
789  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
790  } catch (...) {
791  sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
792  }
793  } break;
794  case SUBSCRIBE_PARAMETER_UPDATES: {
796  return;
797  }
798 
799  const auto paramNames = payload.at("parameterNames").get<std::unordered_set<std::string>>();
800  std::vector<std::string> paramsToSubscribe;
801  {
802  // Only consider parameters that are not subscribed yet (by this or by other clients)
803  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
804  std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToSubscribe),
805  [this](const std::string& paramName) {
806  return !isParameterSubscribed(paramName);
807  });
808 
809  // Update the client's parameter subscriptions.
810  auto& clientSubscribedParams = _clientParamSubscriptions[hdl];
811  clientSubscribedParams.insert(paramNames.begin(), paramNames.end());
812  }
813 
814  if (paramsToSubscribe.empty()) {
815  return;
816  }
817 
818  try {
819  _handlers.parameterSubscriptionHandler(paramsToSubscribe,
821  } catch (const std::exception& e) {
822  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
823  } catch (...) {
824  sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
825  }
826  } break;
827  case UNSUBSCRIBE_PARAMETER_UPDATES: {
829  return;
830  }
831 
832  const auto paramNames = payload.at("parameterNames").get<std::unordered_set<std::string>>();
833  {
834  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
835  auto& clientSubscribedParams = _clientParamSubscriptions[hdl];
836  for (const auto& paramName : paramNames) {
837  clientSubscribedParams.erase(paramName);
838  }
839  }
840 
841  unsubscribeParamsWithoutSubscriptions(hdl, paramNames);
842  } break;
843  case SUBSCRIBE_CONNECTION_GRAPH: {
845  return;
846  }
847 
848  bool subscribeToConnnectionGraph = false;
849  {
850  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
851  _connectionGraph.subscriptionCount++;
852  subscribeToConnnectionGraph = _connectionGraph.subscriptionCount == 1;
853  }
854 
855  if (subscribeToConnnectionGraph) {
856  // First subscriber, let the handler know that we are interested in updates.
857  _server.get_alog().write(APP, "Subscribing to connection graph updates.");
859  clientInfo.subscribedToConnectionGraph = true;
860  }
861 
862  json::array_t publishedTopicsJson, subscribedTopicsJson, advertisedServicesJson;
863  {
864  std::shared_lock<std::shared_mutex> lock(_connectionGraphMutex);
865  for (const auto& [name, ids] : _connectionGraph.publishedTopics) {
866  publishedTopicsJson.push_back(nlohmann::json{{"name", name}, {"publisherIds", ids}});
867  }
868  for (const auto& [name, ids] : _connectionGraph.subscribedTopics) {
869  subscribedTopicsJson.push_back(nlohmann::json{{"name", name}, {"subscriberIds", ids}});
870  }
871  for (const auto& [name, ids] : _connectionGraph.advertisedServices) {
872  advertisedServicesJson.push_back(nlohmann::json{{"name", name}, {"providerIds", ids}});
873  }
874  }
875 
876  const json jsonMsg = {
877  {"op", "connectionGraphUpdate"},
878  {"publishedTopics", publishedTopicsJson},
879  {"subscribedTopics", subscribedTopicsJson},
880  {"advertisedServices", advertisedServicesJson},
881  {"removedTopics", json::array()},
882  {"removedServices", json::array()},
883  };
884 
885  sendJsonRaw(hdl, jsonMsg.dump());
886  } break;
887  case UNSUBSCRIBE_CONNECTION_GRAPH: {
889  return;
890  }
891 
892  if (clientInfo.subscribedToConnectionGraph) {
893  clientInfo.subscribedToConnectionGraph = false;
894  bool unsubscribeFromConnnectionGraph = false;
895  {
896  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
897  _connectionGraph.subscriptionCount--;
898  unsubscribeFromConnnectionGraph = _connectionGraph.subscriptionCount == 0;
899  }
900  if (unsubscribeFromConnnectionGraph) {
901  _server.get_alog().write(APP, "Unsubscribing from connection graph updates.");
903  }
904  } else {
906  "Client was not subscribed to connection graph updates");
907  }
908  } break;
909  default: {
910  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Unrecognized client opcode \"" + op + "\"");
911  } break;
912  }
913 }
914 
915 template <typename ServerConfiguration>
917  const auto& payload = msg->get_payload();
918  const uint8_t* data = reinterpret_cast<const uint8_t*>(payload.data());
919  const size_t length = payload.size();
920 
921  if (length < 1) {
922  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Received an empty binary message");
923  return;
924  }
925 
926  const auto op = static_cast<ClientBinaryOpcode>(data[0]);
927 
928  const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_BINARY_OPERATION.find(op);
929  if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_BINARY_OPERATION.end() &&
930  !hasCapability(requiredCapabilityIt->second)) {
932  "Binary operation '" + std::to_string(static_cast<int>(op)) +
933  "' not supported as server capability '" + requiredCapabilityIt->second +
934  "' is missing");
935  return;
936  }
937 
938  switch (op) {
941  return;
942  }
943 
944  if (length < 5) {
946  "Invalid message length " + std::to_string(length));
947  return;
948  }
949  const auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
950  std::chrono::high_resolution_clock::now().time_since_epoch())
951  .count();
952  const ClientChannelId channelId = *reinterpret_cast<const ClientChannelId*>(data + 1);
953  std::shared_lock<std::shared_mutex> lock(_clientChannelsMutex);
954 
955  auto clientPublicationsIt = _clientChannels.find(hdl);
956  if (clientPublicationsIt == _clientChannels.end()) {
957  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels");
958  return;
959  }
960 
961  auto& clientPublications = clientPublicationsIt->second;
962  const auto& channelIt = clientPublications.find(channelId);
963  if (channelIt == clientPublications.end()) {
965  "Channel " + std::to_string(channelId) + " is not advertised");
966  return;
967  }
968 
969  try {
970  const auto& advertisement = channelIt->second;
971  const uint32_t sequence = 0;
972  const ClientMessage clientMessage{static_cast<uint64_t>(timestamp),
973  static_cast<uint64_t>(timestamp),
974  sequence,
975  advertisement,
976  length,
977  data};
978  _handlers.clientMessageHandler(clientMessage, hdl);
979  } catch (const ServiceError& e) {
980  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
981  } catch (...) {
982  sendStatusAndLogMsg(hdl, StatusLevel::Error, "callService: Failed to execute handler");
983  }
984  } break;
986  ServiceRequest request;
987  if (length < request.size()) {
989  "Invalid service call request length " + std::to_string(length));
990  return;
991  }
992 
993  request.read(data + 1, length - 1);
994 
995  {
996  std::shared_lock<std::shared_mutex> lock(_servicesMutex);
997  if (_services.find(request.serviceId) == _services.end()) {
999  hdl, StatusLevel::Error,
1000  "Service " + std::to_string(request.serviceId) + " is not advertised");
1001  return;
1002  }
1003  }
1004 
1006  _handlers.serviceRequestHandler(request, hdl);
1007  }
1008  } break;
1009  default: {
1011  "Unrecognized client opcode " + std::to_string(uint8_t(op)));
1012  } break;
1013  }
1014 }
1015 
1016 template <typename ServerConfiguration>
1017 inline std::vector<ChannelId> Server<ServerConfiguration>::addChannels(
1018  const std::vector<ChannelWithoutId>& channels) {
1019  if (channels.empty()) {
1020  return {};
1021  }
1022 
1023  std::vector<ChannelId> channelIds;
1024  channelIds.reserve(channels.size());
1025  json::array_t channelsJson;
1026 
1027  {
1028  std::unique_lock<std::shared_mutex> lock(_channelsMutex);
1029  for (const auto& channelWithoutId : channels) {
1030  const auto newId = ++_nextChannelId;
1031  channelIds.push_back(newId);
1032  Channel newChannel{newId, channelWithoutId};
1033  channelsJson.push_back(newChannel);
1034  _channels.emplace(newId, std::move(newChannel));
1035  }
1036  }
1037 
1038  const auto msg = json{{"op", "advertise"}, {"channels", channelsJson}}.dump();
1039  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1040  for (const auto& [hdl, clientInfo] : _clients) {
1041  (void)clientInfo;
1042  sendJsonRaw(hdl, msg);
1043  }
1044 
1045  return channelIds;
1046 }
1047 
1048 template <typename ServerConfiguration>
1049 inline void Server<ServerConfiguration>::removeChannels(const std::vector<ChannelId>& channelIds) {
1050  if (channelIds.empty()) {
1051  return;
1052  }
1053 
1054  {
1055  std::unique_lock<std::shared_mutex> channelsLock(_channelsMutex);
1056  for (auto channelId : channelIds) {
1057  _channels.erase(channelId);
1058  }
1059  }
1060 
1061  const auto msg = json{{"op", "unadvertise"}, {"channelIds", channelIds}}.dump();
1062 
1063  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1064  for (auto& [hdl, clientInfo] : _clients) {
1065  for (auto channelId : channelIds) {
1066  if (const auto it = clientInfo.subscriptionsByChannel.find(channelId);
1067  it != clientInfo.subscriptionsByChannel.end()) {
1068  clientInfo.subscriptionsByChannel.erase(it);
1069  }
1070  }
1071  sendJsonRaw(hdl, msg);
1072  }
1073 }
1074 
1075 template <typename ServerConfiguration>
1077  ConnHandle hdl, const std::vector<Parameter>& parameters,
1078  const std::optional<std::string>& requestId) {
1079  // Filter out parameters which are not set.
1080  std::vector<Parameter> nonEmptyParameters;
1081  std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(nonEmptyParameters),
1082  [](const auto& p) {
1083  return p.getType() != ParameterType::PARAMETER_NOT_SET;
1084  });
1085 
1086  nlohmann::json jsonPayload{{"op", "parameterValues"}, {"parameters", nonEmptyParameters}};
1087  if (requestId) {
1088  jsonPayload["id"] = requestId.value();
1089  }
1090  sendJsonRaw(hdl, jsonPayload.dump());
1091 }
1092 
1093 template <typename ServerConfiguration>
1095  const std::vector<Parameter>& parameters) {
1096  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1097  for (const auto& clientParamSubscriptions : _clientParamSubscriptions) {
1098  std::vector<Parameter> paramsToSendToClient;
1099 
1100  // Only consider parameters that are subscribed by the client
1101  std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(paramsToSendToClient),
1102  [clientParamSubscriptions](const Parameter& param) {
1103  return clientParamSubscriptions.second.find(param.getName()) !=
1104  clientParamSubscriptions.second.end();
1105  });
1106 
1107  if (!paramsToSendToClient.empty()) {
1108  publishParameterValues(clientParamSubscriptions.first, paramsToSendToClient);
1109  }
1110  }
1111 }
1112 
1113 template <typename ServerConfiguration>
1114 inline std::vector<ServiceId> Server<ServerConfiguration>::addServices(
1115  const std::vector<ServiceWithoutId>& services) {
1116  if (services.empty()) {
1117  return {};
1118  }
1119 
1120  std::unique_lock<std::shared_mutex> lock(_servicesMutex);
1121  std::vector<ServiceId> serviceIds;
1122  json newServices;
1123  for (const auto& service : services) {
1124  const ServiceId serviceId = ++_nextServiceId;
1125  _services.emplace(serviceId, service);
1126  serviceIds.push_back(serviceId);
1127  newServices.push_back(Service(service, serviceId));
1128  }
1129 
1130  const auto msg = json{{"op", "advertiseServices"}, {"services", std::move(newServices)}}.dump();
1131  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1132  for (const auto& [hdl, clientInfo] : _clients) {
1133  (void)clientInfo;
1134  sendJsonRaw(hdl, msg);
1135  }
1136 
1137  return serviceIds;
1138 }
1139 
1140 template <typename ServerConfiguration>
1141 inline void Server<ServerConfiguration>::removeServices(const std::vector<ServiceId>& serviceIds) {
1142  std::unique_lock<std::shared_mutex> lock(_servicesMutex);
1143  std::vector<ServiceId> removedServices;
1144  for (const auto& serviceId : serviceIds) {
1145  if (const auto it = _services.find(serviceId); it != _services.end()) {
1146  _services.erase(it);
1147  removedServices.push_back(serviceId);
1148  }
1149  }
1150 
1151  if (!removedServices.empty()) {
1152  const auto msg =
1153  json{{"op", "unadvertiseServices"}, {"serviceIds", std::move(removedServices)}}.dump();
1154  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1155  for (const auto& [hdl, clientInfo] : _clients) {
1156  (void)clientInfo;
1157  sendJsonRaw(hdl, msg);
1158  }
1159  }
1160 }
1161 
1162 template <typename ServerConfiguration>
1164  uint64_t timestamp, const uint8_t* payload,
1165  size_t payloadSize) {
1166  std::error_code ec;
1167  const auto con = _server.get_con_from_hdl(clientHandle, ec);
1168  if (ec || !con) {
1169  return;
1170  }
1171 
1172  const auto bufferSizeinBytes = con->get_buffered_amount();
1173  if (bufferSizeinBytes + payloadSize >= _options.sendBufferLimitBytes) {
1174  const auto logFn = [this, clientHandle]() {
1175  sendStatusAndLogMsg(clientHandle, StatusLevel::Warning, "Send buffer limit reached");
1176  };
1177  FOXGLOVE_DEBOUNCE(logFn, 2500);
1178  return;
1179  }
1180 
1181  SubscriptionId subId = std::numeric_limits<SubscriptionId>::max();
1182 
1183  {
1184  std::shared_lock<std::shared_mutex> lock(_clientsMutex);
1185  const auto clientHandleAndInfoIt = _clients.find(clientHandle);
1186  if (clientHandleAndInfoIt == _clients.end()) {
1187  return; // Client got removed in the meantime.
1188  }
1189 
1190  const auto& client = clientHandleAndInfoIt->second;
1191  const auto& subs = client.subscriptionsByChannel.find(chanId);
1192  if (subs == client.subscriptionsByChannel.end()) {
1193  return; // Client not subscribed to this channel.
1194  }
1195  subId = subs->second;
1196  }
1197 
1198  std::array<uint8_t, 1 + 4 + 8> msgHeader;
1199  msgHeader[0] = uint8_t(BinaryOpcode::MESSAGE_DATA);
1200  foxglove::WriteUint32LE(msgHeader.data() + 1, subId);
1201  foxglove::WriteUint64LE(msgHeader.data() + 5, timestamp);
1202 
1203  const size_t messageSize = msgHeader.size() + payloadSize;
1204  auto message = con->get_message(OpCode::BINARY, messageSize);
1205  message->set_compressed(_options.useCompression);
1206 
1207  message->set_payload(msgHeader.data(), msgHeader.size());
1208  message->append_payload(payload, payloadSize);
1209  con->send(message);
1210 }
1211 
1212 template <typename ServerConfiguration>
1213 inline void Server<ServerConfiguration>::broadcastTime(uint64_t timestamp) {
1214  std::array<uint8_t, 1 + 8> message;
1215  message[0] = uint8_t(BinaryOpcode::TIME_DATA);
1216  foxglove::WriteUint64LE(message.data() + 1, timestamp);
1217 
1218  std::shared_lock<std::shared_mutex> lock(_clientsMutex);
1219  for (const auto& [hdl, clientInfo] : _clients) {
1220  (void)clientInfo;
1221  sendBinary(hdl, message.data(), message.size());
1222  }
1223 }
1224 
1225 template <typename ServerConfiguration>
1227  const ServiceResponse& response) {
1228  std::vector<uint8_t> payload(1 + response.size());
1229  payload[0] = uint8_t(BinaryOpcode::SERVICE_CALL_RESPONSE);
1230  response.write(payload.data() + 1);
1231  sendBinary(clientHandle, payload.data(), payload.size());
1232 }
1233 
1234 template <typename ServerConfiguration>
1236  std::error_code ec;
1237  auto endpoint = _server.get_local_endpoint(ec);
1238  if (ec) {
1239  throw std::runtime_error("Server not listening on any port. Has it been started before?");
1240  }
1241  return endpoint.port();
1242 }
1243 
1244 template <typename ServerConfiguration>
1247  const MapOfSets& advertisedServices) {
1248  json::array_t publisherDiff, subscriberDiff, servicesDiff;
1249  std::unordered_set<std::string> topicNames, serviceNames;
1250  std::unordered_set<std::string> knownTopicNames, knownServiceNames;
1251  {
1252  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
1253  for (const auto& [name, publisherIds] : publishedTopics) {
1254  const auto it = _connectionGraph.publishedTopics.find(name);
1255  if (it == _connectionGraph.publishedTopics.end() ||
1256  _connectionGraph.publishedTopics[name] != publisherIds) {
1257  publisherDiff.push_back(nlohmann::json{{"name", name}, {"publisherIds", publisherIds}});
1258  }
1259  topicNames.insert(name);
1260  }
1261  for (const auto& [name, subscriberIds] : subscribedTopics) {
1262  const auto it = _connectionGraph.subscribedTopics.find(name);
1263  if (it == _connectionGraph.subscribedTopics.end() ||
1264  _connectionGraph.subscribedTopics[name] != subscriberIds) {
1265  subscriberDiff.push_back(nlohmann::json{{"name", name}, {"subscriberIds", subscriberIds}});
1266  }
1267  topicNames.insert(name);
1268  }
1269  for (const auto& [name, providerIds] : advertisedServices) {
1270  const auto it = _connectionGraph.advertisedServices.find(name);
1271  if (it == _connectionGraph.advertisedServices.end() ||
1272  _connectionGraph.advertisedServices[name] != providerIds) {
1273  servicesDiff.push_back(nlohmann::json{{"name", name}, {"providerIds", providerIds}});
1274  }
1275  serviceNames.insert(name);
1276  }
1277 
1278  for (const auto& nameWithIds : _connectionGraph.publishedTopics) {
1279  knownTopicNames.insert(nameWithIds.first);
1280  }
1281  for (const auto& nameWithIds : _connectionGraph.subscribedTopics) {
1282  knownTopicNames.insert(nameWithIds.first);
1283  }
1284  for (const auto& nameWithIds : _connectionGraph.advertisedServices) {
1285  knownServiceNames.insert(nameWithIds.first);
1286  }
1287 
1288  _connectionGraph.publishedTopics = publishedTopics;
1289  _connectionGraph.subscribedTopics = subscribedTopics;
1290  _connectionGraph.advertisedServices = advertisedServices;
1291  }
1292 
1293  std::vector<std::string> removedTopics, removedServices;
1294  std::copy_if(knownTopicNames.begin(), knownTopicNames.end(), std::back_inserter(removedTopics),
1295  [&topicNames](const std::string& topic) {
1296  return topicNames.find(topic) == topicNames.end();
1297  });
1298  std::copy_if(knownServiceNames.begin(), knownServiceNames.end(),
1299  std::back_inserter(removedServices), [&serviceNames](const std::string& service) {
1300  return serviceNames.find(service) == serviceNames.end();
1301  });
1302 
1303  if (publisherDiff.empty() && subscriberDiff.empty() && servicesDiff.empty() &&
1304  removedTopics.empty() && removedServices.empty()) {
1305  return;
1306  }
1307 
1308  const json msg = {
1309  {"op", "connectionGraphUpdate"}, {"publishedTopics", publisherDiff},
1310  {"subscribedTopics", subscriberDiff}, {"advertisedServices", servicesDiff},
1311  {"removedTopics", removedTopics}, {"removedServices", removedServices},
1312  };
1313  const auto payload = msg.dump();
1314 
1315  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1316  for (const auto& [hdl, clientInfo] : _clients) {
1317  if (clientInfo.subscribedToConnectionGraph) {
1318  _server.send(hdl, payload, OpCode::TEXT);
1319  }
1320  }
1321 }
1322 
1323 template <typename ServerConfiguration>
1325  std::error_code ec;
1326  const auto con = _server.get_con_from_hdl(clientHandle, ec);
1327  return con ? con->get_remote_endpoint() : "(unknown)";
1328 }
1329 
1330 template <typename ServerConfiguration>
1331 inline bool Server<ServerConfiguration>::isParameterSubscribed(const std::string& paramName) const {
1332  return std::find_if(_clientParamSubscriptions.begin(), _clientParamSubscriptions.end(),
1333  [paramName](const auto& paramSubscriptions) {
1334  return paramSubscriptions.second.find(paramName) !=
1335  paramSubscriptions.second.end();
1336  }) != _clientParamSubscriptions.end();
1337 }
1338 
1339 template <typename ServerConfiguration>
1341  ConnHandle hdl, const std::unordered_set<std::string>& paramNames) {
1342  std::vector<std::string> paramsToUnsubscribe;
1343  {
1344  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1345  std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToUnsubscribe),
1346  [this](const std::string& paramName) {
1347  return !isParameterSubscribed(paramName);
1348  });
1349  }
1350 
1351  if (_handlers.parameterSubscriptionHandler && !paramsToUnsubscribe.empty()) {
1352  for (const auto& param : paramsToUnsubscribe) {
1353  _server.get_alog().write(APP, "Unsubscribing from parameter '" + param + "'.");
1354  }
1355 
1356  try {
1357  _handlers.parameterSubscriptionHandler(paramsToUnsubscribe,
1359  } catch (const std::exception& e) {
1360  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
1361  } catch (...) {
1363  "Failed to unsubscribe from one more more parameters");
1364  }
1365  }
1366 }
1367 
1368 template <typename ServerConfiguration>
1369 inline bool Server<ServerConfiguration>::hasCapability(const std::string& capability) const {
1370  return std::find(_options.capabilities.begin(), _options.capabilities.end(), capability) !=
1371  _options.capabilities.end();
1372 }
1373 
1374 } // namespace foxglove
size_t size() const
Definition: common.hpp:134
ROSCPP_DECL uint32_t getPort()
bool hasCapability(const std::string &capability) const
void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level, const std::string &message)
bool param(const std::string &param_name, T &param_val, const T &default_val)
uint32_t SubscriptionId
Definition: common.hpp:27
struct foxglove::Server::@0 _connectionGraph
ROSCPP_DECL void start()
void handleConnectionClosed(ConnHandle hdl)
constexpr char SUPPORTED_SUBPROTOCOL[]
Definition: common.hpp:12
uint16_t getPort() override
std::function< void(const std::vector< std::string > &, const std::optional< std::string > &, ConnectionHandle)> parameterRequestHandler
void updateParameterValues(const std::vector< Parameter > &parameters) override
uint32_t ServiceId
Definition: common.hpp:28
static const websocketpp::log::level WARNING
std::function< void(ChannelId, ConnectionHandle)> unsubscribeHandler
const std::unordered_map< ClientBinaryOpcode, std::string > CAPABILITY_BY_CLIENT_BINARY_OPERATION
Map of required capability by client operation (binary).
void unsubscribeParamsWithoutSubscriptions(ConnHandle hdl, const std::unordered_set< std::string > &paramNames)
std::vector< std::regex > clientTopicWhitelistPatterns
websocketpp::lib::asio::ip::tcp Tcp
void write(uint8_t *data) const
void read(const uint8_t *data, size_t size)
void handleTextMessage(ConnHandle hdl, MessagePtr msg)
nlohmann::json json
bool isWhitelisted(const std::string &name, const std::vector< std::regex > &regexPatterns)
Definition: regex_utils.hpp:10
ClientBinaryOpcode
Definition: common.hpp:36
static const websocketpp::log::level APP
void sendJsonRaw(ConnHandle hdl, const std::string &payload)
void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp, const uint8_t *payload, size_t payloadSize) override
uint32_t ChannelId
Definition: common.hpp:25
std::shared_mutex _clientChannelsMutex
websocketpp::connection< ServerConfiguration > ConnectionType
void handleMessage(ConnHandle hdl, MessagePtr msg)
void socketInit(ConnHandle hdl)
constexpr char CAPABILITY_CLIENT_PUBLISH[]
Definition: common.hpp:13
std::shared_mutex _connectionGraphMutex
std::unordered_map< std::string, std::string > metadata
ClientInfo(const std::string &name, ConnHandle handle)
std::function< void(const ServiceRequest &, ConnectionHandle)> serviceRequestHandler
std::function< void(const ClientAdvertisement &, ConnectionHandle)> clientAdvertiseHandler
bool isParameterSubscribed(const std::string &paramName) const
std::shared_mutex _servicesMutex
std::function< void(WebSocketLogLevel, char const *)> LogCallback
std::string remoteEndpointString(ConnHandle clientHandle) override
std::function< void(ChannelId, ConnectionHandle)> subscribeHandler
std::function< void(const std::vector< Parameter > &, const std::optional< std::string > &, ConnectionHandle)> parameterChangeHandler
constexpr uint32_t Integer(const std::string_view str)
std::unordered_map< std::string, std::unordered_set< std::string > > MapOfSets
constexpr char CAPABILITY_PARAMETERS_SUBSCRIBE[]
Definition: common.hpp:16
std::shared_mutex _channelsMutex
websocketpp::frame::opcode::value OpCode
void sendJson(ConnHandle hdl, json &&payload)
std::unordered_map< ChannelId, SubscriptionId > subscriptionsByChannel
typename ServerType::message_ptr MessagePtr
uint32_t ClientChannelId
Definition: common.hpp:26
const std::unordered_map< std::string, std::string > CAPABILITY_BY_CLIENT_OPERATION
Map of required capability by client operation (text).
MapOfSets advertisedServices
std::function< void(ClientChannelId, ConnectionHandle)> clientUnadvertiseHandler
void handleBinaryMessage(ConnHandle hdl, MessagePtr msg)
constexpr char CAPABILITY_CONNECTION_GRAPH[]
Definition: common.hpp:18
ServerOptions _options
void updateConnectionGraph(const MapOfSets &publishedTopics, const MapOfSets &subscribedTopics, const MapOfSets &advertisedServices) override
constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level)
std::vector< ChannelId > addChannels(const std::vector< ChannelWithoutId > &channels) override
void publishParameterValues(ConnHandle clientHandle, const std::vector< Parameter > &parameters, const std::optional< std::string > &requestId=std::nullopt) override
websocketpp::server< ServerConfiguration > ServerType
std::unordered_map< ChannelId, Channel > _channels
std::map< ConnHandle, std::unordered_set< std::string >, std::owner_less<> > _clientParamSubscriptions
std::unordered_set< ClientChannelId > advertisedChannels
void stop() override
std::unique_ptr< std::thread > _serverThread
std::unique_ptr< CallbackQueue > _handlerCallbackQueue
void broadcastTime(uint64_t timestamp) override
void removeServices(const std::vector< ServiceId > &serviceIds) override
std::function< void(bool)> subscribeConnectionGraphHandler
void setupTlsHandler()
void removeChannels(const std::vector< ChannelId > &channelIds) override
#define FOXGLOVE_DEBOUNCE(f, ms)
void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse &response) override
bool validateConnection(ConnHandle hdl)
void setHandlers(ServerHandlers< ConnHandle > &&handlers) override
constexpr char CAPABILITY_SERVICES[]
Definition: common.hpp:17
void sendBinary(ConnHandle hdl, const uint8_t *payload, size_t payloadSize)
std::vector< std::string > capabilities
static const websocketpp::log::level RECOVERABLE
std::string IPAddressToString(const asio::ip::address &addr)
std::map< ConnHandle, std::unordered_map< ClientChannelId, ClientAdvertisement >, std::owner_less<> > _clientChannels
websocketpp::connection_hdl ConnHandle
std::shared_mutex _clientsMutex
void start(const std::string &host, uint16_t port) override
std::unordered_map< ServiceId, ServiceWithoutId > _services
constexpr char CAPABILITY_PARAMETERS[]
Definition: common.hpp:15
void handleConnectionOpened(ConnHandle hdl)
ClientChannelId channelId
Definition: common.hpp:76
std::mutex _clientParamSubscriptionsMutex
Server(std::string name, LogCallback logger, const ServerOptions &options)
std::vector< ServiceId > addServices(const std::vector< ServiceWithoutId > &services) override
std::map< ConnHandle, ClientInfo, std::owner_less<> > _clients
std::function< void(const ClientMessage &, ConnectionHandle)> clientMessageHandler
std::function< void(const std::vector< std::string > &, ParameterSubscriptionOperation, ConnectionHandle)> parameterSubscriptionHandler
void WriteUint64LE(uint8_t *buf, uint64_t val)
ServerHandlers< ConnHandle > _handlers
std::vector< std::string > supportedEncodings
void WriteUint32LE(uint8_t *buf, uint32_t val)


foxglove_bridge
Author(s): Foxglove
autogenerated on Mon Jul 3 2023 02:12:22